diff --git a/csst_dfs_api_local/common/db.sql b/csst_dfs_api_local/common/db.sql index 82ed0e1293e720cd788748bf6b87f92cfd9915cc..b49d6d7974afa5f2847ac4e21648258444d149ef 100644 --- a/csst_dfs_api_local/common/db.sql +++ b/csst_dfs_api_local/common/db.sql @@ -36,7 +36,7 @@ drop table if exists t_observation; create table ifs_level1_data ( id integer PRIMARY KEY autoincrement, - raw_id int(20) not null, + level0_id varchar(20) not null, data_type varchar(64) not null, cor_sci_id int(20), prc_params varchar(1024), @@ -76,7 +76,7 @@ create table ifs_level1_header create table msc_level1_data ( id integer PRIMARY KEY autoincrement, - raw_id int(20) not null, + level0_id varchar(20) not null, data_type varchar(64) not null, cor_sci_id int(20), prc_params varchar(1024), @@ -113,7 +113,7 @@ create table msc_level1_header create table t_cal2level0 ( merge_id int(20) not null, - level0_id int(20) not null, + level0_id varchar(20) not null, primary key (merge_id, level0_id) ); @@ -137,7 +137,8 @@ create table t_cal_header create table t_cal_merge ( id integer PRIMARY KEY autoincrement, - detector_no varchar(64) not null, + cal_id varchar(20) not null, + detector_no varchar(10) not null, ref_type varchar(16), obs_time datetime, exp_time float, @@ -155,7 +156,7 @@ create table t_cal_merge /*==============================================================*/ create table t_detector ( - no varchar(64) not null, + no varchar(10) not null, detector_name varchar(256) not null, module_id varchar(20), filter_id varchar(20), @@ -170,7 +171,7 @@ create table t_detector create table t_detector_status ( id integer PRIMARY KEY autoincrement, - detector_no varchar(64) not null, + detector_no varchar(10) not null, status varchar(256) not null, status_time datetime, create_time datetime @@ -205,8 +206,9 @@ create table t_guiding create table t_level0_data ( id integer PRIMARY KEY autoincrement, - obs_id int(20) not null, - detector_no varchar(64) not null, + level0_id varchar(20) not null, + obs_id varchar(10) not null, + detector_no varchar(10) not null, obs_type varchar(16), obs_time datetime, exp_time float, @@ -240,13 +242,13 @@ create table t_level0_header create table t_level0_prc ( id integer PRIMARY KEY autoincrement, - level0_id int(20) not null, + level0_id varchar(20) not null, pipeline_id varchar(64) not null, prc_module varchar(32) not null, - params_id varchar(256), + params_file_path varchar(256), prc_status int(2), prc_time datetime, - file_path varchar(256) + result_file_path varchar(256) ); /*==============================================================*/ @@ -267,6 +269,7 @@ create table t_module_status create table t_observation ( id integer PRIMARY KEY autoincrement, + obs_id varchar(10), obs_time datetime, exp_time float, module_id varchar(20), diff --git a/csst_dfs_api_local/common/ingest.py b/csst_dfs_api_local/common/ingest.py index 37f92450f35f79a71713a2a7998d9bdfe5690c4d..e744dac4ec0d21d83d4a17a949d5412a436969c0 100644 --- a/csst_dfs_api_local/common/ingest.py +++ b/csst_dfs_api_local/common/ingest.py @@ -12,7 +12,7 @@ def ingest(): db = DBClient() parser = argparse.ArgumentParser(prog=f"{sys.argv[0]}", description="ingest the local files") parser.add_argument('-i','--infile', dest="infile", help="a file or a directory") - parser.add_argument('-m', '--copyfiles', dest="copyfiles", action='store_true', default=False, help="move files after import") + parser.add_argument('-m', '--copyfiles', dest="copyfiles", action='store_true', default=False, help="copy files after import") args = parser.parse_args(sys.argv[1:]) import_root_dir = args.infile @@ -50,13 +50,13 @@ def ingesst_one(file_path, db, copyfiles): facility_status_id = 0 module_status_id = 0 - existed = db.exists("select * from t_observation where id=?", (obs_id,)) + existed = db.exists("select * from t_observation where obs_id=?", (obs_id,)) if not existed: db.execute("insert into t_observation \ - (id,obs_time,exp_time,module_id,obs_type,facility_status_id, module_status_id, qc0_status, prc_status,create_time) \ + (obs_id,obs_time,exp_time,module_id,obs_type,facility_status_id, module_status_id, qc0_status, prc_status,create_time) \ values (?,?,?,?,?,?,?,?,?,?)", (obs_id,exp_start_time,exp_time,module_id,obs_type,facility_status_id,module_status_id,qc0_status, prc_status,create_time)) - + db.end() #level0 detector = header["DETECTOR"] filename = header["FILENAME"] @@ -75,26 +75,27 @@ def ingesst_one(file_path, db, copyfiles): file_full_path = file_path if copyfiles: - obs_id_str = "%07d" % (obs_id) - file_dir = f"{dest_root_dir}/{module_id}/{obs_type.upper()}/{header['EXPSTART']}/{obs_id_str}/MS" + file_dir = f"{dest_root_dir}/{module_id}/{obs_type.upper()}/{header['EXPSTART']}/{obs_id}/MS" if not os.path.exists(file_dir): os.makedirs(file_dir) - file_full_path = f"{file_dir}/{filename}.fits" + file_full_path = f"{file_dir}/{filename}.fits" + + level0_id = f"{obs_id}{detector}" c = db.execute("insert into t_level0_data \ - (obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \ - values (?,?,?,?,?,?,?,?,?,?,?)", - (obs_id, detector, obs_type, exp_start_time, exp_time, detector_status_id, filename, file_full_path, qc0_status, prc_status,create_time)) + (level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \ + values (?,?,?,?,?,?,?,?,?,?,?,?)", + (level0_id, obs_id, detector, obs_type, exp_start_time, exp_time, detector_status_id, filename, file_full_path, qc0_status, prc_status,create_time)) db.end() - level0_id = db.last_row_id() + level0_id_id = db.last_row_id() #level0-header ra_obj = header["RA_OBJ"] dec_obj = header["DEC_OBJ"] - db.execute("delete from t_level0_header where id=?",(level0_id,)) + db.execute("delete from t_level0_header where id=?",(level0_id_id,)) db.execute("insert into t_level0_header \ (id, obs_time, exp_time, ra, `dec`, create_time) \ values (?,?,?,?,?,?)", - (level0_id, exp_start_time, exp_time, ra_obj, dec_obj, create_time)) + (level0_id_id, exp_start_time, exp_time, ra_obj, dec_obj, create_time)) if copyfiles: #copy files diff --git a/csst_dfs_api_local/facility/calmerge.py b/csst_dfs_api_local/facility/calmerge.py index a71603dfb1bca47e85c9963edaedd5077921a8ad..82d6a8e3ebd7720e4d21a6e2acd52a7ccf196ed9 100644 --- a/csst_dfs_api_local/facility/calmerge.py +++ b/csst_dfs_api_local/facility/calmerge.py @@ -81,12 +81,24 @@ class CalMergeApi(object): ''' fetch a record from database parameter kwargs: - id : [int] + id : [int], + cal_id : [str] return csst_dfs_common.models.Result ''' + id = get_parameter(kwargs, "id", 0) + cal_id = get_parameter(kwargs, "cal_id", "") + + if id == 0 and cal_id == "": + return Result.error(message="at least define id or cal_id") + + if id != 0: + return self.get_by_id(id) + if cal_id != "": + return self.get_by_cal_id(cal_id) + + def get_by_id(self, id: str): try: - id = get_parameter(kwargs, "id") r = self.db.select_one( "select * from t_cal_merge where id=?", (id,)) if r: @@ -99,7 +111,28 @@ class CalMergeApi(object): rec.level0_ids = level0_ids return Result.ok_data(data=rec) else: - return Result.error(message=f"id:{id} not found") + return Result.error(message=f"id:{id} not found") + + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + + def get_by_cal_id(self, cal_id: str): + try: + r = self.db.select_one( + "select * from t_cal_merge where cal_id=?", (cal_id,)) + if r: + + sql_get_level0_id = f"select level0_id from t_cal2level0 where merge_id={r['id']}" + _, records = self.db.select_many(sql_get_level0_id) + level0_ids = [r["level0_id"] for r in records] + + rec = CalMergeRecord().from_dict(r) + rec.level0_ids = level0_ids + return Result.ok_data(data=rec) + else: + return Result.error(message=f"id:{id} not found") + except Exception as e: log.error(e) return Result.error(message=str(e)) @@ -109,20 +142,22 @@ class CalMergeApi(object): parameter kwargs: id : [int], + cal_id : [str], status : [int] return csst_dfs_common.models.Result ''' - id = get_parameter(kwargs, "id") + id = get_parameter(kwargs, "id", 0) + cal_id = get_parameter(kwargs, "cal_id", "") + result = self.get(id = id, cal_id = cal_id) + + if not result.success: + return Result.error(message="not found") + + id = result.data.id status = get_parameter(kwargs, "status") + try: - existed = self.db.exists( - "select * from t_cal_merge where id=?", - (id,) - ) - if not existed: - log.warning('%s not found' %(id, )) - return Result.error(message ='%s not found' %(id, )) self.db.execute( 'update t_cal_merge set qc1_status=?, qc1_time=? where id=?', (status, format_time_ms(time.time()), id) @@ -132,18 +167,26 @@ class CalMergeApi(object): except Exception as e: log.error(e) - return Result.error(message=e.message) + return Result.error(message=str(e)) def update_proc_status(self, **kwargs): ''' update the status of reduction parameter kwargs: id : [int], + cal_id : [str], status : [int] return csst_dfs_common.models.Result ''' - id = get_parameter(kwargs, "id") + id = get_parameter(kwargs, "id", 0) + cal_id = get_parameter(kwargs, "cal_id", "") + result = self.get(id = id, cal_id = cal_id) + + if not result.success: + return Result.error(message="not found") + + id = result.data.id status = get_parameter(kwargs, "status") try: @@ -163,13 +206,13 @@ class CalMergeApi(object): except Exception as e: log.error(e) - return Result.error(message=e.message) + return Result.error(message=str(e)) def write(self, **kwargs): ''' insert a calibration merge record into database parameter kwargs: - id : [int] + cal_id : [str] detector_no : [str] ref_type : [str] obs_time : [str] @@ -184,6 +227,7 @@ class CalMergeApi(object): rec = CalMergeRecord( id = 0, + cal_id = get_parameter(kwargs, "cal_id"), detector_no = get_parameter(kwargs, "detector_no"), ref_type = get_parameter(kwargs, "ref_type"), obs_time = get_parameter(kwargs, "obs_time"), @@ -196,9 +240,9 @@ class CalMergeApi(object): ) try: self.db.execute( - 'INSERT INTO t_cal_merge (detector_no,ref_type,obs_time,exp_time,filename,file_path,prc_status,prc_time,create_time) \ - VALUES(?,?,?,?,?,?,?,?,?)', - (rec.detector_no, rec.ref_type, rec.obs_time, rec.exp_time, rec.filename, rec.file_path,rec.prc_status,rec.prc_time,format_time_ms(time.time())) + 'INSERT INTO t_cal_merge (cal_id,detector_no,ref_type,obs_time,exp_time,filename,file_path,prc_status,prc_time,create_time) \ + VALUES(?,?,?,?,?,?,?,?,?,?)', + (rec.cal_id, rec.detector_no, rec.ref_type, rec.obs_time, rec.exp_time, rec.filename, rec.file_path,rec.prc_status,rec.prc_time,format_time_ms(time.time())) ) self.db.end() rec.id = self.db.last_row_id() diff --git a/csst_dfs_api_local/facility/level0.py b/csst_dfs_api_local/facility/level0.py index 0fcb8366d46815fcdab58349bdb7ebf21fef456e..31a2c0684573b48e7b7e1d5a093474f146424089 100644 --- a/csst_dfs_api_local/facility/level0.py +++ b/csst_dfs_api_local/facility/level0.py @@ -20,7 +20,7 @@ class Level0DataApi(object): ''' retrieve level0 records from database parameter kwargs: - obs_id: [int] + obs_id: [str] detector_no: [str] obs_type: [str] obs_time : (start, end), @@ -46,8 +46,8 @@ class Level0DataApi(object): sql_data = f"select * from t_level0_data where 1=1" sql_condition = "" - if obs_id > 0: - sql_condition = f"{sql_condition} and obs_id={obs_id}" + if obs_id: + sql_condition = f"{sql_condition} and obs_id='{obs_id}'" if detector_no: sql_condition = f"{sql_condition} and detector_no='{detector_no}'" if obs_type: @@ -81,44 +81,69 @@ class Level0DataApi(object): ''' fetch a record from database parameter kwargs: - fits_id : [int] + id : [int], + level0_id : [str] return csst_dfs_common.models.Result ''' + id = get_parameter(kwargs, "id", 0) + level0_id = get_parameter(kwargs, "level0_id", "") + + if id == 0 and level0_id == "": + return Result.error(message="at least define id or level0_id") + + if id != 0: + return self.get_by_id(id) + if level0_id != "": + return self.get_by_level0_id(level0_id) + + def get_by_id(self, id: int): try: - fits_id = get_parameter(kwargs, "fits_id", -1) r = self.db.select_one( - "select * from t_level0_data where id=?", (fits_id,)) + "select * from t_level0_data where id=?", (id,)) if r: return Result.ok_data(data=Level0Record().from_dict(r)) else: - return Result.error(message=f"fits_id:{fits_id} not found") + return Result.error(message=f"id:{id} not found") except Exception as e: log.error(e) - return Result.error(message=str(e)) + return Result.error(message=str(e)) + + def get_by_level0_id(self, level0_id: str): + try: + r = self.db.select_one( + "select * from t_level0_data where level0_id=?", (level0_id,)) + if r: + return Result.ok_data(data=Level0Record().from_dict(r)) + else: + return Result.error(message=f"level0_id:{level0_id} not found") + except Exception as e: + log.error(e) + return Result.error(message=str(e)) def update_proc_status(self, **kwargs): ''' update the status of reduction parameter kwargs: - fits_id : [int], + id : [int], + level0_id : [str], status : [int] return csst_dfs_common.models.Result ''' - fits_id = get_parameter(kwargs, "fits_id") + id = get_parameter(kwargs, "id") + level0_id = get_parameter(kwargs, "level0_id") + result = self.get(id = id, level0_id = level0_id) + + if not result.success: + return Result.error(message="not found") + + id = result.data.id status = get_parameter(kwargs, "status") try: - existed = self.db.exists( - "select * from t_level0_data where id=?", - (fits_id,) - ) - if not existed: - log.warning('%s not found' %(fits_id, )) - return Result.error(message ='%s not found' %(fits_id, )) self.db.execute( 'update t_level0_data set prc_status=?, prc_time=? where id=?', - (status, format_time_ms(time.time()), fits_id) + (status, format_time_ms(time.time()), id) ) self.db.end() return Result.ok_data() @@ -131,22 +156,23 @@ class Level0DataApi(object): ''' update the status of QC0 parameter kwargs: - fits_id : [int], + id : [int], + level0_id : [str], status : [int] ''' - fits_id = get_parameter(kwargs, "fits_id") + id = get_parameter(kwargs, "id") + level0_id = get_parameter(kwargs, "level0_id") + result = self.get(id = id, level0_id = level0_id) + + if not result.success: + return Result.error(message="not found") + + id = result.data.id status = get_parameter(kwargs, "status") try: - existed = self.db.exists( - "select * from t_level0_data where id=?", - (fits_id,) - ) - if not existed: - log.warning('%s not found' %(fits_id, )) - return Result.error(message ='%s not found' %(fits_id, )) self.db.execute( 'update t_level0_data set qc0_status=?, qc0_time=? where id=?', - (status, format_time_ms(time.time()), fits_id) + (status, format_time_ms(time.time()), id) ) self.db.end() return Result.ok_data() @@ -159,7 +185,7 @@ class Level0DataApi(object): ''' insert a level0 data record into database parameter kwargs: - obs_id = [int] + obs_id = [str] detector_no = [str] obs_type = [str] obs_time = [str] @@ -179,6 +205,7 @@ class Level0DataApi(object): filename = get_parameter(kwargs, "filename"), file_path = get_parameter(kwargs, "file_path") ) + rec.level0_id = f"{rec.obs_id}{rec.detector_no}" try: existed = self.db.exists( "select * from t_level0_data where filename=?", @@ -189,9 +216,9 @@ class Level0DataApi(object): return Result.error(message ='%s existed' %(rec.filename, )) self.db.execute( - 'INSERT INTO t_level0_data (obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \ - VALUES(?,?,?,?,?,?,?,?,?,?,?)', - (rec.obs_id, rec.detector_no, rec.obs_type, rec.obs_time, rec.exp_time, rec.detector_status_id, rec.filename, rec.file_path,-1,-1,format_time_ms(time.time())) + 'INSERT INTO t_level0_data (level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \ + VALUES(?,?,?,?,?,?,?,?,?,?,?,?)', + (rec.level0_id, rec.obs_id, rec.detector_no, rec.obs_type, rec.obs_time, rec.exp_time, rec.detector_status_id, rec.filename, rec.file_path,-1,-1,format_time_ms(time.time())) ) self.db.end() rec.id = self.db.last_row_id() diff --git a/csst_dfs_api_local/facility/level0prc.py b/csst_dfs_api_local/facility/level0prc.py index 8f73206681a46e6043413b18bf055f461f58d3ce..1d09811930cd3b115bb90e6356ac3119b83d103a 100644 --- a/csst_dfs_api_local/facility/level0prc.py +++ b/csst_dfs_api_local/facility/level0prc.py @@ -20,7 +20,7 @@ class Level0PrcApi(object): ''' retrieve level0 procedure records from database parameter kwargs: - level0_id: [int] + level0_id: [str] pipeline_id: [str] prc_module: [str] prc_status : [int] @@ -35,7 +35,7 @@ class Level0PrcApi(object): sql_data = f"select * from t_level0_prc" - sql_condition = f"where level0_id={level0_id}" + sql_condition = f"where level0_id='{level0_id}'" if pipeline_id: sql_condition = sql_condition + " and pipeline_id='" + pipeline_id + "'" if prc_module: @@ -86,13 +86,13 @@ class Level0PrcApi(object): ''' insert a level0 procedure record into database parameter kwargs: - level0_id : [int] + level0_id : [str] pipeline_id : [str] prc_module : [str] - params_id : [str] + params_file_path : [str] prc_status : [int] prc_time : [str] - file_path : [str] + result_file_path : [str] return csst_dfs_common.models.Result ''' @@ -101,16 +101,16 @@ class Level0PrcApi(object): level0_id = get_parameter(kwargs, "level0_id"), pipeline_id = get_parameter(kwargs, "pipeline_id"), prc_module = get_parameter(kwargs, "prc_module"), - params_id = get_parameter(kwargs, "params_id"), + params_file_path = get_parameter(kwargs, "params_file_path"), prc_status = get_parameter(kwargs, "prc_status", -1), prc_time = get_parameter(kwargs, "prc_time"), - file_path = get_parameter(kwargs, "file_path") + result_file_path = get_parameter(kwargs, "result_file_path") ) try: self.db.execute( - 'INSERT INTO t_level0_prc (level0_id,pipeline_id,prc_module, params_id,prc_status,prc_time,file_path) \ + 'INSERT INTO t_level0_prc (level0_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \ VALUES(?,?,?,?,?,?,?)', - (rec.level0_id, rec.pipeline_id, rec.prc_module, rec.params_id, rec.prc_status, rec.prc_time, rec.file_path) + (rec.level0_id, rec.pipeline_id, rec.prc_module, rec.params_file_path, rec.prc_status, rec.prc_time, rec.result_file_path) ) self.db.end() rec.id = self.db.last_row_id() diff --git a/csst_dfs_api_local/facility/observation.py b/csst_dfs_api_local/facility/observation.py index f28a5fff5dd17020cf87b9dd207f616c818ea5b3..730cd992a0c38a3a239c0664b2ed1eb1670681da 100644 --- a/csst_dfs_api_local/facility/observation.py +++ b/csst_dfs_api_local/facility/observation.py @@ -68,43 +68,69 @@ class ObservationApi(object): def get(self, **kwargs): ''' parameter kwargs: - obs_id = [int] + id = [int], + obs_id = [str] return dict or None ''' + id = get_parameter(kwargs, "id", 0) + obs_id = get_parameter(kwargs, "obs_id", "") + + if id == 0 and obs_id == "": + return Result.error(message="at least define id or obs_id") + + if id != 0: + return self.get_by_id(id) + if obs_id != "": + return self.get_by_obs_id(obs_id) + + def get_by_id(self, id: int): + try: + r = self.db.select_one( + "select * from t_observation where id=?", (id,)) + if r: + return Result.ok_data(data=Observation().from_dict(r)) + else: + return Result.error(message=f"id:{id} not found") + + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + + def get_by_obs_id(self, obs_id: str): try: - obs_id = get_parameter(kwargs, "obs_id", -1) r = self.db.select_one( - "select * from t_observation where id=?", (obs_id,)) + "select * from t_observation where obs_id=?", (obs_id,)) if r: return Result.ok_data(data=Observation().from_dict(r)) else: return Result.error(message=f"obs_id:{obs_id} not found") except Exception as e: log.error(e) - return Result.error(message=str(e)) + return Result.error(message=str(e)) + def update_proc_status(self, **kwargs): ''' update the status of reduction parameter kwargs: - obs_id : [int], + id : [int], + obs_id : [str], status : [int] return csst_dfs_common.models.Result ''' - obs_id = get_parameter(kwargs, "obs_id") + id = get_parameter(kwargs, "id", 0) + obs_id = get_parameter(kwargs, "obs_id", "") + result = self.get(id = id, obs_id = obs_id) + if not result.success: + return Result.error(message="not found") + + id = result.data.id status = get_parameter(kwargs, "status") try: - existed = self.db.exists( - "select * from t_observation where id=?", - (obs_id,) - ) - if not existed: - log.warning('%s not found' %(obs_id, )) - return Result.error(message ='%s not found' %(obs_id, )) self.db.execute( 'update t_observation set prc_status=?, prc_time=? where id=?', - (status, format_time_ms(time.time()), obs_id) + (status, format_time_ms(time.time()), id) ) self.db.end() return Result.ok_data() @@ -117,22 +143,22 @@ class ObservationApi(object): ''' update the status of QC0 parameter kwargs: - obs_id : [int], + id : [int], + obs_id : [str], status : [int] ''' - obs_id = get_parameter(kwargs, "obs_id") + id = get_parameter(kwargs, "id", 0) + obs_id = get_parameter(kwargs, "obs_id", "") + result = self.get(id = id, obs_id = obs_id) + if not result.success: + return Result.error(message="not found") + + id = result.data.id status = get_parameter(kwargs, "status") try: - existed = self.db.exists( - "select * from t_observation where id=?", - (obs_id,) - ) - if not existed: - log.warning('%s not found' %(obs_id, )) - return Result.error(message ='%s not found' %(obs_id, )) self.db.execute( 'update t_observation set qc0_status=?, qc0_time=? where id=?', - (status, format_time_ms(time.time()), obs_id) + (status, format_time_ms(time.time()), id) ) self.db.end() return Result.ok_data() @@ -145,7 +171,8 @@ class ObservationApi(object): ''' insert a observational record into database parameter kwargs: - obs_id = [int] + id = [int] + obs_id = [str] obs_time = [str] exp_time = [int] module_id = [str] @@ -154,7 +181,8 @@ class ObservationApi(object): module_status_id = [int] return: csst_dfs_common.models.Result ''' - obs_id = get_parameter(kwargs, "obs_id", 0) + id = get_parameter(kwargs, "id", 0) + obs_id = get_parameter(kwargs, "obs_id") obs_time = get_parameter(kwargs, "obs_time") exp_time = get_parameter(kwargs, "exp_time") module_id = get_parameter(kwargs, "module_id") @@ -163,27 +191,28 @@ class ObservationApi(object): module_status_id = get_parameter(kwargs, "module_status_id") try: - if obs_id == 0: + if id == 0: r = self.db.select_one("select max(id) as max_id from t_observation") max_id = 0 if r["max_id"] is None else r["max_id"] - obs_id = max_id + 1 + id = max_id + 1 + obs_id = "%07d"%(id,) existed = self.db.exists( - "select * from t_observation where id=?", - (obs_id,) + "select * from t_observation where id=? or obs_id=?", + (id, obs_id,) ) if existed: log.warning('%s existed' %(obs_id, )) return Result.error(message ='%s existed' %(obs_id, )) self.db.execute( - 'INSERT INTO t_observation (id,obs_time,exp_time,module_id,obs_type,facility_status_id, module_status_id, qc0_status,create_time) \ - VALUES(?,?,?,?,?,?,?,?,?)', - (obs_id, obs_time, exp_time, module_id, obs_type, facility_status_id, module_status_id,-1,format_time_ms(time.time())) + 'INSERT INTO t_observation (id,obs_id,obs_time,exp_time,module_id,obs_type,facility_status_id, module_status_id, qc0_status,create_time) \ + VALUES(?,?,?,?,?,?,?,?,?,?)', + (id, obs_id, obs_time, exp_time, module_id, obs_type, facility_status_id, module_status_id,-1,format_time_ms(time.time())) ) self.db.end() - return self.get(obs_id = obs_id) + return self.get(id = id) except Exception as e: log.error(e) diff --git a/csst_dfs_api_local/ifs/fits.py b/csst_dfs_api_local/ifs/fits.py deleted file mode 100644 index 4e4514e27880cc94190c6716327e6ce4a04a4845..0000000000000000000000000000000000000000 --- a/csst_dfs_api_local/ifs/fits.py +++ /dev/null @@ -1,239 +0,0 @@ - -import logging -import os -from os.path import join -import shutil -import time, datetime -import shutil - -from glob import glob - -from astropy.io import fits - -from ..common.db import DBClient -from ..common.utils import * -from csst_dfs_commons.models import Result - -log = logging.getLogger('csst') -class FitsApi(object): - def __init__(self, sub_system = "ifs"): - self.sub_system = sub_system - self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") - self.check_dir() - self.db = DBClient() - - def check_dir(self): - if not os.path.exists(self.root_dir): - os.mkdir(self.root_dir) - log.info("using [%s] as root directory", self.root_dir) - if not os.path.exists(os.path.join(self.root_dir, "fits")): - os.mkdir(os.path.join(self.root_dir, "fits")) - - def find(self, **kwargs): - ''' - parameter kwargs: - obs_time = [int], - file_name = [str], - exp_time = (start, end), - ccd_num = [int], - qc0_status = [int], - prc_status = [int], - limit: limits returns the number of records - - return: csst_dfs_common.models.Result - ''' - paths = [] - - obs_time = get_parameter(kwargs, "obs_time") - file_name = get_parameter(kwargs, "file_name") - exp_time = get_parameter(kwargs, "exp_time", (None, format_time_ms(time.time()))) - ccd_num = get_parameter(kwargs, "ccd_num") - qc0_status = get_parameter(kwargs, "qc0_status") - prc_status = get_parameter(kwargs, "prc_status") - limit = get_parameter(kwargs, "limit", 0) - limit = to_int(limit, 0) - - try: - - sql = [] - - sql.append("select * from ifs_rawfits where exp_time<='" + exp_time[1]+"'") - - if exp_time[0] is not None: - sql.append(" and exp_time>='" + exp_time[0] + "'") - if obs_time is not None: - sql.append(" and obs_time=" + repr(obs_time)) - if ccd_num is not None: - sql.append(" and ccd_num=" + repr(ccd_num)) - if qc0_status is not None: - sql.append(" and qc0_status=" + repr(qc0_status)) - if prc_status is not None: - sql.append(" and prc_status=" + repr(prc_status)) - if limit > 0: - sql.append(f" limit {limit}") - - if file_name: - sql = ["select * from ifs_rawfits where filename='" + file_name + "'"] - - totalCount = self.db.select_one("".join(sql).replace("select * from","select count(*) as v from")) - - _, recs = self.db.select_many("".join(sql)) - for r in recs: - r['file_path'] = os.path.join(self.root_dir, r['file_path']) - - return Result.ok_data(data=recs).append("totalCount", totalCount['v']) - except Exception as e: - return Result.error(message=e.message) - - def get(self, **kwargs): - ''' - parameter kwargs: - fits_id = [int] - - return dict or None - ''' - try: - fits_id = get_parameter(kwargs, "fits_id", -1) - r = self.db.select_one( - "select * from ifs_rawfits where id=?", (fits_id,)) - if r: - r['file_path'] = os.path.join(self.root_dir, r['file_path']) - return Result.ok_data(data=r) - except Exception as e: - return Result.error(message=e.message) - - def read(self, **kwargs): - ''' - parameter kwargs: - fits_id = [int], - file_path = [str], - chunk_size = [int] default 20480 - - yield bytes of fits file - ''' - fits_id = get_parameter(kwargs, "fits_id") - file_path = get_parameter(kwargs, "file_path") - - if fits_id is None and file_path is None: - raise Exception("fits_id or file_path need to be defined") - - if fits_id is not None: - r = self.db.select_one( - "select * from ifs_rawfits where id=?", (fits_id,)) - if r is not None: - file_path = r["file_path"] - - if file_path is not None: - chunk_size = get_parameter(kwargs, "chunk_size", 20480) - return yield_file_bytes(os.path.join(self.root_dir, file_path), chunk_size) - - def update_proc_status(self, **kwargs): - ''' - parameter kwargs: - fits_id = [int], - status = [int] - ''' - fits_id = get_parameter(kwargs, "fits_id") - status = get_parameter(kwargs, "status") - - existed = self.db.exists( - "select * from ifs_rawfits where id=?", - (fits_id,) - ) - if not existed: - log.warning('%s not found' %(fits_id, )) - return - self.db.execute( - 'update ifs_rawfits set prc_status=?, prc_time=? where id=?', - (status, format_time_ms(time.time()), fits_id) - ) - self.db.end() - - def update_qc0_status(self, **kwargs): - ''' - parameter kwargs: - fits_id = [int], - status = [int] - ''' - - fits_id = get_parameter(kwargs, "fits_id") - status = get_parameter(kwargs, "status") - - existed = self.db.exists( - "select * from ifs_rawfits where id=?", - (fits_id,) - ) - if not existed: - log.warning('%s not found' %(fits_id, )) - return - self.db.execute( - 'update ifs_rawfits set qc0_status=?, qc0_time=? where id=?', - (status, format_time_ms(time.time()), fits_id) - ) - self.db.end() - - def import2db(self, **kwargs): - ''' - reduce the header of fits file of server and insert a record into database - parameter kwargs: - file_path = [str] - ''' - file_path = get_parameter(kwargs, "file_path") - - if file_path is None: - raise Exception("file_path need to be defined") - - file_full_path = os.path.join(self.root_dir, file_path) - - if not os.path.exists(file_full_path): - raise Exception("%s not found" % (file_full_path)) - - file_name = os.path.basename(file_path) - - existed = self.db.exists( - "select * from ifs_rawfits where filename=?", - (file_name,) - ) - if existed: - log.warning('%s has already been imported' %(file_path, )) - return - - hu = fits.getheader(file_full_path) - obs_time = hu['obst'] if 'obst' in hu else '1' - ccd_num = hu['ccd_num'] if 'ccd_num' in hu else 0 - exp_time = format_time_ms(time.time()) - - self.db.execute( - 'INSERT INTO ifs_rawfits (filename, obs_time, ccd_num, exp_time, file_path, qc0_status, prc_status, create_time) \ - VALUES (?,?,?,?,?,?,?,?)', - (file_name, obs_time, ccd_num, exp_time, file_path, 0, 0, format_time_ms(time.time()),) - ) - self.db.end() - - log.info("raw fits %s imported.", file_path) - - def write(self, **kwargs): - ''' - copy a local file to file storage, then reduce the header of fits file and insert a record into database - parameter kwargs: - file_path = [str] - ''' - file_path = get_parameter(kwargs, "file_path") - - if not file_path: - log.error("file_path is None") - return - - new_file_dir = create_dir(os.path.join(self.root_dir, "fits"), - self.sub_system, - "/".join([str(datetime.now().year),"%02d"%(datetime.now().month),"%02d"%(datetime.now().day)])) - - file_basename = os.path.basename(file_path) - new_file_path = os.path.join(new_file_dir, file_basename) - shutil.copyfile(file_path, new_file_path) - - file_path = new_file_path.replace(self.root_dir, '') - if file_path.index("/") == 0: - file_path = file_path[1:] - - self.import2db(file_path = file_path) \ No newline at end of file diff --git a/csst_dfs_api_local/ifs/level1.py b/csst_dfs_api_local/ifs/level1.py index 7ff92e4a2525d6e12311922f3c1f1f0876d08ada..d0249507a108be9c9ed4726ca2f94e4ffcafb277 100644 --- a/csst_dfs_api_local/ifs/level1.py +++ b/csst_dfs_api_local/ifs/level1.py @@ -21,7 +21,7 @@ class Level1DataApi(object): ''' retrieve level1 records from database parameter kwargs: - raw_id: [int] + level0_id: [str] data_type: [str] obs_type: [str] create_time : (start, end), @@ -33,7 +33,7 @@ class Level1DataApi(object): return: csst_dfs_common.models.Result ''' try: - raw_id = get_parameter(kwargs, "raw_id") + level0_id = get_parameter(kwargs, "level0_id") data_type = get_parameter(kwargs, "data_type") create_time_start = get_parameter(kwargs, "create_time", [None, None])[0] create_time_end = get_parameter(kwargs, "create_time", [None, None])[1] @@ -46,8 +46,8 @@ class Level1DataApi(object): sql_data = f"select * from ifs_level1_data where 1=1" sql_condition = "" - if raw_id: - sql_condition = f"{sql_condition} and raw_id={raw_id}" + if level0_id: + sql_condition = f"{sql_condition} and level0_id='{level0_id}'" if data_type: sql_condition = f"{sql_condition} and data_type='{data_type}'" if create_time_start: @@ -72,7 +72,7 @@ class Level1DataApi(object): return Result.ok_data(data=from_dict_list(Level1Record, recs)).append("totalCount", totalCount['c']) except Exception as e: - return Result.error(message=e.message) + return Result.error(message=str(e)) def get(self, **kwargs): @@ -93,7 +93,7 @@ class Level1DataApi(object): return Result.error(message=f"id:{fits_id} not found") except Exception as e: log.error(e) - return Result.error(message=e.message) + return Result.error(message=str(e)) def update_proc_status(self, **kwargs): ''' update the status of reduction @@ -123,7 +123,7 @@ class Level1DataApi(object): except Exception as e: log.error(e) - return Result.error(message=e.message) + return Result.error(message=str(e)) def update_qc1_status(self, **kwargs): ''' update the status of QC1 @@ -151,13 +151,13 @@ class Level1DataApi(object): except Exception as e: log.error(e) - return Result.error(message=e.message) + return Result.error(message=str(e)) def write(self, **kwargs): ''' insert a level1 record into database parameter kwargs: - raw_id : [int] + level0_id : [str] data_type : [str] cor_sci_id : [int] prc_params : [str] @@ -178,7 +178,7 @@ class Level1DataApi(object): try: rec = Level1Record( id = 0, - raw_id = get_parameter(kwargs, "raw_id"), + level0_id = get_parameter(kwargs, "level0_id"), data_type = get_parameter(kwargs, "data_type"), cor_sci_id = get_parameter(kwargs, "cor_sci_id"), prc_params = get_parameter(kwargs, "prc_params"), @@ -191,7 +191,7 @@ class Level1DataApi(object): filename = get_parameter(kwargs, "filename"), file_path = get_parameter(kwargs, "file_path"), prc_status = get_parameter(kwargs, "prc_status", -1), - prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.datetime.now())), + prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), pipeline_id = get_parameter(kwargs, "pipeline_id") ) existed = self.db.exists( @@ -203,9 +203,9 @@ class Level1DataApi(object): return Result.error(message=f'{rec.filename} has already been existed') self.db.execute( - 'INSERT INTO ifs_level1_data (raw_id,data_type,cor_sci_id,prc_params,flat_id,dark_id,bias_id,lamp_id,arc_id,sky_id,filename,file_path,qc1_status,prc_status,prc_time, create_time,pipeline_id) \ + 'INSERT INTO ifs_level1_data (level0_id,data_type,cor_sci_id,prc_params,flat_id,dark_id,bias_id,lamp_id,arc_id,sky_id,filename,file_path,qc1_status,prc_status,prc_time, create_time,pipeline_id) \ VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', - (rec.raw_id, rec.data_type, rec.cor_sci_id, rec.prc_params, rec.flat_id, rec.dark_id, rec.bias_id, rec.lamp_id, rec.arc_id, rec.sky_id, rec.filename, rec.file_path, -1, rec.prc_status,rec.prc_time, format_time_ms(time.time()),rec.pipeline_id,) + (rec.level0_id, rec.data_type, rec.cor_sci_id, rec.prc_params, rec.flat_id, rec.dark_id, rec.bias_id, rec.lamp_id, rec.arc_id, rec.sky_id, rec.filename, rec.file_path, -1, rec.prc_status,rec.prc_time, format_time_ms(time.time()),rec.pipeline_id,) ) self.db.end() rec.id = self.db.last_row_id() @@ -213,4 +213,4 @@ class Level1DataApi(object): return Result.ok_data(data=rec) except Exception as e: log.error(e) - return Result.error(message=e.message) \ No newline at end of file + return Result.error(message=str(e)) \ No newline at end of file diff --git a/csst_dfs_api_local/ifs/reffits.py b/csst_dfs_api_local/ifs/reffits.py deleted file mode 100644 index 5979cfe3c2e36a9e8a5eb331f4818b31d712223a..0000000000000000000000000000000000000000 --- a/csst_dfs_api_local/ifs/reffits.py +++ /dev/null @@ -1,244 +0,0 @@ -import logging -import os -import time, datetime -import shutil -from astropy.io import fits - -from ..common.db import DBClient -from ..common.utils import * - -log = logging.getLogger('csst') -class RefFitsApi(object): - REF_FITS_BIAS = "bias" - REF_FITS_FLAT = "flat" - REF_FITS_DARK = "dark" - REF_FITS_SKY = "sky" - REF_FITS_ARC = "arc" - - REF_FITS_TYPES = [REF_FITS_BIAS, REF_FITS_FLAT, REF_FITS_DARK, REF_FITS_SKY, REF_FITS_ARC] - - def __init__(self, sub_system = "ifs"): - self.sub_system = sub_system - self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") - self.check_dir() - self.db = DBClient() - - def check_dir(self): - if not os.path.exists(self.root_dir): - os.mkdir(self.root_dir) - log.info("using [%s] as root directory", self.root_dir) - if not os.path.exists(os.path.join(self.root_dir, "refs")): - os.mkdir(os.path.join(self.root_dir, "refs")) - - def find(self, **kwargs): - ''' - parameter kwargs: - obs_time = [int], - file_name = [str], - ccd_num = [int], - exp_time = (start, end), - status = [int], - ref_type = [str] - - return list of reference's files records - ''' - obs_time = get_parameter(kwargs, "obs_time") - file_name = get_parameter(kwargs, "file_name") - exp_time = get_parameter(kwargs, "exp_time", (None, format_time_ms(time.time()))) - ccd_num = get_parameter(kwargs, "ccd_num") - status = get_parameter(kwargs, "status") - ref_type = get_parameter(kwargs, "ref_type") - - sql = [] - - sql.append("select * from ifs_ref_fits where exp_time<='" + exp_time[1] + "'") - - if exp_time[0] is not None: - sql.append(" and exp_time>='" + exp_time[0] + "'") - if obs_time is not None: - sql.append(" and obs_time=" + repr(obs_time)) - if ccd_num is not None: - sql.append(" and ccd_num=" + repr(ccd_num)) - if ref_type is not None: - sql.append(" and ref_type='" + ref_type + "'") - if status is not None: - sql.append(" and status=" + repr(status)) - - if file_name: - sql = ["select * from ifs_ref_fits where filename='" + file_name + "'"] - sql.append(" order by exp_time desc") - _, recs = self.db.select_many("".join(sql)) - - for r in recs: - r['file_path'] = os.path.join(self.root_dir, r['file_path']) - return recs - - def get(self, **kwargs): - '''query database, return a record as dict - - parameter kwargs: - fits_id = [int] - - return dict or None - ''' - fits_id = get_parameter(kwargs, "fits_id", -1) - r = self.db.select_one( - "select * from ifs_ref_fits where id=?", (fits_id,)) - if r: - r['file_path'] = os.path.join(self.root_dir, r['file_path']) - return r - - def read(self, **kwargs): - ''' - parameter kwargs: - fits_id = [int], - file_path = [str], - chunk_size = [int] - - yield bytes of fits file - ''' - fits_id = get_parameter(kwargs, "fits_id") - file_path = get_parameter(kwargs, "file_path") - - if fits_id is None and file_path is None: - raise Exception("fits_id or file_path need to be defined") - - if fits_id is not None: - r = self.db.select_one( - "select * from ifs_ref_fits where id=?", (fits_id)) - if r is not None: - file_path = r["file_path"] - - if file_path is not None: - chunk_size = get_parameter(kwargs, "chunk_size", 20480) - return yield_file_bytes(os.path.join(self.root_dir, file_path), chunk_size) - - def update_status(self, **kwargs): - ''' - parameter kwargs: - fits_id = [int], - status = [int] - ''' - - fits_id = get_parameter(kwargs, "fits_id") - status = get_parameter(kwargs, "status") - - existed = self.db.exists( - "select * from ifs_ref_fits where id=?", - (fits_id,) - ) - if existed: - log.warning('%s not found' %(fits_id, )) - return - self.db.execute( - 'update ifs_ref_fits set status=? where id=?', - (status, fits_id) - ) - self.db.end() - - def import2db(self, **kwargs): - ''' - parameter kwargs: - file_path = [str] - ref_type = [str] - - insert into database - ''' - - file_path = get_parameter(kwargs, "file_path") - - if file_path is None: - raise Exception("file_path need to be defined") - - file_full_path = os.path.join(self.root_dir, file_path) - - if not os.path.exists(file_full_path): - raise Exception("%s not found"%(file_full_path)) - - file_name = os.path.basename(file_path) - - existed = self.db.exists( - "select * from ifs_ref_fits where filename=?", - (file_name,) - ) - if existed: - log.warning('%s has already been imported' %(file_path, )) - return - - hu = fits.getheader(file_full_path) - obs_time = hu['obst'] if 'obst' in hu else '' - ccd_num = hu['ccd_num'] if 'ccd_num' in hu else 0 - exp_time = format_time_ms(time.time()) - - ref_type = get_parameter(kwargs, "ref_type") - - if ref_type is None: - if 'flat' in file_name.lower(): - ref_type = 'flat' - elif 'bias' in file_name.lower(): - ref_type = 'bias' - elif 'hgar' in file_name.lower(): - ref_type = 'arc' - elif 'sky' in file_name.lower(): - ref_type = 'sky' - else: - ref_type = "" - - - self.db.execute( - 'INSERT INTO ifs_ref_fits (filename, obs_time, ccd_num, exp_time, file_path, ref_type, status, create_time) \ - VALUES(?,?,?,?,?,?,?,?)', - (file_name, obs_time, ccd_num, exp_time, file_path, ref_type, 1, format_time_ms(time.time())) - ) - - self.db.end() - - log.info("ref fits %s imported.", file_path) - - def write(self, **kwargs): - ''' copy a local file to file storage, then reduce the header of fits file and insert a record into database - - parameter kwargs: - file_path = [str] - ''' - file_path = get_parameter(kwargs, "file_path") - - new_file_dir = create_dir(os.path.join(self.root_dir, "refs"), - self.sub_system, - "/".join([str(datetime.now().year),"%02d"%(datetime.now().month),"%02d"%(datetime.now().day)])) - - - file_basename = os.path.basename(file_path) - new_file_path = os.path.join(new_file_dir, file_basename) - shutil.copyfile(file_path, new_file_path) - - file_path = new_file_path.replace(self.root_dir, '') - if file_path.index("/") == 0: - file_path = file_path[1:] - - self.import2db(file_path = file_path) - - - def associate_raw(self, **kwargs): - ''' associate raw fits to reference file - parameter kwargs: - raw_fits_ids = [list] - ref_fits_id = [int] - ''' - raw_fits_ids = get_parameter(kwargs, "raw_fits_ids") - ref_fits_id = get_parameter(kwargs, "ref_fits_id") - - if raw_fits_ids is None or ref_fits_id is None: - raise Exception("raw_fits_ids or ref_fits_id is None") - - sql = 'INSERT INTO ifs_raw_ref (fit_id, ref_id, create_time) values ' - values = ["(%s,%s,'%s')"%(i,ref_fits_id,format_time_ms(time.time())) for i in raw_fits_ids] - - self.db.execute(sql + ",".join(values)) - - self.db.end() - - log.info("%s associate to %s imported.", raw_fits_ids, ref_fits_id) - - - diff --git a/csst_dfs_api_local/ifs/result0.py b/csst_dfs_api_local/ifs/result0.py deleted file mode 100644 index 4fd63cb8b4f11eeaa9b8c3f5f6fa54b1387fa888..0000000000000000000000000000000000000000 --- a/csst_dfs_api_local/ifs/result0.py +++ /dev/null @@ -1,134 +0,0 @@ -import os -import logging -import time, datetime -import shutil - -from ..common.db import DBClient -from ..common.utils import * - -log = logging.getLogger('csst') - -class Result0Api(object): - def __init__(self, sub_system = "ifs"): - self.sub_system = sub_system - self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") - self.check_dir() - self.db = DBClient() - - def check_dir(self): - if not os.path.exists(self.root_dir): - os.mkdir(self.root_dir) - log.info("using [%s] as root directory", self.root_dir) - - if not os.path.exists(os.path.join(self.root_dir, "results0")): - os.mkdir(os.path.join(self.root_dir, "results0")) - - def find(self, **kwargs): - ''' - parameter kwargs: - raw_id = [int], - file_name = [str], - proc_type = [str] - - return list of level 0 record - ''' - paths = [] - - raw_id = get_parameter(kwargs, "raw_id", -1) - file_name = get_parameter(kwargs, "file_name") - proc_type = get_parameter(kwargs, "proc_type") - sql = [] - - sql.append("select * from ifs_result_0 where raw_id=%d" %(raw_id,)) - - if proc_type is not None: - sql.append(" and proc_type='" + proc_type + "'") - - if file_name: - sql = ["select * from ifs_result_0 where filename='" + file_name + "'"] - - _, recs = self.db.select_many("".join(sql)) - - for r in recs: - r['file_path'] = os.path.join(self.root_dir, r['file_path']) - return recs - - def get(self, **kwargs): - ''' query database, return a record as dict - - parameter kwargs: - fits_id = [int] - - return dict or None - ''' - fits_id = get_parameter(kwargs, "fits_id", -1) - r = self.db.select_one( - "select * from ifs_result_0 where id=?", (fits_id,)) - if r: - r['file_path'] = os.path.join(self.root_dir, r['file_path']) - return r - - def read(self, **kwargs): - ''' yield bytes of fits file - - parameter kwargs: - fits_id = [int], - file_path = [str], - chunk_size = [int] default 20480 - - yield bytes of fits file - ''' - fits_id = get_parameter(kwargs, "fits_id") - file_path = get_parameter(kwargs, "file_path") - - if fits_id is None and file_path is None: - raise Exception("fits_id or file_path need to be defined") - - if fits_id is not None: - r = self.db.select_one( - "select * from ifs_result_0 where id=?", (fits_id)) - if r is not None: - file_path = r["file_path"] - - if file_path is not None: - chunk_size = get_parameter(kwargs, "chunk_size", 20480) - return yield_file_bytes(os.path.join(self.root_dir, file_path), chunk_size) - - def write(self, **kwargs): - ''' copy a local level 0 file to file storage, and insert a record into database - - parameter kwargs: - raw_id = [int], - file_path = [str], - proc_type = [str] - ''' - raw_id = get_parameter(kwargs, "raw_id") - file_path = get_parameter(kwargs, "file_path") - proc_type = get_parameter(kwargs, "proc_type", "default") - - if file_path is None: - raise Exception("file_path need to be defined") - - new_file_dir = create_dir(os.path.join(self.root_dir, "results0"), - self.sub_system, - "/".join([str(datetime.now().year),"%02d"%(datetime.now().month),"%02d"%(datetime.now().day)])) - - - file_basename = os.path.basename(file_path) - new_file_path = os.path.join(new_file_dir, file_basename) - shutil.copyfile(file_path, new_file_path) - - file_path = new_file_path.replace(self.root_dir, '') - if file_path.index("/") == 0: - file_path = file_path[1:] - - self.db.execute( - 'INSERT INTO ifs_result_0 (filename, file_path, raw_id, proc_type, create_time) \ - VALUES(?,?,?,?,?)', - (file_basename, file_path, raw_id, proc_type, format_time_ms(time.time())) - ) - - self.db.end() - - log.info("result0 fits %s imported.", file_path) - return new_file_path \ No newline at end of file diff --git a/csst_dfs_api_local/msc/level1.py b/csst_dfs_api_local/msc/level1.py index d3233fca4c7a8e4f30263d8e4af934f571efdd1d..160cb71ba9f53900dc456842ebcb14b52de3c64b 100644 --- a/csst_dfs_api_local/msc/level1.py +++ b/csst_dfs_api_local/msc/level1.py @@ -21,7 +21,7 @@ class Level1DataApi(object): ''' retrieve level1 records from database parameter kwargs: - raw_id: [int] + level0_id: [str] data_type: [str] obs_type: [str] create_time : (start, end), @@ -33,7 +33,7 @@ class Level1DataApi(object): return: csst_dfs_common.models.Result ''' try: - raw_id = get_parameter(kwargs, "raw_id") + level0_id = get_parameter(kwargs, "level0_id") data_type = get_parameter(kwargs, "data_type") create_time_start = get_parameter(kwargs, "create_time", [None, None])[0] create_time_end = get_parameter(kwargs, "create_time", [None, None])[1] @@ -46,8 +46,8 @@ class Level1DataApi(object): sql_data = f"select * from msc_level1_data where 1=1" sql_condition = "" - if raw_id: - sql_condition = f"{sql_condition} and raw_id={raw_id}" + if level0_id: + sql_condition = f"{sql_condition} and level0_id='{level0_id}'" if data_type: sql_condition = f"{sql_condition} and data_type='{data_type}'" if create_time_start: @@ -78,7 +78,8 @@ class Level1DataApi(object): def get(self, **kwargs): ''' parameter kwargs: - id = [int] + id = [int], + level0_id = [str] return dict or None ''' @@ -95,6 +96,24 @@ class Level1DataApi(object): log.error(e) return Result.error(message=str(e)) + def get(self, **kwargs): + ''' + parameter kwargs: + id = [int] + return csst_dfs_common.models.Result + ''' + try: + id = get_parameter(kwargs, "id", -1) + r = self.db.select_one( + "select * from msc_level1_data where id=?", (id,)) + if r: + return Result.ok_data(data=Level1Record().from_dict(r)) + else: + return Result.error(message=f"id:{id} not found") + except Exception as e: + log.error(e) + return Result.error(message=str(e)) + def update_proc_status(self, **kwargs): ''' update the status of reduction @@ -157,7 +176,7 @@ class Level1DataApi(object): ''' insert a level1 record into database parameter kwargs: - raw_id : [int] + level0_id : [str] data_type : [str] cor_sci_id : [int] prc_params : [str] @@ -178,7 +197,7 @@ class Level1DataApi(object): try: rec = Level1Record( id = 0, - raw_id = get_parameter(kwargs, "raw_id"), + level0_id = get_parameter(kwargs, "level0_id"), data_type = get_parameter(kwargs, "data_type"), cor_sci_id = get_parameter(kwargs, "cor_sci_id"), prc_params = get_parameter(kwargs, "prc_params"), @@ -188,7 +207,7 @@ class Level1DataApi(object): filename = get_parameter(kwargs, "filename"), file_path = get_parameter(kwargs, "file_path"), prc_status = get_parameter(kwargs, "prc_status", -1), - prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.datetime.now())), + prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), pipeline_id = get_parameter(kwargs, "pipeline_id") ) existed = self.db.exists( @@ -200,9 +219,9 @@ class Level1DataApi(object): return Result.error(message=f'{rec.filename} has already been existed') self.db.execute( - 'INSERT INTO msc_level1_data (raw_id,data_type,cor_sci_id,prc_params,flat_id,dark_id,bias_id,filename,file_path,qc1_status,prc_status,prc_time,create_time,pipeline_id) \ + 'INSERT INTO msc_level1_data (level0_id,data_type,cor_sci_id,prc_params,flat_id,dark_id,bias_id,filename,file_path,qc1_status,prc_status,prc_time,create_time,pipeline_id) \ VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)', - (rec.raw_id, rec.data_type, rec.cor_sci_id, rec.prc_params, rec.flat_id, rec.dark_id, rec.bias_id, rec.filename, rec.file_path, -1, rec.prc_status, rec.prc_time, format_time_ms(time.time()),rec.pipeline_id,) + (rec.level0_id, rec.data_type, rec.cor_sci_id, rec.prc_params, rec.flat_id, rec.dark_id, rec.bias_id, rec.filename, rec.file_path, -1, rec.prc_status, rec.prc_time, format_time_ms(time.time()),rec.pipeline_id,) ) self.db.end() rec.id = self.db.last_row_id() diff --git a/tests/test_common_db.py b/tests/test_common_db.py index d118369aa86e9f7c6ce866b1d75d94345c593ef7..9729d4d1e35bb889277ca69cae051bd947f48a95 100644 --- a/tests/test_common_db.py +++ b/tests/test_common_db.py @@ -13,7 +13,7 @@ class DBClientTestCase(unittest.TestCase): if r is not None: print("now:", r) - r = db.exists("select * from t_observation where id=2323") + r = db.exists("select * from t_observation where id=1") if r: print("existed") else: diff --git a/tests/test_facility_cal_merge.py b/tests/test_facility_cal_merge.py index da426278d3481acac7fd3868642bc900a0bfda78..6bfbc170c9d52a323a42fe8f4236955aa7aaccf7 100644 --- a/tests/test_facility_cal_merge.py +++ b/tests/test_facility_cal_merge.py @@ -10,25 +10,27 @@ class CalMergeApiTestCase(unittest.TestCase): self.api = CalMergeApi() def test_find(self): - recs = self.api.find(detector_no='CCD01', + recs = self.api.find(detector_no='01', ref_type = "bias", obs_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13")) print('find:', recs) def test_get(self): - rec = self.api.get(id = 3) + rec = self.api.get(cal_id='0000231') print('get:', rec) def test_update_proc_status(self): - rec = self.api.update_proc_status(id = 3, status = 1) + rec = self.api.update_proc_status(id = 1, status = 1) print('update_proc_status:', rec) def test_update_qc1_status(self): - rec = self.api.update_qc1_status(id = 3, status = 2) + rec = self.api.update_qc1_status(id = 1, status = 2) print('update_qc1_status:', rec) def test_write(self): - rec = self.api.write(detector_no='CCD01', + rec = self.api.write( + cal_id='0000231', + detector_no='01', ref_type = "bias", obs_time = "2021-06-04 11:12:13", exp_time = 150, @@ -36,5 +38,5 @@ class CalMergeApiTestCase(unittest.TestCase): file_path = "/opt/dddasd.fits", prc_status = 3, prc_time = '2021-06-04 11:12:13', - level0_ids = [1,2,3,4]) + level0_ids = ['0000231','0000232','0000233','0000234']) print('write:', rec) \ No newline at end of file diff --git a/tests/test_facility_level0.py b/tests/test_facility_level0.py index 0110bf23b6084f01a1d908826896e18949bbc7dc..e76ac0af4bfff3e791044256a153ac6b8cd28680 100644 --- a/tests/test_facility_level0.py +++ b/tests/test_facility_level0.py @@ -8,25 +8,25 @@ class Level0DataApiTestCase(unittest.TestCase): self.api = Level0DataApi() def test_find(self): - recs = self.api.find(obs_id = 13, obs_type = 'sci', limit = 0) + recs = self.api.find(obs_id = '13', obs_type = 'sci', limit = 0) print('find:', recs) def test_get(self): - rec = self.api.get(fits_id = 31) + rec = self.api.get(id = 31) print('get:', rec) def test_update_proc_status(self): - rec = self.api.update_proc_status(fits_id = 31, status = 6) + rec = self.api.update_proc_status(id = 31, status = 6) print('update_proc_status:', rec) def test_update_qc0_status(self): - rec = self.api.update_qc0_status(fits_id = 31, status = 7) + rec = self.api.update_qc0_status(id = 31, status = 7) print('update_qc0_status:', rec) def test_write(self): rec = self.api.write( - obs_id = 13, - detector_no = "CCD01", + obs_id = '0000013', + detector_no = "01", obs_type = "sci", obs_time = "2021-06-06 11:12:13", exp_time = 150, diff --git a/tests/test_facility_level0_prc.py b/tests/test_facility_level0_prc.py index 0251cc1e39bb75845e6909f1466e86e80d90cd20..2c58367e023e873e167eec83cb47046ead05d204 100644 --- a/tests/test_facility_level0_prc.py +++ b/tests/test_facility_level0_prc.py @@ -10,7 +10,7 @@ class Level0PrcTestCase(unittest.TestCase): self.api = Level0PrcApi() def test_find(self): - recs = self.api.find(level0_id=134) + recs = self.api.find(level0_id='134') print('find:', recs) def test_update_proc_status(self): @@ -18,11 +18,11 @@ class Level0PrcTestCase(unittest.TestCase): print('update_proc_status:', rec) def test_write(self): - rec = self.api.write(level0_id=134, + rec = self.api.write(level0_id='134', pipeline_id = "P1", prc_module = "QC0", - params_id = "/opt/dddasd.params", + params_file_path = "/opt/dddasd.params", prc_status = 3, prc_time = '2021-06-04 11:12:13', - file_path = "/opt/dddasd.header") + result_file_path = "/opt/dddasd.header") print('write:', rec) \ No newline at end of file diff --git a/tests/test_facility_observation.py b/tests/test_facility_observation.py index bb4d85188eca6d71b1ed6bd505de349447b33a7b..75882342da0189e5084d3ed8ff314564bfbcd458 100644 --- a/tests/test_facility_observation.py +++ b/tests/test_facility_observation.py @@ -12,7 +12,7 @@ class FacilityObservationTestCase(unittest.TestCase): print('find:', recs) def test_get(self): - rec = self.api.get(obs_id=11) + rec = self.api.get(obs_id='0000022') print('get:', rec) def test_update_proc_status(self): @@ -25,6 +25,8 @@ class FacilityObservationTestCase(unittest.TestCase): def test_write(self): rec = self.api.write( + id = 0, + obs_id = "", obs_time = "2021-06-06 11:12:13", exp_time = 150, module_id = "MSC", diff --git a/tests/test_ifs_fits.py b/tests/test_ifs_fits.py deleted file mode 100644 index 64ae1a58d3a52ce3ecefaef4b2447b870e704272..0000000000000000000000000000000000000000 --- a/tests/test_ifs_fits.py +++ /dev/null @@ -1,62 +0,0 @@ -import os -import unittest -from astropy.io import fits - -from csst_dfs_api_local.ifs import FitsApi - -class IFSFitsTestCase(unittest.TestCase): - - def setUp(self): - self.api = FitsApi() - - def test_get(self): - r = self.api.get(fits_id=1111) - print('get:', r) - - r = self.api.get(fits_id=1) - print('get:', r) - - def test_find(self): - recs = self.api.find(file_name='CCD1_ObsTime_300_ObsNum_7.fits') - print('find:', recs) - assert len(recs) == 1 - - recs = self.api.find() - print('find:', recs) - assert len(recs) > 1 - - def test_read(self): - recs = self.api.find(file_name='CCD1_ObsTime_300_ObsNum_7.fits') - print("The full path: ", os.path.join(self.api.root_dir, recs[0]['file_path'])) - - file_segments = self.api.read(file_path=recs[0]['file_path']) - file_bytes = b''.join(file_segments) - hdul = fits.HDUList.fromstring(file_bytes) - print(hdul.info()) - hdr = hdul[0].header - print(repr(hdr)) - - def test_update_proc_status(self): - recs = self.api.find(file_name='CCD1_ObsTime_300_ObsNum_7.fits') - - self.api.update_proc_status(fits_id=recs[0]['id'],status=1) - - rec = self.api.get(fits_id=recs[0]['id']) - assert rec['prc_status'] == 1 - - def test_update_qc0_status(self): - recs = self.api.find(file_name='CCD1_ObsTime_300_ObsNum_7.fits') - - self.api.update_qc0_status(fits_id=recs[0]['id'],status=1) - - rec = self.api.get(fits_id=recs[0]['id']) - assert rec['qc0_status'] == 1 - - def test_write(self): - recs = self.api.write(file_path='/opt/temp/csst_ifs/CCD2_ObsTime_1200_ObsNum_40.fits') - - recs = self.api.find(file_name='CCD2_ObsTime_1200_ObsNum_40.fits') - - rec = self.api.get(fits_id=recs[0]['id']) - - print(rec) \ No newline at end of file diff --git a/tests/test_ifs_level1.py b/tests/test_ifs_level1.py index 793353b3be13f3b9638851fd39734c6018381fb4..2ee158f3bd30e9d0598fd8f812825813eb64747c 100644 --- a/tests/test_ifs_level1.py +++ b/tests/test_ifs_level1.py @@ -8,8 +8,10 @@ class IFSResult1TestCase(unittest.TestCase): self.api = Level1DataApi() def test_find(self): - recs = self.api.find(raw_id=11, - create_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13")) + recs = self.api.find( + level0_id='0000223', + create_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13") + ) print('find:', recs) def test_get(self): @@ -25,7 +27,8 @@ class IFSResult1TestCase(unittest.TestCase): print('update_qc1_status:', rec) def test_write(self): - rec = self.api.write(raw_id=11, + rec = self.api.write( + level0_id='0000223', data_type = "sci", cor_sci_id = 2, prc_params = "/opt/dddasd.params", diff --git a/tests/test_ifs_refs.py b/tests/test_ifs_refs.py deleted file mode 100644 index 43afb865359ec5ac1ed5ee4fdcfecfa8543a3969..0000000000000000000000000000000000000000 --- a/tests/test_ifs_refs.py +++ /dev/null @@ -1,48 +0,0 @@ -import os -import unittest -from astropy.io import fits - -from csst_dfs_api_local.ifs import RefFitsApi - -class IFSFitsTestCase(unittest.TestCase): - - def setUp(self): - self.api = RefFitsApi() - - # def test_find(self): - # recs = self.api.find(exp_time=('2021-03-19 13:42:22', '2021-03-19 15:28:00'), ref_type=RefFitsApi.REF_FITS_FLAT) - # print('find:', recs) - # assert len(recs) > 1 - - # recs = self.api.find() - # print('=' * 80) - # print('find:', recs) - # assert len(recs) > 1 - - # def test_read(self): - # recs = self.api.find(file_name='CCD2_Flat_img.fits') - # print("The full path: ", os.path.join(self.api.root_dir, recs[0]['file_path'])) - - # file_segments = self.api.read(file_path=recs[0]['file_path']) - # file_bytes = b''.join(file_segments) - # hdul = fits.HDUList.fromstring(file_bytes) - # print(hdul.info()) - # hdr = hdul[0].header - # print(repr(hdr)) - - # def test_update_status(self): - # recs = self.api.find(file_name='CCD2_Flat_img.fits') - # self.api.update_status(fits_id=recs[0]['id'],status=1) - # rec = self.api.get(fits_id=recs[0]['id']) - # assert rec['status'] == 1 - - # def test_write(self): - # recs = self.api.write(file_path='/opt/temp/csst_ifs/CCD3_Flat_img.fits') - # recs = self.api.find(file_name='CCD3_Flat_img.fits') - # rec = self.api.get(fits_id=recs[0]['id']) - # print(rec) - - def test_associate_raw(self): - raw_fits_ids = [3,4,5] - ref_fits_id = 2 - self.api.associate_raw(raw_fits_ids = raw_fits_ids, ref_fits_id = ref_fits_id) diff --git a/tests/test_msc_level1.py b/tests/test_msc_level1.py index 9dace600eee7ceb97d7ae04da53677e04a989d6b..6667860534aa79f945dffd29649d2ad4520ccd8d 100644 --- a/tests/test_msc_level1.py +++ b/tests/test_msc_level1.py @@ -8,7 +8,8 @@ class IFSResult1TestCase(unittest.TestCase): self.api = Level1DataApi() def test_find(self): - recs = self.api.find(raw_id=11, + recs = self.api.find( + level0_id='0000223', create_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13")) print('find:', recs) @@ -25,7 +26,8 @@ class IFSResult1TestCase(unittest.TestCase): print('update_qc1_status:', rec) def test_write(self): - rec = self.api.write(raw_id=11, + rec = self.api.write( + level0_id='0000223', data_type = "sci", cor_sci_id = 2, prc_params = "/opt/dddasd.params",