From e0a2770934a51d10fd1b1be13b94bbb19339c2a7 Mon Sep 17 00:00:00 2001 From: shoulinwei Date: Fri, 10 Jun 2022 13:02:40 +0800 Subject: [PATCH] mci 0-1 --- csst_dfs_api_local/common/db.sql | 142 +++++++++++++ csst_dfs_api_local/mci/__init__.py | 5 + csst_dfs_api_local/mci/calmerge.py | 299 ++++++++++++++++++++++++++++ csst_dfs_api_local/mci/ingest.py | 118 +++++++++++ csst_dfs_api_local/mci/level0.py | 233 ++++++++++++++++++++++ csst_dfs_api_local/mci/level0prc.py | 124 ++++++++++++ csst_dfs_api_local/mci/level1.py | 213 ++++++++++++++++++++ csst_dfs_api_local/mci/level1prc.py | 123 ++++++++++++ setup.cfg | 3 +- tests/test_mci_cal_merge.py | 46 +++++ tests/test_mci_ingest.py | 14 ++ tests/test_mci_level0.py | 36 ++++ tests/test_mci_level0_prc.py | 28 +++ tests/test_mci_level1.py | 41 ++++ tests/test_mci_level1_prc.py | 28 +++ 15 files changed, 1452 insertions(+), 1 deletion(-) create mode 100644 csst_dfs_api_local/mci/calmerge.py create mode 100644 csst_dfs_api_local/mci/ingest.py create mode 100644 csst_dfs_api_local/mci/level0.py create mode 100644 csst_dfs_api_local/mci/level0prc.py create mode 100644 csst_dfs_api_local/mci/level1.py create mode 100644 csst_dfs_api_local/mci/level1prc.py create mode 100644 tests/test_mci_cal_merge.py create mode 100644 tests/test_mci_ingest.py create mode 100644 tests/test_mci_level0.py create mode 100644 tests/test_mci_level0_prc.py create mode 100644 tests/test_mci_level1.py create mode 100644 tests/test_mci_level1_prc.py diff --git a/csst_dfs_api_local/common/db.sql b/csst_dfs_api_local/common/db.sql index 5e6f32d..961f420 100644 --- a/csst_dfs_api_local/common/db.sql +++ b/csst_dfs_api_local/common/db.sql @@ -52,7 +52,27 @@ drop table if exists ifs_level1_header; drop table if exists ifs_level1_prc; drop table if exists ifs_level1_ref; +/*----------------mci------------------------------*/ +drop table if exists mci_level0_data; + +drop table if exists mci_level0_header; + +drop table if exists mci_level0_prc; + +drop table if 
exists mci_cal2level0; + +drop table if exists mci_cal_header; + +drop table if exists mci_cal_merge; + +drop table if exists mci_level1_data; + +drop table if exists mci_level1_header; + +drop table if exists mci_level1_prc; + +drop table if exists mci_level1_ref; /*----------------sls------------------------------*/ drop table if exists sls_level0_data; @@ -395,6 +415,128 @@ create table ifs_level1_prc prc_time datetime, result_file_path varchar(256) ); +/*===========================mci===================================*/ +create table mci_level0_data +( + id integer PRIMARY KEY autoincrement, + level0_id varchar(20) not null, + obs_id varchar(10) not null, + detector_no varchar(10) not null, + obs_type varchar(16), + obs_time datetime, + exp_time float, + detector_status_id int(20), + filename varchar(128), + file_path varchar(256), + qc0_status tinyint(1), + qc0_time datetime, + prc_status tinyint(1), + prc_time datetime, + create_time datetime +); + +create table mci_level0_header +( + id int(20) not null, + obs_time datetime, + exp_time float, + ra float, + "dec" float, + create_time datetime, + primary key (id) +); + +create table mci_level0_prc +( + id integer PRIMARY KEY autoincrement, + level0_id varchar(20) not null, + pipeline_id varchar(64) not null, + prc_module varchar(32) not null, + params_file_path varchar(256), + prc_status int(2), + prc_time datetime, + result_file_path varchar(256) +); +create table mci_cal2level0 +( + merge_id int(20) not null, + level0_id varchar(20) not null, + primary key (merge_id, level0_id) +); + +create table mci_cal_header +( + id int(20) not null, + obs_time datetime, + exp_time float, + ra float, + "dec" float, + create_time datetime, + primary key (id) +); + +create table mci_cal_merge +( + id integer PRIMARY KEY autoincrement, + cal_id varchar(20) not null, + detector_no varchar(10) not null, + ref_type varchar(16), + obs_time datetime, + exp_time float, + filename varchar(128), + file_path varchar(256), + 
qc1_status tinyint(1), + qc1_time datetime, + prc_status tinyint(1), + prc_time datetime, + create_time datetime +); + +create table mci_level1_data +( + id integer PRIMARY KEY autoincrement, + level0_id varchar(20) not null, + data_type varchar(64) not null, + cor_sci_id int(20), + prc_params varchar(1024), + filename varchar(128), + file_path varchar(256), + prc_status tinyint(1), + prc_time datetime, + qc1_status tinyint(1), + qc1_time datetime, + create_time datetime, + pipeline_id varchar(60) +); + +create table mci_level1_ref ( + level1_id int(20) not null, + ref_type varchar(64) not null, + cal_id int(20) not null, + primary key (level1_id, ref_type) +); + +create table mci_level1_header +( + id int(20) not null, + obs_time datetime, + exp_time float, + ra float, + "dec" float, + create_time datetime, + primary key (id) +); +create table mci_level1_prc +( + id integer PRIMARY KEY autoincrement, + level1_id int(20) not null, + pipeline_id varchar(64) not null, + prc_module varchar(32) not null, + params_file_path varchar(256), + prc_status int(2), + prc_time datetime, + result_file_path varchar(256) +); /*===========================sls===================================*/ create table sls_level0_data ( diff --git a/csst_dfs_api_local/mci/__init__.py b/csst_dfs_api_local/mci/__init__.py index e69de29..15cff41 100644 --- a/csst_dfs_api_local/mci/__init__.py +++ b/csst_dfs_api_local/mci/__init__.py @@ -0,0 +1,5 @@ +from .calmerge import CalMergeApi +from .level0 import Level0DataApi +from .level0prc import Level0PrcApi +from .level1 import Level1DataApi +from .level1prc import Level1PrcApi \ No newline at end of file diff --git a/csst_dfs_api_local/mci/calmerge.py b/csst_dfs_api_local/mci/calmerge.py new file mode 100644 index 0000000..02ce8e2 --- /dev/null +++ b/csst_dfs_api_local/mci/calmerge.py @@ -0,0 +1,299 @@ +import os +import logging +import time, datetime +import shutil + +from ..common.db import DBClient +from ..common.utils import * +from 
csst_dfs_commons.models import Result +from csst_dfs_commons.models.mci import CalMergeRecord +from csst_dfs_commons.models.common import from_dict_list +from .level0 import Level0DataApi + +log = logging.getLogger('csst') + +class CalMergeApi(object): + def __init__(self, sub_system = "mci"): + self.sub_system = sub_system + self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") + self.db = DBClient() + self.level0Api = Level0DataApi() + + def get_latest_by_l0(self, **kwargs): + ''' retrieve calibration merge records from database by level0 data + + :param kwargs: Parameter dictionary, key items support: + level0_id: [str], + ref_type: [str] + + :returns: csst_dfs_common.models.Result + ''' + try: + level0_id = get_parameter(kwargs, "level0_id") + ref_type = get_parameter(kwargs, "ref_type") + + level0_data = self.level0Api.get_by_level0_id(level0_id) + if not level0_data.success: + return Result.error(message = "level0 data [%s]not found"%(level0_id)) + + sql_data = f"select * from mci_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time >= '{level0_data.data.obs_time}' order by obs_time ASC limit 1" + + r = self.db.select_one(sql_data) + if r: + rec = CalMergeRecord().from_dict(r) + return Result.ok_data(data=rec) + + sql_data = f"select * from mci_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time <= '{level0_data.data.obs_time}' order by obs_time DESC limit 1" + + r = self.db.select_one(sql_data) + if r: + rec = CalMergeRecord().from_dict(r) + return Result.ok_data(data=rec) + + return Result.error(message = "not found") + + except Exception as e: + return Result.error(message=str(e)) + + def find(self, **kwargs): + ''' retrieve calibration merge records from database + + parameter kwargs: + detector_no: [str] + ref_type: [str] + obs_time: (start,end) + qc1_status : [int] + prc_status : [int] + file_name: [str] + limit: limits returns the number of 
records,default 0:no-limit + + return: csst_dfs_common.models.Result + ''' + try: + detector_no = get_parameter(kwargs, "detector_no") + ref_type = get_parameter(kwargs, "ref_type") + exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0] + exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1] + qc1_status = get_parameter(kwargs, "qc1_status") + prc_status = get_parameter(kwargs, "prc_status") + file_name = get_parameter(kwargs, "file_name") + limit = get_parameter(kwargs, "limit", 0) + + sql_count = "select count(*) as c from mci_cal_merge where 1=1" + sql_data = f"select * from mci_cal_merge where 1=1" + + sql_condition = "" + if detector_no: + sql_condition = f"{sql_condition} and detector_no='{detector_no}'" + if ref_type: + sql_condition = f"{sql_condition} and ref_type='{ref_type}'" + if exp_time_start: + sql_condition = f"{sql_condition} and obs_time >='{exp_time_start}'" + if exp_time_end: + sql_condition = f"{sql_condition} and obs_time <='{exp_time_end}'" + if qc1_status: + sql_condition = f"{sql_condition} and qc1_status={qc1_status}" + if prc_status: + sql_condition = f"{sql_condition} and prc_status={prc_status}" + + if file_name: + sql_condition = f"{sql_condition} and filename='{file_name}'" + + sql_count = f"{sql_count} {sql_condition}" + sql_data = f"{sql_data} {sql_condition}" + + log.info(sql_count) + log.info(sql_data) + + if limit > 0: + sql_data = f"{sql_data} limit {limit}" + + totalCount = self.db.select_one(sql_count) + _, records = self.db.select_many(sql_data) + + return Result.ok_data(data=from_dict_list(CalMergeRecord, records)).append("totalCount", totalCount['c']) + + except Exception as e: + return Result.error(message=str(e)) + + def get(self, **kwargs): + ''' fetch a record from database + + parameter kwargs: + id : [int], + cal_id : [str] + + return csst_dfs_common.models.Result + ''' + id = get_parameter(kwargs, "id", 0) + cal_id = get_parameter(kwargs, "cal_id", "") + + if id == 0 and cal_id == "": + return 
Result.error(message="at least define id or cal_id") + + if id != 0: + return self.get_by_id(id) + if cal_id != "": + return self.get_by_cal_id(cal_id) + + def get_by_id(self, iid: int): + try: + r = self.db.select_one( + "select * from mci_cal_merge where id=?", (iid,)) + if r: + + sql_get_level0_id = f"select level0_id from mci_cal2level0 where merge_id={r['id']}" + _, records = self.db.select_many(sql_get_level0_id) + level0_ids = [r["level0_id"] for r in records] + + rec = CalMergeRecord().from_dict(r) + rec.level0_ids = level0_ids + return Result.ok_data(data=rec) + else: + return Result.error(message=f"id:{iid} not found") + + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + + def get_by_cal_id(self, cal_id: str): + try: + r = self.db.select_one( + "select * from mci_cal_merge where cal_id=?", (cal_id,)) + if r: + + sql_get_level0_id = f"select level0_id from mci_cal2level0 where merge_id={r['id']}" + _, records = self.db.select_many(sql_get_level0_id) + level0_ids = [r["level0_id"] for r in records] + + rec = CalMergeRecord().from_dict(r) + rec.level0_ids = level0_ids + return Result.ok_data(data=rec) + else: + return Result.error(message=f"cal_id:{cal_id} not found") + + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + + def update_qc1_status(self, **kwargs): + ''' update the status of reduction + + parameter kwargs: + id : [int], + cal_id : [str], + status : [int] + + return csst_dfs_common.models.Result + ''' + id = get_parameter(kwargs, "id", 0) + cal_id = get_parameter(kwargs, "cal_id", "") + result = self.get(id = id, cal_id = cal_id) + + if not result.success: + return Result.error(message="not found") + + id = result.data.id + status = get_parameter(kwargs, "status") + + try: + self.db.execute( + 'update mci_cal_merge set qc1_status=?, qc1_time=? 
where id=?', + (status, format_time_ms(time.time()), id) + ) + self.db.end() + return Result.ok_data() + + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + + def update_proc_status(self, **kwargs): + ''' update the status of reduction + + parameter kwargs: + id : [int], + cal_id : [str], + status : [int] + + return csst_dfs_common.models.Result + ''' + id = get_parameter(kwargs, "id", 0) + cal_id = get_parameter(kwargs, "cal_id", "") + result = self.get(id = id, cal_id = cal_id) + + if not result.success: + return Result.error(message="not found") + + id = result.data.id + status = get_parameter(kwargs, "status") + + try: + existed = self.db.exists( + "select * from mci_cal_merge where id=?", + (id,) + ) + if not existed: + log.warning('%s not found' %(id, )) + return Result.error(message ='%s not found' %(id, )) + self.db.execute( + 'update mci_cal_merge set prc_status=?, prc_time=? where id=?', + (status, format_time_ms(time.time()), id) + ) + self.db.end() + return Result.ok_data() + + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + + def write(self, **kwargs): + ''' insert a calibration merge record into database + + parameter kwargs: + cal_id : [str] + detector_no : [str] + ref_type : [str] + obs_time : [str] + exp_time : [float] + prc_status : [int] + prc_time : [str] + filename : [str] + file_path : [str] + level0_ids : [list] + return csst_dfs_common.models.Result + ''' + + rec = CalMergeRecord( + id = 0, + cal_id = get_parameter(kwargs, "cal_id"), + detector_no = get_parameter(kwargs, "detector_no"), + ref_type = get_parameter(kwargs, "ref_type"), + obs_time = get_parameter(kwargs, "obs_time"), + exp_time = get_parameter(kwargs, "exp_time"), + filename = get_parameter(kwargs, "filename"), + file_path = get_parameter(kwargs, "file_path"), + prc_status = get_parameter(kwargs, "prc_status", -1), + prc_time = get_parameter(kwargs, "prc_time"), + level0_ids = get_parameter(kwargs, "level0_ids", []) + ) 
+ try: + self.db.execute( + 'INSERT INTO mci_cal_merge (cal_id,detector_no,ref_type,obs_time,exp_time,filename,file_path,prc_status,prc_time,create_time) \ + VALUES(?,?,?,?,?,?,?,?,?,?)', + (rec.cal_id, rec.detector_no, rec.ref_type, rec.obs_time, rec.exp_time, rec.filename, rec.file_path,rec.prc_status,rec.prc_time,format_time_ms(time.time())) + ) + self.db.end() + rec.id = self.db.last_row_id() + + sql_level0_ids = "insert into mci_cal2level0 (merge_id,level0_id) values " + values = ["(%s,'%s')"%(rec.id,i) for i in rec.level0_ids] + _ = self.db.execute(sql_level0_ids + ",".join(values)) + + self.db.end() + + return Result.ok_data(data=rec) + + except Exception as e: + log.error(e) + return Result.error(message=str(e)) \ No newline at end of file diff --git a/csst_dfs_api_local/mci/ingest.py b/csst_dfs_api_local/mci/ingest.py new file mode 100644 index 0000000..e2a698f --- /dev/null +++ b/csst_dfs_api_local/mci/ingest.py @@ -0,0 +1,118 @@ +import os, sys +import argparse +import logging +from astropy.io import fits +import datetime +import shutil + +from csst_dfs_api_local.common.db import DBClient +log = logging.getLogger('csst-dfs-api-local') + +def ingest(): + + parser = argparse.ArgumentParser(prog=f"{sys.argv[0]}", description="ingest the local files") + parser.add_argument('-i','--infile', dest="infile", help="a file or a directory") + parser.add_argument('-m', '--copyfiles', dest="copyfiles", action='store_true', default=False, help="whether copy files after import") + args = parser.parse_args(sys.argv[1:]) + + import_root_dir = args.infile + if import_root_dir is None or (not os.path.isfile(import_root_dir) and not os.path.isdir(import_root_dir)): + parser.print_help() + sys.exit(0) + + db = DBClient() + if os.path.isfile(import_root_dir): + log.info(f"prepare import {import_root_dir}") + ingest_one(import_root_dir, db, args.copyfiles) + if os.path.isdir(import_root_dir): + for (path, _, file_names) in os.walk(import_root_dir): + for filename in file_names: 
+ if filename.find(".fits") > 0: + file_full_path = os.path.join(path, filename) + log.info(f"prepare import {file_full_path}") + try: + ingest_one(file_full_path, db, args.copyfiles) + except Exception as e: + print(f"{file_full_path} import error!!!") + log.error(e) + + db.close() + +def ingest_one(file_path, db, copyfiles): + dest_root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") + + hdul = fits.open(file_path) + header = hdul[0].header + header1 = hdul[1].header + + obs_id = header["OBSID"] + exp_start_time = f"{header['DATE-OBS']}" + exp_time = header['EXPTIME'] + + module_id = header["INSTRUME"] + obs_type = header["OBSTYPE"] + qc0_status = -1 + prc_status = -1 + time_now = datetime.datetime.now() + create_time = time_now.strftime('%Y-%m-%d %H:%M:%S') + + facility_status_id = 0 + module_status_id = 0 + + existed = db.exists("select * from t_observation where obs_id=?", (obs_id,)) + if not existed: + db.execute("insert into t_observation \ + (obs_id,obs_time,exp_time,module_id,obs_type,facility_status_id, module_status_id, qc0_status, prc_status,create_time) \ + values (?,?,?,?,?,?,?,?,?,?)", + (obs_id,exp_start_time,exp_time,module_id,obs_type,facility_status_id,module_status_id,qc0_status, prc_status,create_time)) + db.end() + #level0 + detector = header1["DETNAM"] + filename = header["FILENAME"] + + existed = db.exists( + "select * from mci_level0_data where filename=?", + (filename,) + ) + if existed: + log.warning('%s has already been imported' %(file_path, )) + db.end() + return + + detector_status_id = 0 + + file_full_path = file_path + + if copyfiles: + file_dir = f"{dest_root_dir}/{module_id}/{obs_type.upper()}/{header['EXPSTART']}/{obs_id}/MS" + if not os.path.exists(file_dir): + os.makedirs(file_dir) + file_full_path = f"{file_dir}/{filename}" + + level0_id = f"{obs_id}{detector}" + + c = db.execute("insert into mci_level0_data \ + (level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, 
file_path,qc0_status, prc_status,create_time) \ + values (?,?,?,?,?,?,?,?,?,?,?,?)", + (level0_id, obs_id, detector, obs_type, exp_start_time, exp_time, detector_status_id, filename, file_full_path, qc0_status, prc_status,create_time)) + db.end() + level0_id_id = db.last_row_id() + #level0-header + ra_obj = header["OBJ_RA"] + dec_obj = header["OBJ_DEC"] + db.execute("delete from mci_level0_header where id=?",(level0_id_id,)) + db.execute("insert into mci_level0_header \ + (id, obs_time, exp_time, ra, `dec`, create_time) \ + values (?,?,?,?,?,?)", + (level0_id_id, exp_start_time, exp_time, ra_obj, dec_obj, create_time)) + + if copyfiles: + #copy files + shutil.copyfile(file_path, file_full_path) + + db.end() + + print(f"{file_path} imported") + +if __name__ == "__main__": + ingest() \ No newline at end of file diff --git a/csst_dfs_api_local/mci/level0.py b/csst_dfs_api_local/mci/level0.py new file mode 100644 index 0000000..071032e --- /dev/null +++ b/csst_dfs_api_local/mci/level0.py @@ -0,0 +1,233 @@ +import os +import logging +import time, datetime +import shutil + +from ..common.db import DBClient +from ..common.utils import * +from csst_dfs_commons.models import Result +from csst_dfs_commons.models.mci import Level0Record +from csst_dfs_commons.models.common import from_dict_list + +log = logging.getLogger('csst') + +class Level0DataApi(object): + def __init__(self, sub_system = "mci"): + self.sub_system = sub_system + self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") + self.db = DBClient() + + def find(self, **kwargs): + ''' retrieve level0 records from database + + parameter kwargs: + obs_id: [str] + detector_no: [str] + obs_type: [str] + obs_time : (start, end), + qc0_status : [int], + prc_status : [int], + file_name: [str] + limit: limits returns the number of records,default 0:no-limit + + return: csst_dfs_common.models.Result + ''' + try: + obs_id = get_parameter(kwargs, "obs_id") + detector_no = get_parameter(kwargs, "detector_no") + 
obs_type = get_parameter(kwargs, "obs_type") + exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0] + exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1] + qc0_status = get_parameter(kwargs, "qc0_status") + prc_status = get_parameter(kwargs, "prc_status") + file_name = get_parameter(kwargs, "file_name") + limit = get_parameter(kwargs, "limit", 0) + + sql_count = "select count(*) as c from mci_level0_data where 1=1" + sql_data = f"select * from mci_level0_data where 1=1" + + sql_condition = "" + if obs_id: + sql_condition = f"{sql_condition} and obs_id='{obs_id}'" + if detector_no: + sql_condition = f"{sql_condition} and detector_no='{detector_no}'" + if obs_type: + sql_condition = f"{sql_condition} and obs_type='{obs_type}'" + if exp_time_start: + sql_condition = f"{sql_condition} and obs_time >='{exp_time_start}'" + if exp_time_end: + sql_condition = f"{sql_condition} and obs_time <='{exp_time_end}'" + if qc0_status: + sql_condition = f"{sql_condition} and qc0_status={qc0_status}" + if prc_status: + sql_condition = f"{sql_condition} and prc_status={prc_status}" + if file_name: + sql_condition = f"{sql_condition} and filename='{file_name}'" + + sql_count = f"{sql_count} {sql_condition}" + sql_data = f"{sql_data} {sql_condition}" + + if limit > 0: + sql_data = f"{sql_data} limit {limit}" + + totalCount = self.db.select_one(sql_count) + _, records = self.db.select_many(sql_data) + + return Result.ok_data(data=from_dict_list(Level0Record, records)).append("totalCount", totalCount['c']) + + except Exception as e: + return Result.error(message=str(e)) + + def get(self, **kwargs): + ''' fetch a record from database + + parameter kwargs: + id : [int], + level0_id : [str] + + return csst_dfs_common.models.Result + ''' + id = get_parameter(kwargs, "id", 0) + level0_id = get_parameter(kwargs, "level0_id", "") + + if id == 0 and level0_id == "": + return Result.error(message="at least define id or level0_id") + + if id != 0: + return self.get_by_id(id) + if 
level0_id != "": + return self.get_by_level0_id(level0_id) + + def get_by_id(self, id: int): + try: + r = self.db.select_one( + "select * from mci_level0_data where id=?", (id,)) + if r: + return Result.ok_data(data=Level0Record().from_dict(r)) + else: + return Result.error(message=f"id:{id} not found") + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + + def get_by_level0_id(self, level0_id: str): + try: + r = self.db.select_one( + "select * from mci_level0_data where level0_id=?", (level0_id,)) + if r: + return Result.ok_data(data=Level0Record().from_dict(r)) + else: + return Result.error(message=f"level0_id:{level0_id} not found") + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + + def update_proc_status(self, **kwargs): + ''' update the status of reduction + + parameter kwargs: + id : [int], + level0_id : [str], + status : [int] + + return csst_dfs_common.models.Result + ''' + id = get_parameter(kwargs, "id") + level0_id = get_parameter(kwargs, "level0_id") + result = self.get(id = id, level0_id = level0_id) + + if not result.success: + return Result.error(message="not found") + + id = result.data.id + status = get_parameter(kwargs, "status") + try: + self.db.execute( + 'update mci_level0_data set prc_status=?, prc_time=? where id=?', + (status, format_time_ms(time.time()), id) + ) + self.db.end() + return Result.ok_data() + + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + + def update_qc0_status(self, **kwargs): + ''' update the status of QC0 + + parameter kwargs: + id : [int], + level0_id : [str], + status : [int] + ''' + id = get_parameter(kwargs, "id") + level0_id = get_parameter(kwargs, "level0_id") + result = self.get(id = id, level0_id = level0_id) + + if not result.success: + return Result.error(message="not found") + + id = result.data.id + status = get_parameter(kwargs, "status") + try: + self.db.execute( + 'update mci_level0_data set qc0_status=?, qc0_time=? 
where id=?', + (status, format_time_ms(time.time()), id) + ) + self.db.end() + return Result.ok_data() + + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + + def write(self, **kwargs): + ''' insert a level0 data record into database + + parameter kwargs: + obs_id = [str] + detector_no = [str] + obs_type = [str] + obs_time = [str] + exp_time = [int] + detector_status_id = [int] + filename = [str] + file_path = [str] + return: csst_dfs_common.models.Result + ''' + rec = Level0Record( + obs_id = get_parameter(kwargs, "obs_id"), + detector_no = get_parameter(kwargs, "detector_no"), + obs_type = get_parameter(kwargs, "obs_type"), + obs_time = get_parameter(kwargs, "obs_time"), + exp_time = get_parameter(kwargs, "exp_time"), + detector_status_id = get_parameter(kwargs, "detector_status_id"), + filename = get_parameter(kwargs, "filename"), + file_path = get_parameter(kwargs, "file_path") + ) + rec.level0_id = f"{rec.obs_id}{rec.detector_no}" + try: + existed = self.db.exists( + "select * from mci_level0_data where filename=?", + (rec.filename,) + ) + if existed: + log.warning('%s existed' %(rec.filename, )) + return Result.error(message ='%s existed' %(rec.filename, )) + + self.db.execute( + 'INSERT INTO mci_level0_data (level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \ + VALUES(?,?,?,?,?,?,?,?,?,?,?,?)', + (rec.level0_id, rec.obs_id, rec.detector_no, rec.obs_type, rec.obs_time, rec.exp_time, rec.detector_status_id, rec.filename, rec.file_path,-1,-1,format_time_ms(time.time())) + ) + self.db.end() + rec.id = self.db.last_row_id() + + return Result.ok_data(data=rec) + + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + + diff --git a/csst_dfs_api_local/mci/level0prc.py b/csst_dfs_api_local/mci/level0prc.py new file mode 100644 index 0000000..b7aa033 --- /dev/null +++ b/csst_dfs_api_local/mci/level0prc.py @@ -0,0 +1,124 @@ +import 
os +import logging +import time, datetime +import shutil + +from ..common.db import DBClient +from ..common.utils import * +from csst_dfs_commons.models import Result +from csst_dfs_commons.models.mci import Level0PrcRecord +from csst_dfs_commons.models.common import from_dict_list + +log = logging.getLogger('csst') + +class Level0PrcApi(object): + def __init__(self, sub_system = "mci"): + self.sub_system = sub_system + self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") + self.db = DBClient() + + def find(self, **kwargs): + ''' retrieve level0 procedure records from database + + parameter kwargs: + level0_id: [str] + pipeline_id: [str] + prc_module: [str] + prc_status : [int] + + return: csst_dfs_common.models.Result + ''' + try: + level0_id = get_parameter(kwargs, "level0_id") + pipeline_id = get_parameter(kwargs, "pipeline_id") + prc_module = get_parameter(kwargs, "prc_module") + prc_status = get_parameter(kwargs, "prc_status") + + sql_data = f"select * from mci_level0_prc" + + sql_condition = f"where level0_id='{level0_id}'" + if pipeline_id: + sql_condition = sql_condition + " and pipeline_id='" + pipeline_id + "'" + if prc_module: + sql_condition = sql_condition + " and prc_module ='" + prc_module + "'" + if prc_status: + sql_condition = f"{sql_condition} and prc_status={prc_status}" + + sql_data = f"{sql_data} {sql_condition}" + + _, records = self.db.select_many(sql_data) + return Result.ok_data(data=from_dict_list(Level0PrcRecord, records)).append("totalCount", len(records)) + + except Exception as e: + return Result.error(message=str(e)) + + def update_proc_status(self, **kwargs): + ''' update the status of reduction + + parameter kwargs: + id : [int], + status : [int] + + return csst_dfs_common.models.Result + ''' + id = get_parameter(kwargs, "id") + status = get_parameter(kwargs, "status") + + try: + existed = self.db.exists( + "select * from mci_level0_prc where id=?", + (id,) + ) + if not existed: + log.warning('%s not found' %(id, )) 
+ return Result.error(message ='%s not found' %(id, )) + self.db.execute( + 'update mci_level0_prc set prc_status=?, prc_time=? where id=?', + (status, format_time_ms(time.time()), id) + ) + self.db.end() + return Result.ok_data() + + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + + def write(self, **kwargs): + ''' insert a level0 procedure record into database + + parameter kwargs: + level0_id : [str] + pipeline_id : [str] + prc_module : [str] + params_file_path : [str] + prc_status : [int] + prc_time : [str] + result_file_path : [str] + return csst_dfs_common.models.Result + ''' + + rec = Level0PrcRecord( + id = 0, + level0_id = get_parameter(kwargs, "level0_id"), + pipeline_id = get_parameter(kwargs, "pipeline_id"), + prc_module = get_parameter(kwargs, "prc_module"), + params_file_path = get_parameter(kwargs, "params_file_path"), + prc_status = get_parameter(kwargs, "prc_status", -1), + prc_time = get_parameter(kwargs, "prc_time"), + result_file_path = get_parameter(kwargs, "result_file_path") + ) + try: + self.db.execute( + 'INSERT INTO mci_level0_prc (level0_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \ + VALUES(?,?,?,?,?,?,?)', + (rec.level0_id, rec.pipeline_id, rec.prc_module, rec.params_file_path, rec.prc_status, rec.prc_time, rec.result_file_path) + ) + self.db.end() + rec.id = self.db.last_row_id() + + return Result.ok_data(data=rec) + + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + diff --git a/csst_dfs_api_local/mci/level1.py b/csst_dfs_api_local/mci/level1.py new file mode 100644 index 0000000..05458cf --- /dev/null +++ b/csst_dfs_api_local/mci/level1.py @@ -0,0 +1,213 @@ +import os +import logging +import time, datetime +import shutil + +from ..common.db import DBClient +from ..common.utils import * +from csst_dfs_commons.models import Result +from csst_dfs_commons.models.mci import Level1Record +from csst_dfs_commons.models.common import 
from_dict_list + +log = logging.getLogger('csst') + +class Level1DataApi(object): + def __init__(self, sub_system = "mci"): + self.sub_system = sub_system + self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") + self.db = DBClient() + + def find(self, **kwargs): + ''' retrieve level1 records from database + + parameter kwargs: + level0_id: [str] + data_type: [str] + create_time : (start, end), + qc1_status : [int], + prc_status : [int], + filename: [str] + limit: limits returns the number of records,default 0:no-limit + + return: csst_dfs_common.models.Result + ''' + try: + level0_id = get_parameter(kwargs, "level0_id") + data_type = get_parameter(kwargs, "data_type") + create_time_start = get_parameter(kwargs, "create_time", [None, None])[0] + create_time_end = get_parameter(kwargs, "create_time", [None, None])[1] + qc1_status = get_parameter(kwargs, "qc1_status") + prc_status = get_parameter(kwargs, "prc_status") + filename = get_parameter(kwargs, "filename") + limit = get_parameter(kwargs, "limit", 0) + + sql_count = "select count(*) as c from mci_level1_data where 1=1" + sql_data = f"select * from mci_level1_data where 1=1" + + sql_condition = "" + if level0_id: + sql_condition = f"{sql_condition} and level0_id='{level0_id}'" + if data_type: + sql_condition = f"{sql_condition} and data_type='{data_type}'" + if create_time_start: + sql_condition = f"{sql_condition} and create_time >='{create_time_start}'" + if create_time_end: + sql_condition = f"{sql_condition} and create_time <='{create_time_end}'" + if qc1_status: + sql_condition = f"{sql_condition} and qc1_status={qc1_status}" + if prc_status: + sql_condition = f"{sql_condition} and prc_status={prc_status}" + if filename: + sql_condition = f"{sql_condition} and filename='{filename}'" + + sql_count = f"{sql_count} {sql_condition}" + sql_data = f"{sql_data} {sql_condition}" + + if limit > 0: + sql_data = f"{sql_data} limit {limit}" + + totalCount = self.db.select_one(sql_count) + _, recs = 
self.db.select_many(sql_data) + return Result.ok_data(data=from_dict_list(Level1Record, recs)).append("totalCount", totalCount['c']) + + except Exception as e: + return Result.error(message=str(e)) + + + def get(self, **kwargs): + ''' + parameter kwargs: + id = [int] + + return dict or None + ''' + try: + fits_id = get_parameter(kwargs, "id", -1) + r = self.db.select_one( + "select * from mci_level1_data where id=?", (fits_id,)) + + if r: + return Result.ok_data(data=Level1Record().from_dict(r)) + else: + return Result.error(message=f"id:{fits_id} not found") + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + + def update_proc_status(self, **kwargs): + ''' update the status of reduction + + parameter kwargs: + id : [int], + status : [int] + + return csst_dfs_common.models.Result + ''' + fits_id = get_parameter(kwargs, "id") + status = get_parameter(kwargs, "status") + try: + existed = self.db.exists( + "select * from mci_level1_data where id=?", + (fits_id,) + ) + if not existed: + log.warning('%s not found' %(fits_id, )) + return Result.error(message ='%s not found' %(fits_id, )) + self.db.execute( + 'update mci_level1_data set prc_status=?, prc_time=? where id=?', + (status, format_time_ms(time.time()), fits_id) + ) + self.db.end() + return Result.ok_data() + + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + + def update_qc1_status(self, **kwargs): + ''' update the status of QC1 + + parameter kwargs: + id : [int], + status : [int] + ''' + fits_id = get_parameter(kwargs, "id") + status = get_parameter(kwargs, "status") + try: + existed = self.db.exists( + "select * from mci_level1_data where id=?", + (fits_id,) + ) + if not existed: + log.warning('%s not found' %(fits_id, )) + return Result.error(message ='%s not found' %(fits_id, )) + self.db.execute( + 'update mci_level1_data set qc1_status=?, qc1_time=? 
where id=?', + (status, format_time_ms(time.time()), fits_id) + ) + self.db.end() + return Result.ok_data() + + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + + def write(self, **kwargs): + ''' insert a level1 record into database + + parameter kwargs: + level0_id : [str] + data_type : [str] + cor_sci_id : [int] + prc_params : [str] + filename : [str] + file_path : [str] + prc_status : [int] + prc_time : [str] + pipeline_id : [str] + refs : [dict] + + return csst_dfs_common.models.Result + ''' + try: + rec = Level1Record( + id = 0, + level0_id = get_parameter(kwargs, "level0_id"), + data_type = get_parameter(kwargs, "data_type"), + cor_sci_id = get_parameter(kwargs, "cor_sci_id"), + prc_params = get_parameter(kwargs, "prc_params"), + filename = get_parameter(kwargs, "filename"), + file_path = get_parameter(kwargs, "file_path"), + prc_status = get_parameter(kwargs, "prc_status", -1), + prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), + pipeline_id = get_parameter(kwargs, "pipeline_id"), + refs = get_parameter(kwargs, "refs", {}) + ) + existed = self.db.exists( + "select * from mci_level1_data where filename=?", + (rec.filename,) + ) + if existed: + log.error(f'{rec.filename} has already been existed') + return Result.error(message=f'{rec.filename} has already been existed') + + now_str = format_time_ms(time.time()) + self.db.execute( + 'INSERT INTO mci_level1_data (level0_id,data_type,cor_sci_id,prc_params,filename,file_path,qc1_status,prc_status,prc_time, create_time,pipeline_id) \ + VALUES(?,?,?,?,?,?,?,?,?,?,?)', + (rec.level0_id, rec.data_type, rec.cor_sci_id, rec.prc_params, rec.filename, rec.file_path, -1, rec.prc_status,rec.prc_time, now_str, rec.pipeline_id,) + ) + self.db.end() + rec.id = self.db.last_row_id() + + if rec.refs.items(): + sql_refs = "insert into mci_level1_ref (level1_id,ref_type,cal_id) values " + values = ["(%s,'%s',%s)"%(rec.id,k,v) for k,v in rec.refs.items()] + _ = 
self.db.execute(sql_refs + ",".join(values)) + self.db.end() + + rec.create_time = now_str + return Result.ok_data(data=rec) + except Exception as e: + log.error(e) + return Result.error(message=str(e)) \ No newline at end of file diff --git a/csst_dfs_api_local/mci/level1prc.py b/csst_dfs_api_local/mci/level1prc.py new file mode 100644 index 0000000..74891b5 --- /dev/null +++ b/csst_dfs_api_local/mci/level1prc.py @@ -0,0 +1,123 @@ +import os +import logging +import time, datetime +import shutil + +from ..common.db import DBClient +from ..common.utils import * +from csst_dfs_commons.models import Result +from csst_dfs_commons.models.mci import Level1PrcRecord +from csst_dfs_commons.models.common import from_dict_list + +log = logging.getLogger('csst') + +class Level1PrcApi(object): + def __init__(self): + self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") + self.db = DBClient() + + def find(self, **kwargs): + ''' retrieve level1 procedure records from database + + parameter kwargs: + level1_id: [int] + pipeline_id: [str] + prc_module: [str] + prc_status : [int] + + return: csst_dfs_common.models.Result + ''' + try: + level1_id = get_parameter(kwargs, "level1_id", 0) + pipeline_id = get_parameter(kwargs, "pipeline_id") + prc_module = get_parameter(kwargs, "prc_module") + prc_status = get_parameter(kwargs, "prc_status") + + sql_data = f"select * from mci_level1_prc" + + sql_condition = f"where level1_id={level1_id}" + if pipeline_id: + sql_condition = sql_condition + " and pipeline_id='" + pipeline_id + "'" + if prc_module: + sql_condition = sql_condition + " and prc_module ='" + prc_module + "'" + if prc_status: + sql_condition = f"{sql_condition} and prc_status={prc_status}" + + sql_data = f"{sql_data} {sql_condition}" + + _, records = self.db.select_many(sql_data) + return Result.ok_data(data=from_dict_list(Level1PrcRecord, records)).append("totalCount", len(records)) + + except Exception as e: + return Result.error(message=str(e)) + + def 
update_proc_status(self, **kwargs): + ''' update the status of reduction + + parameter kwargs: + id : [int], + status : [int] + + return csst_dfs_common.models.Result + ''' + id = get_parameter(kwargs, "id") + status = get_parameter(kwargs, "status") + + try: + existed = self.db.exists( + "select * from mci_level1_prc where id=?", + (id,) + ) + if not existed: + log.warning('%s not found' %(id, )) + return Result.error(message ='%s not found' %(id, )) + self.db.execute( + 'update mci_level1_prc set prc_status=?, prc_time=? where id=?', + (status, format_time_ms(time.time()), id) + ) + self.db.end() + return Result.ok_data() + + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + + def write(self, **kwargs): + ''' insert a level1 procedure record into database + + parameter kwargs: + level1_id : [int] + pipeline_id : [str] + prc_module : [str] + params_file_path : [str] + prc_status : [int] + prc_time : [str] + result_file_path : [str] + return csst_dfs_common.models.Result + ''' + + rec = Level1PrcRecord( + id = 0, + level1_id = get_parameter(kwargs, "level1_id", 0), + pipeline_id = get_parameter(kwargs, "pipeline_id"), + prc_module = get_parameter(kwargs, "prc_module"), + params_file_path = get_parameter(kwargs, "params_file_path"), + prc_status = get_parameter(kwargs, "prc_status", -1), + prc_time = get_parameter(kwargs, "prc_time"), + result_file_path = get_parameter(kwargs, "result_file_path") + ) + try: + self.db.execute( + 'INSERT INTO mci_level1_prc (level1_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \ + VALUES(?,?,?,?,?,?,?)', + (rec.level1_id, rec.pipeline_id, rec.prc_module, rec.params_file_path, rec.prc_status, rec.prc_time, rec.result_file_path) + ) + self.db.end() + rec.id = self.db.last_row_id() + + return Result.ok_data(data=rec) + + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + diff --git a/setup.cfg b/setup.cfg index af85141..495fac3 100644 --- 
# NOTE(review): the original patch hunk at this position also appended a
# console-script entry to setup.cfg:
#     csst-mci-ingest-local = csst_dfs_api_local.mci.ingest:ingest
# ---------------------------------------------------------------------------
# tests/test_mci_cal_merge.py
# ---------------------------------------------------------------------------
import os
import unittest
from astropy.io import fits

from csst_dfs_api_local.mci.calmerge import CalMergeApi

class MCICalMergeApiTestCase(unittest.TestCase):
    """Smoke tests for the MCI calibration-merge local API."""

    def setUp(self):
        self.api = CalMergeApi()

    def test_find(self):
        # Query by detector, reference type and an observation-time window.
        recs = self.api.find(detector_no='01',
            ref_type = "bias",
            obs_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
        print('find:', recs)

    def test_get_latest_by_l0(self):
        rec = self.api.get_latest_by_l0(level0_id='000001301', ref_type = "bias")
        print('get:', rec)

    def test_get(self):
        rec = self.api.get(cal_id='0000231')
        print('get:', rec)

    def test_update_proc_status(self):
        rec = self.api.update_proc_status(id = 100, status = 1)
        print('update_proc_status:', rec)

    def test_update_qc1_status(self):
        rec = self.api.update_qc1_status(id = 100, status = 2)
        print('update_qc1_status:', rec)

    def test_write(self):
        # Build the payload once, then hand it to write() as keywords.
        payload = dict(
            cal_id='0000231',
            detector_no='01',
            ref_type = "bias",
            obs_time = "2021-06-04 11:12:13",
            exp_time = 150,
            filename = "/opt/dddasd.params",
            file_path = "/opt/dddasd.fits",
            prc_status = 3,
            prc_time = '2021-06-04 11:12:13',
            level0_ids = ['0000231','0000232','0000233','0000234'])
        rec = self.api.write(**payload)
        print('write:', rec)
# ---------------------------------------------------------------------------
# tests/test_mci_ingest.py
# ---------------------------------------------------------------------------
import unittest

from csst_dfs_api_local.mci.ingest import ingest, ingesst_one

class MCILevel0DataApiTestCase(unittest.TestCase):
    """Smoke tests for the MCI ingest entry points.

    BUGFIX: the original methods were named ``ingesst_one``/``ingest`` (no
    ``test_`` prefix), so unittest discovery never executed them, and the
    body of the second was the bare name ``ingest`` -- a no-op expression
    that never called the function.
    """

    def setUp(self):
        pass

    def test_ingest_one(self):
        # NOTE(review): the 'ingesst_one' spelling mirrors the name exported
        # by csst_dfs_api_local.mci.ingest -- confirm it is not a typo there.
        ingesst_one()

    def test_ingest(self):
        # Actually invoke the entry point (original was a no-op bare name).
        ingest()

# ---------------------------------------------------------------------------
# tests/test_mci_level0.py
# ---------------------------------------------------------------------------
import unittest

from csst_dfs_api_local.mci import Level0DataApi

class MCILevel0DataApiTestCase(unittest.TestCase):
    """Smoke tests for the MCI level-0 data local API."""

    def setUp(self):
        self.api = Level0DataApi()

    def test_find(self):
        recs = self.api.find(obs_id = '300000055', obs_type = 'sky', limit = 0)
        print('find:', recs)

    def test_get(self):
        rec = self.api.get(id = 31)
        print('get:', rec)

    def test_update_proc_status(self):
        rec = self.api.update_proc_status(id = 31, status = 6)
        print('update_proc_status:', rec)

    def test_update_qc0_status(self):
        rec = self.api.update_qc0_status(id = 31, status = 7)
        print('update_qc0_status:', rec)

    def test_write(self):
        rec = self.api.write(
            obs_id = '0000013',
            detector_no = "01",
            obs_type = "sky",
            obs_time = "2021-06-06 11:12:13",
            exp_time = 150,
            detector_status_id = 3,
            filename = "MSC_00001234",
            file_path = "/opt/MSC_00001234.fits")
        print('write:', rec)

# ---------------------------------------------------------------------------
# tests/test_mci_level0_prc.py
# ---------------------------------------------------------------------------
import os
import unittest
from astropy.io import fits

from csst_dfs_api_local.mci.level0prc import Level0PrcApi

class MCILevel0PrcTestCase(unittest.TestCase):
    """Smoke tests for the MCI level-0 processing-record local API."""

    def setUp(self):
        self.api = Level0PrcApi()

    def test_find(self):
        recs = self.api.find(level0_id='300000055CCD231-c4')
        print('find:', recs)

    def test_update_proc_status(self):
        rec = self.api.update_proc_status(id = 1, status = 4)
        print('update_proc_status:', rec)

    def test_write(self):
        rec = self.api.write(level0_id='300000055CCD231-c4',
            pipeline_id = "P1",
            prc_module = "QC0",
            params_file_path = "/opt/dddasd.params",
            prc_status = 3,
            prc_time = '2021-06-04 11:12:13',
            result_file_path = "/opt/dddasd.header")
        print('write:', rec)

# ---------------------------------------------------------------------------
# tests/test_mci_level1.py
# ---------------------------------------------------------------------------
import unittest

from csst_dfs_api_local.mci import Level1DataApi

class MCILevel1DataApiTestCase(unittest.TestCase):
    """Smoke tests for the MCI level-1 data local API."""

    def setUp(self):
        self.api = Level1DataApi()

    def test_find(self):
        recs = self.api.find(
            level0_id='0000223',
            create_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13")
        )
        print('find:', recs)

    def test_get(self):
        rec = self.api.get(id = 1)
        print('get:', rec)

    def test_update_proc_status(self):
        rec = self.api.update_proc_status(id = 1, status = 4)
        print('update_proc_status:', rec)

    def test_update_qc1_status(self):
        rec = self.api.update_qc1_status(id = 1, status = 7)
        print('update_qc1_status:', rec)

    def test_write(self):
        rec = self.api.write(
            level0_id='0000223',
            data_type = "sci",
            cor_sci_id = 2,
            prc_params = "/opt/dddasd.params",
            prc_status = 3,
            prc_time = '2021-06-05 11:12:13',
            filename = "dddasd223234.fits",
            file_path = "/opt/dddasd23.fits",
            pipeline_id = "P2",
            refs = {'dark': 1, 'bias': 2, 'flat': 3 })
        print('write:', rec)

# ---------------------------------------------------------------------------
# tests/test_mci_level1_prc.py
# ---------------------------------------------------------------------------
import os
import unittest
from astropy.io import fits

from csst_dfs_api_local.mci.level1prc import Level1PrcApi

class MCILevel1PrcTestCase(unittest.TestCase):
    """Smoke tests for the MCI level-1 processing-record local API."""

    def setUp(self):
        self.api = Level1PrcApi()

    def test_find(self):
        recs = self.api.find(level1_id=1)
        print('find:', recs)

    def test_update_proc_status(self):
        rec = self.api.update_proc_status(id = 1, status = 4)
        print('update_proc_status:', rec)

    def test_write(self):
        rec = self.api.write(level1_id=1,
            pipeline_id = "P1",
            prc_module = "QC0",
            params_file_path = "/opt/dddasd.params",
            prc_status = 3,
            prc_time = '2021-06-04 11:12:13',
            result_file_path = "/opt/dddasd.header")
        print('write:', rec)