diff --git a/.hypothesis/unicode_data/11.0.0/charmap.json.gz b/.hypothesis/unicode_data/11.0.0/charmap.json.gz new file mode 100644 index 0000000000000000000000000000000000000000..bf6877bbcc6737421c21ad79740b7dce16aa3947 Binary files /dev/null and b/.hypothesis/unicode_data/11.0.0/charmap.json.gz differ diff --git a/csst_dfs_api_cluster/facility/__init__.py b/csst_dfs_api_cluster/facility/__init__.py index 4a31a70cbda269bff6b387fe339d379ccdcf6ef9..30e9e4f7d27e58721fae07fb0075ee7c43d6e3d7 100644 --- a/csst_dfs_api_cluster/facility/__init__.py +++ b/csst_dfs_api_cluster/facility/__init__.py @@ -2,4 +2,5 @@ from .calmerge import CalMergeApi from .detector import DetectorApi from .level0 import Level0DataApi from .level0prc import Level0PrcApi +from .level1prc import Level1PrcApi from .observation import ObservationApi \ No newline at end of file diff --git a/csst_dfs_api_cluster/facility/level1prc.py b/csst_dfs_api_cluster/facility/level1prc.py new file mode 100644 index 0000000000000000000000000000000000000000..621c55e941a8d65d84b471d53a6faed31051f982 --- /dev/null +++ b/csst_dfs_api_cluster/facility/level1prc.py @@ -0,0 +1,103 @@ +import grpc + +from csst_dfs_commons.models import Result +from csst_dfs_commons.models.common import from_proto_model_list +from csst_dfs_commons.models.facility import Level1PrcRecord + +from csst_dfs_proto.facility.level1prc import level1prc_pb2, level1prc_pb2_grpc + +from ..common.service import ServiceProxy +from ..common.utils import * + +class Level1PrcApi(object): + def __init__(self): + self.stub = level1prc_pb2_grpc.Level1PrcSrvStub(ServiceProxy().channel()) + + def find(self, **kwargs): + ''' retrieve level1 procedure records from database + + parameter kwargs: + level1_id: [str] + pipeline_id: [str] + prc_module: [str] + prc_status : [int] + + return: csst_dfs_common.models.Result + ''' + try: + resp, _ = self.stub.Find.with_call(level1prc_pb2.FindLevel1PrcReq( + level1_id = get_parameter(kwargs, "level1_id"), + pipeline_id = get_parameter(kwargs, "pipeline_id"), + prc_module = get_parameter(kwargs, "prc_module"), + prc_status = get_parameter(kwargs, "prc_status"), + other_conditions = {"test":"cnlab.test"} + ),metadata = get_auth_headers()) + + if resp.success: + return Result.ok_data(data = from_proto_model_list(Level1PrcRecord, resp.records)).append("totalCount", resp.totalCount) + else: + return Result.error(message = str(resp.error.detail)) + + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details)) + + def update_proc_status(self, **kwargs): + ''' update the status of reduction + + parameter kwargs: + id : [int], + status : [int] + + return csst_dfs_common.models.Result + ''' + id = get_parameter(kwargs, "id") + status = get_parameter(kwargs, "status") + + try: + resp,_ = self.stub.UpdateProcStatus.with_call( + level1prc_pb2.UpdateProcStatusReq(id=id, status=status), + metadata = get_auth_headers() + ) + if resp.success: + return Result.ok_data() + else: + return Result.error(message = str(resp.error.detail)) + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details)) + + def write(self, **kwargs): + ''' insert a level1 procedure record into database + + parameter kwargs: + level1_id : [int] + pipeline_id : [str] + prc_module : [str] + params_file_path : [str] + prc_status : [int] + prc_time : [str] + result_file_path : [str] + return csst_dfs_common.models.Result + ''' + + rec = level1prc_pb2.Level1PrcRecord( + id = 0, + level1_id = get_parameter(kwargs, "level1_id"), + pipeline_id = get_parameter(kwargs, "pipeline_id"), + prc_module = get_parameter(kwargs, "prc_module"), + params_file_path = get_parameter(kwargs, "params_file_path"), + prc_status = get_parameter(kwargs, "prc_status", -1), + prc_time = get_parameter(kwargs, "prc_time"), + result_file_path = get_parameter(kwargs, "result_file_path") + ) + req = level1prc_pb2.WriteLevel1PrcReq(record = rec) + try: + resp,_ = self.stub.Write.with_call(req,metadata = get_auth_headers()) + if resp.success: + return Result.ok_data(data = Level1PrcRecord().from_proto_model(resp.record)) + else: + return Result.error(message = str(resp.error.detail)) + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details)) + + + diff --git a/csst_dfs_api_cluster/ifs/level1.py b/csst_dfs_api_cluster/ifs/level1.py index 09d67f573b0ea5b217230ac4ddbc70817ac7f7bc..8c4380cd97d3359218e58a8d9cee5200f4b636d2 100644 --- a/csst_dfs_api_cluster/ifs/level1.py +++ b/csst_dfs_api_cluster/ifs/level1.py @@ -24,7 +24,6 @@ class Level1DataApi(object): parameter kwargs: level0_id: [str] data_type: [str] - obs_type: [str] create_time : (start, end), qc1_status : [int], prc_status : [int], @@ -69,7 +68,7 @@ class Level1DataApi(object): ),metadata = get_auth_headers()) if resp.record is None or resp.record.id == 0: - return Result.error(message=f"id:{id} not found") + return Result.error(message=f"id:{fits_id} not found") return Result.ok_data(data=Level1Record().from_proto_model(resp.record)) @@ -128,17 +127,12 @@ class Level1DataApi(object): data_type : [str] cor_sci_id : [int] prc_params : [str] - flat_id : [int] - dark_id : [int] - bias_id : [int] - lamp_id : [int] - arc_id : [int] - sky_id : [int] filename : [str] file_path : [str] prc_status : [int] prc_time : [str] pipeline_id : [str] + refs: [dict] return csst_dfs_common.models.Result ''' @@ -149,17 +143,12 @@ class Level1DataApi(object): data_type = get_parameter(kwargs, "data_type"), cor_sci_id = get_parameter(kwargs, "cor_sci_id"), prc_params = get_parameter(kwargs, "prc_params"), - flat_id = get_parameter(kwargs, "flat_id"), - dark_id = get_parameter(kwargs, "dark_id"), - bias_id = get_parameter(kwargs, "bias_id"), - lamp_id = get_parameter(kwargs, "lamp_id"), - arc_id = get_parameter(kwargs, "arc_id"), - sky_id = get_parameter(kwargs, "sky_id"), filename = get_parameter(kwargs, "filename"), file_path = get_parameter(kwargs, "file_path"), prc_status = get_parameter(kwargs, "prc_status", -1), prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), - pipeline_id = get_parameter(kwargs, "pipeline_id") + pipeline_id = get_parameter(kwargs, "pipeline_id"), + refs = get_parameter(kwargs, "refs", {}) ) def stream(rec): with open(rec.file_path, 'rb') as f: diff --git a/csst_dfs_api_cluster/msc/level1.py b/csst_dfs_api_cluster/msc/level1.py index 6ec3823cae4758bcd76ce8658fc12b2f34cfb310..45ea553d5c52e9b65530139c4e9abf755f7f3cc8 100644 --- a/csst_dfs_api_cluster/msc/level1.py +++ b/csst_dfs_api_cluster/msc/level1.py @@ -24,7 +24,6 @@ class Level1DataApi(object): parameter kwargs: level0_id: [str] data_type: [str] - obs_type: [str] create_time : (start, end), qc1_status : [int], prc_status : [int], @@ -69,7 +68,7 @@ class Level1DataApi(object): ),metadata = get_auth_headers()) if resp.record is None or resp.record.id == 0: - return Result.error(message=f"id:{id} not found") + return Result.error(message=f"id:{fits_id} not found") return Result.ok_data(data = Level1Record().from_proto_model(resp.record)) @@ -128,14 +127,12 @@ class Level1DataApi(object): data_type : [str] cor_sci_id : [int] prc_params : [str] - flat_id : [int] - dark_id : [int] - bias_id : [int] filename : [str] file_path : [str] prc_status : [int] prc_time : [str] pipeline_id : [str] + refs: [dict] return csst_dfs_common.models.Result ''' @@ -146,14 +143,12 @@ class Level1DataApi(object): data_type = get_parameter(kwargs, "data_type"), cor_sci_id = get_parameter(kwargs, "cor_sci_id"), prc_params = get_parameter(kwargs, "prc_params"), - flat_id = get_parameter(kwargs, "flat_id"), - dark_id = get_parameter(kwargs, "dark_id"), - bias_id = get_parameter(kwargs, "bias_id"), filename = get_parameter(kwargs, "filename", ""), file_path = get_parameter(kwargs, "file_path", ""), prc_status = get_parameter(kwargs, "prc_status", -1), prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), - pipeline_id = get_parameter(kwargs, "pipeline_id") + pipeline_id = get_parameter(kwargs, "pipeline_id"), + refs = get_parameter(kwargs, "refs", {}) ) def stream(rec): with open(rec.file_path, 'rb') as f: diff --git a/csst_dfs_api_cluster/sls/__init__.py b/csst_dfs_api_cluster/sls/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..819de80eeba5ef22911a8443de66f86fe63b39d8 --- /dev/null +++ b/csst_dfs_api_cluster/sls/__init__.py @@ -0,0 +1,2 @@ +from .level1 import Level1DataApi +from .level2spectra import Level2SpectraApi \ No newline at end of file diff --git a/csst_dfs_api_cluster/sls/level1.py b/csst_dfs_api_cluster/sls/level1.py new file mode 100644 index 0000000000000000000000000000000000000000..0c4e1d1cda55257def16ee432ecc465ca2e1a463 --- /dev/null +++ b/csst_dfs_api_cluster/sls/level1.py @@ -0,0 +1,172 @@ +import os +import grpc +import datetime + +from csst_dfs_commons.models import Result +from csst_dfs_commons.models.common import from_proto_model_list +from csst_dfs_commons.models.sls import Level1Record +from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE +from csst_dfs_proto.sls.level1 import level1_pb2, level1_pb2_grpc + +from ..common.service import ServiceProxy +from ..common.utils import * + +class Level1DataApi(object): + """ + Level1 Data Operation Class + """ + def __init__(self): + self.stub = level1_pb2_grpc.Level1SrvStub(ServiceProxy().channel()) + + def find(self, **kwargs): + ''' retrieve level1 records from database + + parameter kwargs: + level0_id: [str] + data_type: [str] + create_time : (start, end), + qc1_status : [int], + prc_status : [int], + filename: [str] + limit: limits returns the number of records,default 0:no-limit + + return: csst_dfs_common.models.Result + ''' + try: + resp, _ = self.stub.Find.with_call(level1_pb2.FindLevel1Req( + level0_id = get_parameter(kwargs, "level0_id"), + data_type = get_parameter(kwargs, "data_type"), + create_time_start = get_parameter(kwargs, "create_time", [None, None])[0], + create_time_end = get_parameter(kwargs, "create_time", [None, None])[1], + qc1_status = get_parameter(kwargs, "qc1_status"), + prc_status = get_parameter(kwargs, "prc_status"), + filename = get_parameter(kwargs, "filename"), + limit = get_parameter(kwargs, "limit", 0), + other_conditions = {"test":"cnlab.test"} + ),metadata = get_auth_headers()) + + if resp.success: + return Result.ok_data(data=from_proto_model_list(Level1Record, resp.records)).append("totalCount", resp.totalCount) + else: + return Result.error(message = str(resp.error.detail)) + + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details)) + + def get(self, **kwargs): + ''' fetch a record from database + + parameter kwargs: + id : [int] + + return csst_dfs_common.models.Result + ''' + try: + fits_id = get_parameter(kwargs, "id") + resp, _ = self.stub.Get.with_call(level1_pb2.GetLevel1Req( + id = fits_id + ),metadata = get_auth_headers()) + + if resp.record is None or resp.record.id == 0: + return Result.error(message=f"id:{fits_id} not found") + + return Result.ok_data(data = Level1Record().from_proto_model(resp.record)) + + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details)) + + def update_proc_status(self, **kwargs): + ''' update the status of reduction + + parameter kwargs: + id : [int], + status : [int] + + return csst_dfs_common.models.Result + ''' + fits_id = get_parameter(kwargs, "id") + status = get_parameter(kwargs, "status") + try: + resp,_ = self.stub.UpdateProcStatus.with_call( + level1_pb2.UpdateProcStatusReq(id=fits_id, status=status), + metadata = get_auth_headers() + ) + if resp.success: + return Result.ok_data() + else: + return Result.error(message = str(resp.error.detail)) + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details)) + + def update_qc1_status(self, **kwargs): + ''' update the status of QC0 + + parameter kwargs: + id : [int], + status : [int] + ''' + fits_id = get_parameter(kwargs, "id") + status = get_parameter(kwargs, "status") + try: + resp,_ = self.stub.UpdateQc1Status.with_call( + level1_pb2.UpdateQc1StatusReq(id=fits_id, status=status), + metadata = get_auth_headers() + ) + if resp.success: + return Result.ok_data() + else: + return Result.error(message = str(resp.error.detail)) + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details)) + + def write(self, **kwargs): + ''' insert a level1 record into database + + parameter kwargs: + level0_id : [str] + data_type : [str] + prc_params : [str] + filename : [str] + file_path : [str] + prc_status : [int] + prc_time : [str] + pipeline_id : [str] + refs: [dict] + + return csst_dfs_common.models.Result + ''' + + rec = level1_pb2.Level1Record( + id = 0, + level0_id = get_parameter(kwargs, "level0_id"), + data_type = get_parameter(kwargs, "data_type"), + prc_params = get_parameter(kwargs, "prc_params"), + filename = get_parameter(kwargs, "filename", ""), + file_path = get_parameter(kwargs, "file_path", ""), + prc_status = get_parameter(kwargs, "prc_status", -1), + prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), + pipeline_id = get_parameter(kwargs, "pipeline_id"), + refs = get_parameter(kwargs, "refs", {}) + ) + def stream(rec): + with open(rec.file_path, 'rb') as f: + while True: + data = f.read(UPLOAD_CHUNK_SIZE) + if not data: + break + yield level1_pb2.WriteLevel1Req(record = rec, data = data) + try: + if not rec.file_path: + return Result.error(message="file_path is blank") + if not os.path.exists(rec.file_path): + return Result.error(message="the file [%s] not existed" % (rec.file_path, )) + if not rec.filename: + rec.filename = os.path.basename(rec.file_path) + + resp,_ = self.stub.Write.with_call(stream(rec),metadata = get_auth_headers()) + if resp.success: + return Result.ok_data(data=Level1Record().from_proto_model(resp.record)) + else: + return Result.error(message = str(resp.error.detail)) + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details)) diff --git a/csst_dfs_api_cluster/sls/level2spectra.py b/csst_dfs_api_cluster/sls/level2spectra.py new file mode 100644 index 0000000000000000000000000000000000000000..0de6bbea42aabb157b4f5b535de842316326fce5 --- /dev/null +++ b/csst_dfs_api_cluster/sls/level2spectra.py @@ -0,0 +1,171 @@ +import os +import grpc +import datetime + +from csst_dfs_commons.models import Result +from csst_dfs_commons.models.common import from_proto_model_list +from csst_dfs_commons.models.sls import Level2Spectra +from csst_dfs_commons.models.constants import UPLOAD_CHUNK_SIZE +from csst_dfs_proto.sls.level2spectra import level2spectra_pb2, level2spectra_pb2_grpc + +from ..common.service import ServiceProxy +from ..common.utils import * + +class Level2SpectraApi(object): + """ + Level2spectra Data Operation Class + """ + def __init__(self): + self.stub = level2spectra_pb2_grpc.Level2spectraSrvStub(ServiceProxy().channel()) + + def find(self, **kwargs): + ''' retrieve level2spectra records from database + + :param kwargs: Parameter dictionary, key items support: + level1_id: [int] + spectra_id: [str] + create_time : (start, end), + qc1_status : [int], + prc_status : [int], + filename: [str] + limit: limits returns the number of records,default 0:no-limit + + :returns: csst_dfs_common.models.Result + ''' + try: + resp, _ = self.stub.Find.with_call(level2spectra_pb2.FindLevel2spectraReq( + level1_id = get_parameter(kwargs, "level1_id",0), + spectra_id = get_parameter(kwargs, "spectra_id"), + create_time_start = get_parameter(kwargs, "create_time", [None, None])[0], + create_time_end = get_parameter(kwargs, "create_time", [None, None])[1], + qc1_status = get_parameter(kwargs, "qc1_status"), + prc_status = get_parameter(kwargs, "prc_status"), + filename = get_parameter(kwargs, "filename"), + limit = get_parameter(kwargs, "limit", 0), + other_conditions = {"test":"cnlab.test"} + ),metadata = get_auth_headers()) + + if resp.success: + return Result.ok_data(data=from_proto_model_list(Level2Spectra, resp.records)).append("totalCount", resp.totalCount) + else: + return Result.error(message = str(resp.error.detail)) + + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details)) + + def get(self, **kwargs): + ''' fetch a record from database + + parameter kwargs: + id : [int] + + return csst_dfs_common.models.Result + ''' + try: + fits_id = get_parameter(kwargs, "id") + resp, _ = self.stub.Get.with_call(level2spectra_pb2.GetLevel2spectraReq( + id = fits_id + ),metadata = get_auth_headers()) + + if resp.record is None or resp.record.id == 0: + return Result.error(message=f"id:{fits_id} not found") + + return Result.ok_data(data = Level2Spectra().from_proto_model(resp.record)) + + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details)) + + def update_proc_status(self, **kwargs): + ''' update the status of reduction + + parameter kwargs: + id : [int], + status : [int] + + return csst_dfs_common.models.Result + ''' + fits_id = get_parameter(kwargs, "id") + status = get_parameter(kwargs, "status") + try: + resp,_ = self.stub.UpdateProcStatus.with_call( + level2spectra_pb2.UpdateProcStatusReq(id=fits_id, status=status), + metadata = get_auth_headers() + ) + if resp.success: + return Result.ok_data() + else: + return Result.error(message = str(resp.error.detail)) + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details)) + + def update_qc1_status(self, **kwargs): + ''' update the status of QC0 + + parameter kwargs: + id : [int], + status : [int] + ''' + fits_id = get_parameter(kwargs, "id") + status = get_parameter(kwargs, "status") + try: + resp,_ = self.stub.UpdateQc1Status.with_call( + level2spectra_pb2.UpdateQc1StatusReq(id=fits_id, status=status), + metadata = get_auth_headers() + ) + if resp.success: + return Result.ok_data() + else: + return Result.error(message = str(resp.error.detail)) + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details)) + + def write(self, **kwargs): + ''' insert a level2spectra record into database + + parameter kwargs: + level1_id: [int] + spectra_id : [str] + region : [str] + filename : [str] + file_path : [str] + prc_status : [int] + prc_time : [str] + pipeline_id : [str] + refs: [dict] + + return csst_dfs_common.models.Result + ''' + + rec = level2spectra_pb2.Level2spectraRecord( + id = 0, + level1_id = get_parameter(kwargs, "level1_id", 0), + spectra_id = get_parameter(kwargs, "spectra_id"), + region = get_parameter(kwargs, "region"), + filename = get_parameter(kwargs, "filename", ""), + file_path = get_parameter(kwargs, "file_path", ""), + prc_status = get_parameter(kwargs, "prc_status", -1), + prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), + pipeline_id = get_parameter(kwargs, "pipeline_id") + ) + def stream(rec): + with open(rec.file_path, 'rb') as f: + while True: + data = f.read(UPLOAD_CHUNK_SIZE) + if not data: + break + yield level2spectra_pb2.WriteLevel2spectraReq(record = rec, data = data) + try: + if not rec.file_path: + return Result.error(message="file_path is blank") + if not os.path.exists(rec.file_path): + return Result.error(message="the file [%s] not existed" % (rec.file_path, )) + if not rec.filename: + rec.filename = os.path.basename(rec.file_path) + + resp,_ = self.stub.Write.with_call(stream(rec),metadata = get_auth_headers()) + if resp.success: + return Result.ok_data(data=Level2Spectra().from_proto_model(resp.record)) + else: + return Result.error(message = str(resp.error.detail)) + except grpc.RpcError as e: + return Result.error(message="%s:%s" % (e.code().value, e.details)) diff --git a/tests/test_msc_level1.py b/tests/test_msc_level1.py index b0395202766696777c87ebd2fa7b28697b9f0ce4..d126c9c04440f974b7e6dfa211b99ac13b75cc6e 100644 --- a/tests/test_msc_level1.py +++ b/tests/test_msc_level1.py @@ -15,30 +15,30 @@ class MSCLevel1DataTestCase(unittest.TestCase): create_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13")) print('find:', recs) - def test_get(self): - rec = self.api.get(id = 2) - print('get:', rec) - - def test_update_proc_status(self): - rec = self.api.update_proc_status(id = 2, status = 4) - print('update_proc_status:', rec) - - def test_update_qc1_status(self): - rec = self.api.update_qc1_status(id = 2, status = 7) - print('update_qc1_status:', rec) - - def test_write(self): - rec = self.api.write( - level0_id='1', - data_type = "sci", - cor_sci_id = 1, - prc_params = "/opt/dddasd.params", - flat_id = 1, - dark_id = 2, - bias_id = 3, - prc_status = 3, - prc_time = '2021-06-04 11:12:13', - filename = "MSC_MS_210525121500_100000001_09_raw", - file_path = "/opt/temp/csst/MSC_MS_210525121500_100000001_09_raw.fits", - pipeline_id = "P1") - print('write:', rec) \ No newline at end of file + # def test_get(self): + # rec = self.api.get(id = 2) + # print('get:', rec) + + # def test_update_proc_status(self): + # rec = self.api.update_proc_status(id = 2, status = 4) + # print('update_proc_status:', rec) + + # def test_update_qc1_status(self): + # rec = self.api.update_qc1_status(id = 2, status = 7) + # print('update_qc1_status:', rec) + + # def test_write(self): + # rec = self.api.write( + # level0_id='1', + # data_type = "sci", + # cor_sci_id = 1, + # prc_params = "/opt/dddasd.params", + # flat_id = 1, + # dark_id = 2, + # bias_id = 3, + # prc_status = 3, + # prc_time = '2021-06-04 11:12:13', + # filename = "MSC_MS_210525121500_100000001_09_raw", + # file_path = "/opt/temp/csst/MSC_MS_210525121500_100000001_09_raw.fits", + # pipeline_id = "P1") + # print('write:', rec) \ No newline at end of file