Commit 64bea9bf authored by Wei Shoulin's avatar Wei Shoulin
Browse files

un level0

parent 535e15bd
...@@ -42,8 +42,10 @@ class Result(dict): ...@@ -42,8 +42,10 @@ class Result(dict):
return self return self
class Request(object): class Request(object):
def __init__(self): def __init__(self, **kvargs):
super(Request, self).__init__() super(Request, self).__init__()
for k, v in kvargs.items():
self.append(k, v)
@staticmethod @staticmethod
def from_dict(d: dict): def from_dict(d: dict):
...@@ -54,4 +56,12 @@ class Request(object): ...@@ -54,4 +56,12 @@ class Request(object):
def append(self, k: str, v): def append(self, k: str, v):
self.__setattr__(k, v) self.__setattr__(k, v)
return self return self
\ No newline at end of file
def __getattr__(self, attr):
try:
return object.__getattribute__(self, attr)
except:
return ''
\ No newline at end of file
import dataclasses import dataclasses
from typing import Dict from typing import Dict
from .common import BaseModel, default_field from .common import BaseModel, default_field
@dataclasses.dataclass
class Level0PrcRecord(BaseModel):
id: int = 0
level0_id: str = ""
pipeline_id: str = ""
prc_module: str = ""
params_file_path: str=""
prc_status: int = 0
prc_time: str=""
result_file_path: str=""
@dataclasses.dataclass
class Level0Record(BaseModel):
id: int = 0
level0_id: str = ""
obs_id: str = ""
detector_no: str = ""
obs_type: str = ""
obs_time: str=""
exp_time: float = 0
detector_status_id: int = 0
filename: str=""
file_path: str=""
qc0_status: int = 0
qc0_time: str=""
prc_status: int = 0
prc_time: str=""
create_time: str=""
header: Dict[str,object] = default_field({})
@dataclasses.dataclass
class CalMergeRecord(BaseModel):
id: int = 0
cal_id: str = ""
detector_no: str = ""
ref_type: str = ""
obs_time: str=""
exp_time: float = 0
filename: str=""
file_path: str=""
qc1_status: int = 0
qc1_time: str=""
prc_status: int = 0
prc_time: str=""
prc_time: str=""
create_time: str=""
level0_ids: list = dataclasses.field(default_factory=list)
@dataclasses.dataclass @dataclasses.dataclass
class Level1Record(BaseModel): class Level1Record(BaseModel):
......
import dataclasses import dataclasses
from typing import List from typing import List,Dict
from .common import BaseModel from .common import BaseModel, default_field
@dataclasses.dataclass @dataclasses.dataclass
class Observation(BaseModel): class Observation(BaseModel):
...@@ -90,3 +90,52 @@ class Level2ProducerRuning(BaseModel): ...@@ -90,3 +90,52 @@ class Level2ProducerRuning(BaseModel):
prc_result: str="" prc_result: str=""
create_time: str="" create_time: str=""
update_time: str="" update_time: str=""
@dataclasses.dataclass
class Level0PrcRecord(BaseModel):
id: int = 0
level0_id: str = ""
pipeline_id: str = ""
prc_module: str = ""
params_file_path: str=""
prc_status: int = 0
prc_time: str=""
result_file_path: str=""
@dataclasses.dataclass
class Level0Record(BaseModel):
id: int = 0
level0_id: str = ""
obs_id: str = ""
detector_no: str = ""
obs_type: str = ""
obs_time: str=""
exp_time: float = 0
detector_status_id: int = 0
filename: str=""
file_path: str=""
qc0_status: int = 0
qc0_time: str=""
prc_status: int = 0
prc_time: str=""
create_time: str=""
header: Dict[str,object] = default_field({})
@dataclasses.dataclass
class CalMergeRecord(BaseModel):
id: int = 0
cal_id: str = ""
detector_no: str = ""
ref_type: str = ""
obs_time: str=""
exp_time: float = 0
filename: str=""
file_path: str=""
qc1_status: int = 0
qc1_time: str=""
prc_status: int = 0
prc_time: str=""
prc_time: str=""
create_time: str=""
level0_ids: list = dataclasses.field(default_factory=list)
\ No newline at end of file
...@@ -2,53 +2,6 @@ import dataclasses ...@@ -2,53 +2,6 @@ import dataclasses
from typing import Dict from typing import Dict
from .common import BaseModel, default_field from .common import BaseModel, default_field
@dataclasses.dataclass
class Level0PrcRecord(BaseModel):
id: int = 0
level0_id: str = ""
pipeline_id: str = ""
prc_module: str = ""
params_file_path: str=""
prc_status: int = 0
prc_time: str=""
result_file_path: str=""
@dataclasses.dataclass
class Level0Record(BaseModel):
id: int = 0
level0_id: str = ""
obs_id: str = ""
detector_no: str = ""
obs_type: str = ""
obs_time: str=""
exp_time: float = 0
detector_status_id: int = 0
filename: str=""
file_path: str=""
qc0_status: int = 0
qc0_time: str=""
prc_status: int = 0
prc_time: str=""
create_time: str=""
header: Dict[str,object] = default_field({})
@dataclasses.dataclass
class CalMergeRecord(BaseModel):
id: int = 0
cal_id: str = ""
detector_no: str = ""
ref_type: str = ""
obs_time: str=""
exp_time: float = 0
filename: str=""
file_path: str=""
qc1_status: int = 0
qc1_time: str=""
prc_status: int = 0
prc_time: str=""
prc_time: str=""
create_time: str=""
level0_ids: list = dataclasses.field(default_factory=list)
@dataclasses.dataclass @dataclasses.dataclass
class Level1Record(BaseModel): class Level1Record(BaseModel):
id: int = 0 id: int = 0
......
...@@ -2,53 +2,6 @@ import dataclasses ...@@ -2,53 +2,6 @@ import dataclasses
from typing import Dict from typing import Dict
from .common import BaseModel, default_field from .common import BaseModel, default_field
@dataclasses.dataclass
class Level0PrcRecord(BaseModel):
id: int = 0
level0_id: str = ""
pipeline_id: str = ""
prc_module: str = ""
params_file_path: str=""
prc_status: int = 0
prc_time: str=""
result_file_path: str=""
@dataclasses.dataclass
class Level0Record(BaseModel):
id: int = 0
level0_id: str = ""
obs_id: str = ""
detector_no: str = ""
obs_type: str = ""
obs_time: str=""
exp_time: float = 0
detector_status_id: int = 0
filename: str=""
file_path: str=""
qc0_status: int = 0
qc0_time: str=""
prc_status: int = 0
prc_time: str=""
create_time: str=""
header: Dict[str,object] = default_field({})
@dataclasses.dataclass
class CalMergeRecord(BaseModel):
id: int = 0
cal_id: str = ""
detector_no: str = ""
ref_type: str = ""
obs_time: str=""
exp_time: float = 0
filename: str=""
file_path: str=""
qc1_status: int = 0
qc1_time: str=""
prc_status: int = 0
prc_time: str=""
prc_time: str=""
create_time: str=""
level0_ids: list = dataclasses.field(default_factory=list)
@dataclasses.dataclass @dataclasses.dataclass
class Level1Record(BaseModel): class Level1Record(BaseModel):
id: int = 0 id: int = 0
......
...@@ -2,54 +2,6 @@ import dataclasses ...@@ -2,54 +2,6 @@ import dataclasses
from typing import Dict from typing import Dict
from .common import BaseModel, default_field from .common import BaseModel, default_field
@dataclasses.dataclass
class Level0PrcRecord(BaseModel):
id: int = 0
level0_id: str = ""
pipeline_id: str = ""
prc_module: str = ""
params_file_path: str=""
prc_status: int = 0
prc_time: str=""
result_file_path: str=""
@dataclasses.dataclass
class Level0Record(BaseModel):
id: int = 0
level0_id: str = ""
obs_id: str = ""
detector_no: str = ""
obs_type: str = ""
obs_time: str=""
exp_time: float = 0
detector_status_id: int = 0
filename: str=""
file_path: str=""
qc0_status: int = 0
qc0_time: str=""
prc_status: int = 0
prc_time: str=""
create_time: str=""
header: Dict[str,object] = default_field({})
@dataclasses.dataclass
class CalMergeRecord(BaseModel):
id: int = 0
cal_id: str = ""
detector_no: str = ""
ref_type: str = ""
obs_time: str=""
exp_time: float = 0
filename: str=""
file_path: str=""
qc1_status: int = 0
qc1_time: str=""
prc_status: int = 0
prc_time: str=""
prc_time: str=""
create_time: str=""
level0_ids: list = dataclasses.field(default_factory=list)
@dataclasses.dataclass @dataclasses.dataclass
class Level1Record(BaseModel): class Level1Record(BaseModel):
id: int = 0 id: int = 0
......
...@@ -2,54 +2,6 @@ import dataclasses ...@@ -2,54 +2,6 @@ import dataclasses
from typing import Dict from typing import Dict
from .common import BaseModel, default_field from .common import BaseModel, default_field
@dataclasses.dataclass
class Level0PrcRecord(BaseModel):
id: int = 0
level0_id: str = ""
pipeline_id: str = ""
prc_module: str = ""
params_file_path: str=""
prc_status: int = 0
prc_time: str=""
result_file_path: str=""
@dataclasses.dataclass
class Level0Record(BaseModel):
id: int = 0
level0_id: str = ""
obs_id: str = ""
detector_no: str = ""
obs_type: str = ""
obs_time: str=""
exp_time: float = 0
detector_status_id: int = 0
filename: str=""
file_path: str=""
qc0_status: int = 0
qc0_time: str=""
prc_status: int = 0
prc_time: str=""
create_time: str=""
header: Dict[str,object] = default_field({})
@dataclasses.dataclass
class CalMergeRecord(BaseModel):
id: int = 0
cal_id: str = ""
detector_no: str = ""
ref_type: str = ""
obs_time: str=""
exp_time: float = 0
filename: str=""
file_path: str=""
qc1_status: int = 0
qc1_time: str=""
prc_status: int = 0
prc_time: str=""
prc_time: str=""
create_time: str=""
level0_ids: list = dataclasses.field(default_factory=list)
@dataclasses.dataclass @dataclasses.dataclass
class Level1Record(BaseModel): class Level1Record(BaseModel):
id: int = 0 id: int = 0
......
import unittest
from astropy.io import fits
from csst_dfs_commons.models import Request
class CommonModelsTestCase(unittest.TestCase):
def setUp(self):
pass
def test_request_result(self):
r = Request(id = 1)
assert r.id == 1, "not equal"
assert r.xxx == '', "not equal"
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment