diff --git a/csst_dfs_api_local/common/db.sql b/csst_dfs_api_local/common/db.sql index fce218d99966001f7be3019d41dfd7544cd38934..67540511075d4ff85cceaab0d7849e83d671bedb 100644 --- a/csst_dfs_api_local/common/db.sql +++ b/csst_dfs_api_local/common/db.sql @@ -17,22 +17,15 @@ drop table if exists t_level0_header; drop table if exists t_level0_prc; -drop table if exists t_cal2level0; +drop table if exists t_level1_data; -drop table if exists t_cal_header; +drop table if exists t_level1_header; -drop table if exists t_cal_merge; +drop table if exists t_level1_prc; -/*----------------msc------------------------------*/ - - -drop table if exists msc_level1_data; +drop table if exists t_level1_ref; -drop table if exists msc_level1_header; - -drop table if exists msc_level1_prc; - -drop table if exists msc_level1_ref; +/*----------------msc------------------------------*/ drop table if exists msc_level2_data; @@ -41,44 +34,17 @@ drop table if exists msc_level2_header; drop table if exists msc_level2_catalog; /*----------------ifs------------------------------*/ -drop table if exists ifs_level1_data; - -drop table if exists ifs_level1_header; - -drop table if exists ifs_level1_prc; - -drop table if exists ifs_level1_ref; /*----------------mci------------------------------*/ -drop table if exists mci_level1_data; - -drop table if exists mci_level1_header; - -drop table if exists mci_level1_prc; - -drop table if exists mci_level1_ref; /*----------------cpic------------------------------*/ -drop table if exists cpic_level1_data; - -drop table if exists cpic_level1_header; - -drop table if exists cpic_level1_prc; - -drop table if exists cpic_level1_ref; /*----------------sls------------------------------*/ - -drop table if exists sls_level1_data; - -drop table if exists sls_level1_header; - -drop table if exists sls_level1_prc; - -drop table if exists sls_level1_ref; - drop table if exists sls_level2_spectra; /*===========================facility===================================*/ +/*==============================================================*/ +/* Table: t_detector */ +/*==============================================================*/ create table t_detector ( no varchar(10) not null, @@ -155,13 +121,17 @@ create table t_observation create_time datetime, import_status tinyint(1) ); - +/*==============================================================*/ +/* Table: t_level0_data */ +/*==============================================================*/ create table t_level0_data ( id integer PRIMARY KEY autoincrement, level0_id varchar(20) not null, obs_id varchar(10) not null, detector_no varchar(10) not null, + filter varchar(16), + module_id varchar(16), obs_type varchar(16), obs_time datetime, exp_time float, @@ -208,44 +178,13 @@ create table t_level0_prc result_file_path varchar(256) ); -create table t_cal2level0 -( - merge_id int(20) not null, - level0_id varchar(20) not null, - primary key (merge_id, level0_id) -); - -create table t_cal_header -( - id int(20) not null, - ra_obj float, - dec_obj float, - create_time datetime, - primary key (id) -); - -create table t_cal_merge -( - id integer PRIMARY KEY autoincrement, - cal_id varchar(20) not null, - detector_no varchar(10) not null, - ref_type varchar(16), - obs_time datetime, - exp_time float, - filename varchar(128), - file_path varchar(256), - qc1_status tinyint(1), - qc1_time datetime, - prc_status tinyint(1), - prc_time datetime, - create_time datetime -); -/*========================msc======================================*/ -create table 
msc_level1_data +create table t_level1_data ( id integer PRIMARY KEY autoincrement, level0_id varchar(20) not null, data_type varchar(64) not null, + filter varchar(16), + module_id varchar(16), cor_sci_id int(20), prc_params varchar(1024), filename varchar(128), @@ -255,26 +194,30 @@ create table msc_level1_data qc1_status tinyint(1), qc1_time datetime, create_time datetime, - pipeline_id varchar(60) + pipeline_id varchar(60), + last_query_time datetime ); -create table msc_level1_ref ( - level1_id int(20) not null, - ref_type varchar(64) not null, - cal_id int(20) not null, - primary key (level1_id, ref_type) +create table t_level1_ref ( + id integer PRIMARY KEY autoincrement, + level1_id int(20) not null, + ref_type varchar(64) not null, + cal_id varchar(128) not null ); -create table msc_level1_header +create table t_level1_header ( id int(20) not null, ra_obj float, dec_obj float, - create_time datetime, + ra_cen float, + dec_cen float, + obj_hpix int(20) default 0, + cen_hpix int(20) default 0, primary key (id) ); -create table msc_level1_prc +create table t_level1_prc ( id integer PRIMARY KEY autoincrement, level1_id int(20) not null, @@ -285,7 +228,7 @@ create table msc_level1_prc prc_time datetime, result_file_path varchar(256) ); - +/*========================msc======================================*/ create table msc_level2_data ( id integer PRIMARY KEY autoincrement, level0_id VARCHAR(20) null, @@ -331,138 +274,11 @@ create table msc_level2co_header ( constraint PK_MSC_LEVEL2CO_HEADER primary key (id) ); /*===========================ifs===================================*/ -create table ifs_level1_data -( - id integer PRIMARY KEY autoincrement, - level0_id varchar(20) not null, - data_type varchar(64) not null, - cor_sci_id int(20), - prc_params varchar(1024), - filename varchar(128), - file_path varchar(256), - prc_status tinyint(1), - prc_time datetime, - qc1_status tinyint(1), - qc1_time datetime, - create_time datetime, - pipeline_id varchar(60) -); - -create table ifs_level1_ref ( - level1_id int(20) not null, - ref_type varchar(64) not null, - cal_id int(20) not null, - primary key (level1_id, ref_type) -); -create table ifs_level1_header -( - id int(20) not null, - ra_obj float, - dec_obj float, - create_time datetime, - primary key (id) -); -create table ifs_level1_prc -( - id integer PRIMARY KEY autoincrement, - level1_id int(20) not null, - pipeline_id varchar(64) not null, - prc_module varchar(32) not null, - params_file_path varchar(256), - prc_status int(2), - prc_time datetime, - result_file_path varchar(256) -); /*===========================mci===================================*/ -create table mci_level1_data -( - id integer PRIMARY KEY autoincrement, - level0_id varchar(20) not null, - data_type varchar(64) not null, - cor_sci_id int(20), - prc_params varchar(1024), - filename varchar(128), - file_path varchar(256), - prc_status tinyint(1), - prc_time datetime, - qc1_status tinyint(1), - qc1_time datetime, - create_time datetime, - pipeline_id varchar(60) -); - -create table mci_level1_ref ( - level1_id int(20) not null, - ref_type varchar(64) not null, - cal_id int(20) not null, - primary key (level1_id, ref_type) -); -create table mci_level1_header -( - id int(20) not null, - ra_obj float, - dec_obj float, - create_time datetime, - primary key (id) -); -create table mci_level1_prc -( - id integer PRIMARY KEY autoincrement, - level1_id int(20) not null, - pipeline_id varchar(64) not null, - prc_module varchar(32) not null, - params_file_path varchar(256), - 
prc_status int(2), - prc_time datetime, - result_file_path varchar(256) -); /*===========================sls===================================*/ -create table sls_level1_data -( - id integer PRIMARY KEY autoincrement, - level0_id varchar(20) not null, - data_type varchar(64) not null, - prc_params varchar(1024), - filename varchar(128), - file_path varchar(256), - prc_status tinyint(1), - prc_time datetime, - qc1_status tinyint(1), - qc1_time datetime, - create_time datetime, - pipeline_id varchar(60) -); - -create table sls_level1_ref ( - level1_id int(20) not null, - ref_type varchar(64) not null, - cal_id int(20) not null, - primary key (level1_id, ref_type) -); - -create table sls_level1_header -( - id int(20) not null, - ra_obj float, - dec_obj float, - create_time datetime, - primary key (id) -); - -create table sls_level1_prc -( - id integer PRIMARY KEY autoincrement, - level1_id int(20) not null, - pipeline_id varchar(64) not null, - prc_module varchar(32) not null, - params_file_path varchar(256), - prc_status int(2), - prc_time datetime, - result_file_path varchar(256) -); create table sls_level2_spectra ( @@ -489,47 +305,3 @@ create table sls_level2_spectra_header primary key (id) ); /*===========================cpic===================================*/ - -create table cpic_level1_data -( - id integer PRIMARY KEY autoincrement, - level0_id varchar(20) not null, - data_type varchar(64) not null, - prc_params varchar(1024), - filename varchar(128), - file_path varchar(256), - prc_status tinyint(1), - prc_time datetime, - qc1_status tinyint(1), - qc1_time datetime, - create_time datetime, - pipeline_id varchar(60) -); - -create table cpic_level1_ref ( - level1_id int(20) not null, - ref_type varchar(64) not null, - cal_id int(20) not null, - primary key (level1_id, ref_type) -); - -create table cpic_level1_header -( - id int(20) not null, - ra_obj float, - dec_obj float, - create_time datetime, - primary key (id) -); - -create table cpic_level1_prc -( - id integer PRIMARY KEY autoincrement, - level1_id int(20) not null, - pipeline_id varchar(64) not null, - prc_module varchar(32) not null, - params_file_path varchar(256), - prc_status int(2), - prc_time datetime, - result_file_path varchar(256) -); \ No newline at end of file diff --git a/csst_dfs_api_local/cpic/__init__.py b/csst_dfs_api_local/cpic/__init__.py index 15cff41489ffd6daffdbaa088141de7b881bb805..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 --- a/csst_dfs_api_local/cpic/__init__.py +++ b/csst_dfs_api_local/cpic/__init__.py @@ -1,5 +0,0 @@ -from .calmerge import CalMergeApi -from .level0 import Level0DataApi -from .level0prc import Level0PrcApi -from .level1 import Level1DataApi -from .level1prc import Level1PrcApi \ No newline at end of file diff --git a/csst_dfs_api_local/cpic/ingest.py b/csst_dfs_api_local/cpic/ingest.py deleted file mode 100644 index 0d85216bbdcd8b7dce13880d120ecd21565e3112..0000000000000000000000000000000000000000 --- a/csst_dfs_api_local/cpic/ingest.py +++ /dev/null @@ -1,113 +0,0 @@ -import os, sys -import argparse -import logging -from astropy.io import fits -import datetime -import shutil - -from csst_dfs_api_local.common.db import DBClient -log = logging.getLogger('csst-dfs-api-local') - -def ingest(): - - parser = argparse.ArgumentParser(prog=f"{sys.argv[0]}", description="ingest the local files") - parser.add_argument('-i','--infile', dest="infile", help="a file or a directory") - parser.add_argument('-m', '--copyfiles', dest="copyfiles", action='store_true', default=False, help="copy 
files after import") - args = parser.parse_args(sys.argv[1:]) - - import_root_dir = args.infile - if import_root_dir is None or (not os.path.isfile(import_root_dir) and not os.path.isdir(import_root_dir)): - parser.print_help() - sys.exit(0) - - db = DBClient() - if os.path.isfile(import_root_dir): - log.info(f"prepare import {import_root_dir}") - ingesst_one(import_root_dir, db, args.copyfiles) - if os.path.isdir(import_root_dir): - for (path, _, file_names) in os.walk(import_root_dir): - for filename in file_names: - if filename.find(".fits") > 0: - file_full_path = os.path.join(path, filename) - log.info(f"prepare import {file_full_path}") - ingesst_one(file_full_path, db, args.copyfiles) - - db.close() - -def ingesst_one(file_path, db, copyfiles): - dest_root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") - - hdul = fits.open(file_path) - header = hdul[0].header - - obs_id = header["OBSID"] - exp_start_time = f"{header['DATE-OBS']} {header['TIME-OBS']}" - exp_time = header['EXPTIME'] - - module_id = header["INSTRUME"] - obs_type = header["FILETYPE"] - qc0_status = -1 - prc_status = -1 - time_now = datetime.datetime.now() - create_time = time_now.strftime('%Y-%m-%d %H:%M:%S') - - facility_status_id = 0 - module_status_id = 0 - - existed = db.exists("select * from t_observation where obs_id=?", (obs_id,)) - if not existed: - db.execute("insert into t_observation \ - (obs_id,obs_time,exp_time,module_id,obs_type,facility_status_id, module_status_id, qc0_status, prc_status,create_time) \ - values (?,?,?,?,?,?,?,?,?,?)", - (obs_id,exp_start_time,exp_time,module_id,obs_type,facility_status_id,module_status_id,qc0_status, prc_status,create_time)) - db.end() - #level0 - detector = header["DETECTOR"] - filename = header["FILENAME"] - - existed = db.exists( - "select * from cpic_level0_data where filename=?", - (filename,) - ) - if existed: - log.warning('%s has already been imported' %(file_path, )) - db.end() - return - - detector_status_id = 0 - - file_full_path = file_path - - if copyfiles: - file_dir = f"{dest_root_dir}/{module_id}/{obs_type.upper()}/{header['EXPSTART']}/{obs_id}/MS" - if not os.path.exists(file_dir): - os.makedirs(file_dir) - file_full_path = f"{file_dir}/{filename}.fits" - - level0_id = f"{obs_id}{detector}" - - c = db.execute("insert into cpic_level0_data \ - (level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \ - values (?,?,?,?,?,?,?,?,?,?,?,?)", - (level0_id, obs_id, detector, obs_type, exp_start_time, exp_time, detector_status_id, filename, file_full_path, qc0_status, prc_status,create_time)) - db.end() - level0_id_id = db.last_row_id() - #level0-header - ra_obj = header["RA_OBJ"] - dec_obj = header["DEC_OBJ"] - db.execute("delete from cpic_level0_header where id=?",(level0_id_id,)) - db.execute("insert into cpic_level0_header \ - (id, ra_obj, dec_obj, create_time) \ - values (?,?,?,?)", - (level0_id_id, ra_obj, dec_obj, create_time)) - - if copyfiles: - #copy files - shutil.copyfile(file_path, file_full_path) - - db.end() - - print(f"{file_path} imported") - -if __name__ == "__main__": - ingest() \ No newline at end of file diff --git a/csst_dfs_api_local/cpic/level1.py b/csst_dfs_api_local/cpic/level1.py deleted file mode 100644 index e2716a1b708b98b8f8803c30b87e56eb7bb1a0a7..0000000000000000000000000000000000000000 --- a/csst_dfs_api_local/cpic/level1.py +++ /dev/null @@ -1,205 +0,0 @@ -import os -import logging -import time, datetime -import shutil - -from ..common.db 
import DBClient -from ..common.utils import * -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.cpic import Level1Record -from csst_dfs_commons.models.common import from_dict_list - -log = logging.getLogger('csst') - -class Level1DataApi(object): - def __init__(self, sub_system = "cpic"): - self.sub_system = sub_system - self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") - self.db = DBClient() - - def find(self, **kwargs): - ''' retrieve level1 records from database - - parameter kwargs: - level0_id: [str] - data_type: [str] - create_time : (start, end), - qc1_status : [int], - prc_status : [int], - filename: [str] - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - level0_id = get_parameter(kwargs, "level0_id") - data_type = get_parameter(kwargs, "data_type") - create_time_start = get_parameter(kwargs, "create_time", [None, None])[0] - create_time_end = get_parameter(kwargs, "create_time", [None, None])[1] - qc1_status = get_parameter(kwargs, "qc1_status") - prc_status = get_parameter(kwargs, "prc_status") - filename = get_parameter(kwargs, "filename") - limit = get_parameter(kwargs, "limit", 0) - - sql_count = "select count(*) as c from cpic_level1_data where 1=1" - sql_data = f"select * from cpic_level1_data where 1=1" - - sql_condition = "" - if level0_id: - sql_condition = f"{sql_condition} and level0_id='{level0_id}'" - if data_type: - sql_condition = f"{sql_condition} and data_type='{data_type}'" - if create_time_start: - sql_condition = f"{sql_condition} and create_time >='{create_time_start}'" - if create_time_end: - sql_condition = f"{sql_condition} and create_time <='{create_time_end}'" - if qc1_status: - sql_condition = f"{sql_condition} and qc1_status={qc1_status}" - if prc_status: - sql_condition = f"{sql_condition} and prc_status={prc_status}" - if filename: - sql_condition = f" and filename='{filename}'" - - sql_count = f"{sql_count} {sql_condition}" - sql_data = f"{sql_data} {sql_condition}" - - if limit > 0: - sql_data = f"{sql_data} limit {limit}" - - totalCount = self.db.select_one(sql_count) - _, recs = self.db.select_many(sql_data) - return Result.ok_data(data=from_dict_list(Level1Record, recs)).append("totalCount", totalCount['c']) - - except Exception as e: - return Result.error(message=str(e)) - - def get(self, **kwargs): - ''' - parameter kwargs: - id = [int] - return csst_dfs_common.models.Result - ''' - try: - id = get_parameter(kwargs, "id", -1) - r = self.db.select_one( - "select * from cpic_level1_data where id=?", (id,)) - if r: - return Result.ok_data(data=Level1Record().from_dict(r)) - else: - return Result.error(message=f"id:{id} not found") - except Exception as e: - log.error(e) - return Result.error(message=str(e)) - - def update_proc_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - status : [int] - - return csst_dfs_common.models.Result - ''' - fits_id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - try: - existed = self.db.exists( - "select * from cpic_level1_data where id=?", - (fits_id,) - ) - if not existed: - log.warning('%s not found' %(fits_id, )) - return Result.error(message ='%s not found' %(fits_id, )) - self.db.execute( - 'update cpic_level1_data set prc_status=?, prc_time=? 
where id=?', - (status, format_time_ms(time.time()), fits_id) - ) - self.db.end() - return Result.ok_data() - - except Exception as e: - log.error(e) - return Result.error(message=str(e)) - - def update_qc1_status(self, **kwargs): - ''' update the status of QC1 - - parameter kwargs: - id : [int], - status : [int] - ''' - fits_id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - try: - existed = self.db.exists( - "select * from cpic_level1_data where id=?", - (fits_id,) - ) - if not existed: - log.warning('%s not found' %(fits_id, )) - return Result.error(message ='%s not found' %(fits_id, )) - self.db.execute( - 'update cpic_level1_data set qc1_status=?, qc1_time=? where id=?', - (status, format_time_ms(time.time()), fits_id) - ) - self.db.end() - return Result.ok_data() - - except Exception as e: - log.error(e) - return Result.error(message=str(e)) - - def write(self, **kwargs): - ''' insert a level1 record into database - - parameter kwargs: - level0_id : [str] - data_type : [str] - prc_params : [str] - filename : [str] - file_path : [str] - prc_status : [int] - prc_time : [str] - pipeline_id : [str] - refs: [dict] - return csst_dfs_common.models.Result - ''' - try: - rec = Level1Record( - id = 0, - level0_id = get_parameter(kwargs, "level0_id"), - data_type = get_parameter(kwargs, "data_type"), - prc_params = get_parameter(kwargs, "prc_params"), - filename = get_parameter(kwargs, "filename"), - file_path = get_parameter(kwargs, "file_path"), - prc_status = get_parameter(kwargs, "prc_status", -1), - prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), - pipeline_id = get_parameter(kwargs, "pipeline_id"), - refs = get_parameter(kwargs, "refs", {}) - ) - existed = self.db.exists( - "select * from cpic_level1_data where filename=?", - (rec.filename,) - ) - if existed: - log.error(f'{rec.filename} has already been existed') - return Result.error(message=f'{rec.filename} has already been existed') - - self.db.execute( - 'INSERT INTO cpic_level1_data (level0_id,data_type,prc_params,filename,file_path,qc1_status,prc_status,prc_time,create_time,pipeline_id) \ - VALUES(?,?,?,?,?,?,?,?,?,?)', - (rec.level0_id, rec.data_type, rec.prc_params, rec.filename, rec.file_path, -1, rec.prc_status, rec.prc_time, format_time_ms(time.time()),rec.pipeline_id,) - ) - self.db.end() - rec.id = self.db.last_row_id() - - if rec.refs.items(): - sql_refs = "insert into cpic_level1_ref (level1_id,ref_type,cal_id) values " - values = ["(%s,'%s',%s)"%(rec.id,k,v) for k,v in rec.refs.items()] - _ = self.db.execute(sql_refs + ",".join(values)) - - self.db.end() - return Result.ok_data(data=rec) - except Exception as e: - log.error(e) - return Result.error(message=str(e)) \ No newline at end of file diff --git a/csst_dfs_api_local/cpic/level1prc.py b/csst_dfs_api_local/cpic/level1prc.py deleted file mode 100644 index cbac0eb5162805864b579a41b800877b3eb1060a..0000000000000000000000000000000000000000 --- a/csst_dfs_api_local/cpic/level1prc.py +++ /dev/null @@ -1,124 +0,0 @@ -import os -import logging -import time, datetime -import shutil - -from ..common.db import DBClient -from ..common.utils import * -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.cpic import Level1PrcRecord -from csst_dfs_commons.models.common import from_dict_list - -log = logging.getLogger('csst') - -class Level1PrcApi(object): - def __init__(self, sub_system = "cpic"): - self.sub_system = sub_system - self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") - 
self.db = DBClient() - - def find(self, **kwargs): - ''' retrieve level1 procedure records from database - - parameter kwargs: - level1_id: [int] - pipeline_id: [str] - prc_module: [str] - prc_status : [int] - - return: csst_dfs_common.models.Result - ''' - try: - level1_id = get_parameter(kwargs, "level1_id", 0) - pipeline_id = get_parameter(kwargs, "pipeline_id") - prc_module = get_parameter(kwargs, "prc_module") - prc_status = get_parameter(kwargs, "prc_status") - - sql_data = f"select * from cpic_level1_prc" - - sql_condition = f"where level1_id={level1_id}" - if pipeline_id: - sql_condition = sql_condition + " and pipeline_id='" + pipeline_id + "'" - if prc_module: - sql_condition = sql_condition + " and prc_module ='" + prc_module + "'" - if prc_status: - sql_condition = f"{sql_condition} and prc_status={prc_status}" - - sql_data = f"{sql_data} {sql_condition}" - - _, records = self.db.select_many(sql_data) - return Result.ok_data(data=from_dict_list(Level1PrcRecord, records)).append("totalCount", len(records)) - - except Exception as e: - return Result.error(message=str(e)) - - def update_proc_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - status : [int] - - return csst_dfs_common.models.Result - ''' - id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - - try: - existed = self.db.exists( - "select * from cpic_level1_prc where id=?", - (id,) - ) - if not existed: - log.warning('%s not found' %(id, )) - return Result.error(message ='%s not found' %(id, )) - self.db.execute( - 'update cpic_level1_prc set prc_status=?, prc_time=? where id=?', - (status, format_time_ms(time.time()), id) - ) - self.db.end() - return Result.ok_data() - - except Exception as e: - log.error(e) - return Result.error(message=str(e)) - - def write(self, **kwargs): - ''' insert a level1 procedure record into database - - parameter kwargs: - level1_id : [int] - pipeline_id : [str] - prc_module : [str] - params_file_path : [str] - prc_status : [int] - prc_time : [str] - result_file_path : [str] - return csst_dfs_common.models.Result - ''' - - rec = Level1PrcRecord( - id = 0, - level1_id = get_parameter(kwargs, "level1_id", 0), - pipeline_id = get_parameter(kwargs, "pipeline_id"), - prc_module = get_parameter(kwargs, "prc_module"), - params_file_path = get_parameter(kwargs, "params_file_path"), - prc_status = get_parameter(kwargs, "prc_status", -1), - prc_time = get_parameter(kwargs, "prc_time"), - result_file_path = get_parameter(kwargs, "result_file_path") - ) - try: - self.db.execute( - 'INSERT INTO cpic_level1_prc (level1_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \ - VALUES(?,?,?,?,?,?,?)', - (rec.level1_id, rec.pipeline_id, rec.prc_module, rec.params_file_path, rec.prc_status, rec.prc_time, rec.result_file_path) - ) - self.db.end() - rec.id = self.db.last_row_id() - - return Result.ok_data(data=rec) - - except Exception as e: - log.error(e) - return Result.error(message=str(e)) - diff --git a/csst_dfs_api_local/facility/calmerge.py b/csst_dfs_api_local/facility/calmerge.py deleted file mode 100644 index e6fd9e5c67aa6cf1ee92957269b69b6e12fa7ef5..0000000000000000000000000000000000000000 --- a/csst_dfs_api_local/facility/calmerge.py +++ /dev/null @@ -1,298 +0,0 @@ -import os -import logging -import time, datetime -import shutil - -from ..common.db import DBClient -from ..common.utils import * -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.facility import CalMergeRecord 
-from csst_dfs_commons.models.common import from_dict_list -from .level0 import Level0DataApi - -log = logging.getLogger('csst') - -class CalMergeApi(object): - def __init__(self): - self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") - self.db = DBClient() - self.level0Api = Level0DataApi() - - def get_latest_by_l0(self, **kwargs): - ''' retrieve calibration merge records from database by level0 data - - :param kwargs: Parameter dictionary, key items support: - level0_id: [str], - ref_type: [str] - - :returns: csst_dfs_common.models.Result - ''' - try: - level0_id = get_parameter(kwargs, "level0_id") - ref_type = get_parameter(kwargs, "ref_type") - - level0_data = self.level0Api.get_by_level0_id(level0_id) - if level0_data.data is None: - return Result.error(message = "level0 data [%s]not found"%(level0_id)) - - sql_data = f"select * from t_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time >= '{level0_data.data.obs_time}' order by obs_time ASC limit 1" - - r = self.db.select_one(sql_data) - if r: - rec = CalMergeRecord().from_dict(r) - return Result.ok_data(data=rec) - - sql_data = f"select * from t_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time <= '{level0_data.data.obs_time}' order by obs_time DESC limit 1" - - r = self.db.select_one(sql_data) - if r: - rec = CalMergeRecord().from_dict(r) - return Result.ok_data(data=rec) - - return Result.error(message = "not found") - - except Exception as e: - return Result.error(message=str(e)) - - def find(self, **kwargs): - ''' retrieve calibration merge records from database - - parameter kwargs: - detector_no: [str] - ref_type: [str] - obs_time: (start,end) - qc1_status : [int] - prc_status : [int] - file_name: [str] - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - detector_no = get_parameter(kwargs, "detector_no") - ref_type = get_parameter(kwargs, "ref_type") - exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0] - exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1] - qc1_status = get_parameter(kwargs, "qc1_status") - prc_status = get_parameter(kwargs, "prc_status") - file_name = get_parameter(kwargs, "file_name") - limit = get_parameter(kwargs, "limit", 0) - - sql_count = "select count(*) as c from t_cal_merge where 1=1" - sql_data = f"select * from t_cal_merge where 1=1" - - sql_condition = "" - if detector_no: - sql_condition = f"{sql_condition} and detector_no='{detector_no}'" - if ref_type: - sql_condition = f"{sql_condition} and ref_type='{ref_type}'" - if exp_time_start: - sql_condition = f"{sql_condition} and obs_time >='{exp_time_start}'" - if exp_time_end: - sql_condition = f"{sql_condition} and obs_time <='{exp_time_end}'" - if qc1_status: - sql_condition = f"{sql_condition} and qc1_status={qc1_status}" - if prc_status: - sql_condition = f"{sql_condition} and prc_status={prc_status}" - - if file_name: - sql_condition = f" and filename={file_name}" - - sql_count = f"{sql_count} {sql_condition}" - sql_data = f"{sql_data} {sql_condition}" - - log.info(sql_count) - log.info(sql_data) - - if limit > 0: - sql_data = f"{sql_data} limit {limit}" - - totalCount = self.db.select_one(sql_count) - _, records = self.db.select_many(sql_data) - - return Result.ok_data(data=from_dict_list(CalMergeRecord, records)).append("totalCount", totalCount['c']) - - except Exception as e: - return Result.error(message=str(e)) - - def 
get(self, **kwargs): - ''' fetch a record from database - - parameter kwargs: - id : [int], - cal_id : [str] - - return csst_dfs_common.models.Result - ''' - id = get_parameter(kwargs, "id", 0) - cal_id = get_parameter(kwargs, "cal_id", "") - - if id == 0 and cal_id == "": - return Result.error(message="at least define id or cal_id") - - if id != 0: - return self.get_by_id(id) - if cal_id != "": - return self.get_by_cal_id(cal_id) - - def get_by_id(self, iid: int): - try: - r = self.db.select_one( - "select * from t_cal_merge where id=?", (iid,)) - if r: - - sql_get_level0_id = f"select level0_id from t_cal2level0 where merge_id={r['id']}" - _, records = self.db.select_many(sql_get_level0_id) - level0_ids = [r["level0_id"] for r in records] - - rec = CalMergeRecord().from_dict(r) - rec.level0_ids = level0_ids - return Result.ok_data(data=rec) - else: - return Result.error(message=f"id:{iid} not found") - - except Exception as e: - log.error(e) - return Result.error(message=str(e)) - - def get_by_cal_id(self, cal_id: str): - try: - r = self.db.select_one( - "select * from t_cal_merge where cal_id=?", (cal_id,)) - if r: - - sql_get_level0_id = f"select level0_id from t_cal2level0 where merge_id={r['id']}" - _, records = self.db.select_many(sql_get_level0_id) - level0_ids = [r["level0_id"] for r in records] - - rec = CalMergeRecord().from_dict(r) - rec.level0_ids = level0_ids - return Result.ok_data(data=rec) - else: - return Result.error(message=f"id:{id} not found") - - except Exception as e: - log.error(e) - return Result.error(message=str(e)) - - def update_qc1_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - cal_id : [str], - status : [int] - - return csst_dfs_common.models.Result - ''' - id = get_parameter(kwargs, "id", 0) - cal_id = get_parameter(kwargs, "cal_id", "") - result = self.get(id = id, cal_id = cal_id) - - if not result.success: - return Result.error(message="not found") - - id = result.data.id - status = get_parameter(kwargs, "status") - - try: - self.db.execute( - 'update t_cal_merge set qc1_status=?, qc1_time=? where id=?', - (status, format_time_ms(time.time()), id) - ) - self.db.end() - return Result.ok_data() - - except Exception as e: - log.error(e) - return Result.error(message=str(e)) - - def update_proc_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - cal_id : [str], - status : [int] - - return csst_dfs_common.models.Result - ''' - id = get_parameter(kwargs, "id", 0) - cal_id = get_parameter(kwargs, "cal_id", "") - result = self.get(id = id, cal_id = cal_id) - - if not result.success: - return Result.error(message="not found") - - id = result.data.id - status = get_parameter(kwargs, "status") - - try: - existed = self.db.exists( - "select * from t_cal_merge where id=?", - (id,) - ) - if not existed: - log.warning('%s not found' %(id, )) - return Result.error(message ='%s not found' %(id, )) - self.db.execute( - 'update t_cal_merge set prc_status=?, prc_time=? 
where id=?', - (status, format_time_ms(time.time()), id) - ) - self.db.end() - return Result.ok_data() - - except Exception as e: - log.error(e) - return Result.error(message=str(e)) - - def write(self, **kwargs): - ''' insert a calibration merge record into database - - parameter kwargs: - cal_id : [str] - detector_no : [str] - ref_type : [str] - obs_time : [str] - exp_time : [float] - prc_status : [int] - prc_time : [str] - filename : [str] - file_path : [str] - level0_ids : [list] - return csst_dfs_common.models.Result - ''' - - rec = CalMergeRecord( - id = 0, - cal_id = get_parameter(kwargs, "cal_id"), - detector_no = get_parameter(kwargs, "detector_no"), - ref_type = get_parameter(kwargs, "ref_type"), - obs_time = get_parameter(kwargs, "obs_time"), - exp_time = get_parameter(kwargs, "exp_time"), - filename = get_parameter(kwargs, "filename"), - file_path = get_parameter(kwargs, "file_path"), - prc_status = get_parameter(kwargs, "prc_status", -1), - prc_time = get_parameter(kwargs, "prc_time"), - level0_ids = get_parameter(kwargs, "level0_ids", []) - ) - try: - self.db.execute( - 'INSERT INTO t_cal_merge (cal_id,detector_no,ref_type,obs_time,exp_time,filename,file_path,prc_status,prc_time,create_time) \ - VALUES(?,?,?,?,?,?,?,?,?,?)', - (rec.cal_id, rec.detector_no, rec.ref_type, rec.obs_time, rec.exp_time, rec.filename, rec.file_path,rec.prc_status,rec.prc_time,format_time_ms(time.time())) - ) - self.db.end() - rec.id = self.db.last_row_id() - - sql_level0_ids = "insert into t_cal2level0 (merge_id,level0_id) values " - values = ["(%s,%s)"%(rec.id,i) for i in rec.level0_ids] - _ = self.db.execute(sql_level0_ids + ",".join(values)) - - self.db.end() - - return Result.ok_data(data=rec) - - except Exception as e: - log.error(e) - return Result.error(message=str(e)) \ No newline at end of file diff --git a/csst_dfs_api_local/facility/ingest.py b/csst_dfs_api_local/facility/ingest.py new file mode 100644 index 0000000000000000000000000000000000000000..0a657f0d51f8e166206cbe15e3acb10460e95372 --- /dev/null +++ b/csst_dfs_api_local/facility/ingest.py @@ -0,0 +1,198 @@ +import os, sys +import argparse +import logging +from astropy.io import fits +import datetime +import shutil + +from csst_dfs_api_local.common.db import DBClient +from csst_dfs_commons.utils.fits import get_header_value +from csst_dfs_commons.models.ifs import Level0Record + +log = logging.getLogger('csst-dfs-api-local') + +def ingest(): + + parser = argparse.ArgumentParser(prog=f"{sys.argv[0]}", description="ingest the local files") + parser.add_argument('-i','--infile', dest="infile", help="a file or a directory") + parser.add_argument('-m', '--copyfiles', dest="copyfiles", action='store_true', default=False, help="whether copy files after import") + args = parser.parse_args(sys.argv[1:]) + + import_root_dir = args.infile + if import_root_dir is None or (not os.path.isfile(import_root_dir) and not os.path.isdir(import_root_dir)): + parser.print_help() + sys.exit(0) + + db = DBClient() + if os.path.isfile(import_root_dir): + log.info(f"prepare import {import_root_dir}") + ingest_one(import_root_dir, db, args.copyfiles) + if os.path.isdir(import_root_dir): + for (path, _, file_names) in os.walk(import_root_dir): + for filename in file_names: + if filename.find(".fits") > 0: + file_full_path = os.path.join(path, filename) + log.info(f"prepare import {file_full_path}") + try: + ingest_one(file_full_path, db, args.copyfiles) + except Exception as e: + print(f"{file_full_path} import error!!!") + log.error(e) + + db.close() + +def 
ingest_one(file_path, db, copyfiles):
+    dest_root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
+
+    hdul = fits.open(file_path)
+    header = hdul[0].header
+    # parse the FITS header
+    obs_id = get_header_value("OBSID", header, "")
+    obs_id_str = "%09d" % (obs_id) if isinstance(obs_id, int) else obs_id
+    module_id = get_header_value("INSTRUME", header, "")
+    exp_start_time = f"{get_header_value('DATE-OBS', header, '')} {get_header_value('TIME-OBS', header, '')}"
+    exp_time = get_header_value("EXPTIME", header, 0)
+    obs_type = get_header_value("FILETYPE", header, "")
+    detector = str(get_header_value("DETECTOR", header, ""))
+    filename = get_header_value("FILENAME", header, "")
+    mjd_int = int(get_header_value("EXPSTART", header, 0))
+
+    if module_id.upper() == "HSTDM":
+        img_header = hdul[0].header
+    else:
+        img_header = hdul[1].header
+
+    filter = get_header_value("FILTER", img_header, "")
+
+    #level0 header
+    object_name = get_header_value("OBJECT", header, "-")
+    ra_obj = get_header_value("RA_OBJ", header, 0)
+    dec_obj = get_header_value("DEC_OBJ", header, 0)
+    crpix1 = get_header_value("CRPIX1", img_header, 0)
+    crpix2 = get_header_value("CRPIX2", img_header, 0)
+    crval1 = get_header_value("CRVAL1", img_header, 0)
+    crval2 = get_header_value("CRVAL2", img_header, 0)
+    ctype1 = get_header_value("CTYPE1", img_header, "")
+    ctype2 = get_header_value("CTYPE2", img_header, "")
+    cd1_1 = get_header_value("CD1_1", img_header, 0)
+    cd1_2 = get_header_value("CD1_2", img_header, 0)
+    cd2_1 = get_header_value("CD2_1", img_header, 0)
+    cd2_2 = get_header_value("CD2_2", img_header, 0)
+    version = get_header_value("IMG_VER", img_header, "-")
+    # -----------------------------------------------------------------
+    # module-specific keyword handling
+    if len(detector) > 2 and module_id.upper() == "MSC":
+        detector = detector[-2:]
+
+    if module_id.upper() == "IFS" or module_id.upper() == "MCI":
+        obs_type = get_header_value("OBSTYPE", header, "")
+        exp_start_time = get_header_value('DATE-OBS', header, '')
+        ra_obj = get_header_value("OBJ_RA", header, 0)
+        dec_obj = get_header_value("OBJ_DEC", header, 0)
+        detector = str(get_header_value("CAMERA", img_header, ""))
+        filter = get_header_value("DETNAM", img_header, "")
+
+    if module_id.upper() == "MCI":
+        obs_type = get_header_value("OBSTYPE", header, "")
+        exp_start_time = get_header_value('DATE-OBS', header, '')
+        ra_obj = get_header_value("OBJ_RA", header, 0)
+        dec_obj = get_header_value("OBJ_DEC", header, 0)
+        detector = str(get_header_value("CAMERA", img_header, ""))
+
+    if module_id.upper() == "CPIC":
+        exp_start_time = get_header_value('DATE-OBS', header, '')
+        obs_type = get_header_value("OBSTYPE", header, "")
+        detector = get_header_value("CCDLABEL", img_header, "")
+        if len(detector) > 2:
+            detector = detector[0:2]
+        object_name = get_header_value("TARGET", header, "-")
+
+    if module_id.upper() == "HSTDM":
+        if len(detector) == 1:
+            detector = '0' + detector
+    # -----------------------------------------------------------------
+
+    qc0_status = -1
+    prc_status = -1
+    time_now = datetime.datetime.now()
+    create_time = time_now.strftime('%Y-%m-%d %H:%M:%S')
+
+    facility_status_id = 0
+    module_status_id = 0
+
+    existed = db.exists("select * from t_observation where obs_id=?", (obs_id_str,))
+    if not existed:
+        db.execute("insert into t_observation \
+            (obs_id,obs_time,exp_time,module_id,obs_type,facility_status_id, module_status_id, qc0_status, prc_status,create_time) \
+            values (?,?,?,?,?,?,?,?,?,?)",
+            (obs_id_str,exp_start_time,exp_time,module_id,obs_type,facility_status_id,module_status_id,qc0_status, prc_status,create_time))
+        db.end()
+    #level0
+    # file path handling
+    #MSC----component name
+    file_dir_in_db = f"L0/{module_id}/{obs_type.upper()}/{mjd_int}/{obs_id_str}/MS"
+
+    if module_id.upper() == "IFS":
+        file_dir_in_db = f"L0/{module_id}/{obs_type.upper()}/{mjd_int}"
+    if module_id.upper() == "MCI":
+        file_dir_in_db = f"L0/{module_id}/{obs_type.upper()}/{mjd_int}/{obs_id_str}"
+    if module_id.upper() == "CPIC":
+        file_dir_in_db = f"L0/{module_id}/{obs_type.upper()}/{mjd_int}/{detector}"
+    if module_id.upper() == "HSTDM":
+        file_dir_in_db = f"L0/{module_id}/{obs_type.upper()}/{mjd_int}"
+
+    level0Ids = db.select_one(
+        "select id from t_level0_data where filename=?",
+        (filename,)
+    )
+
+    detector_status_id = 0
+    # needed below even when the file was already imported
+    level0_id = f"{obs_id}{detector}"
+    file_full_path = file_path
+
+    if level0Ids is None:
+        if copyfiles:
+            file_dir = f"{dest_root_dir}/{file_dir_in_db}"
+            if not os.path.exists(file_dir):
+                os.makedirs(file_dir)
+            file_full_path = f"{file_dir}/{filename}"
+            shutil.copyfile(file_path, file_full_path)
+        c = db.execute("insert into t_level0_data \
+            (level0_id, obs_id, detector_no, filter, obs_type, module_id, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \
+            values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
+            (level0_id, obs_id, detector, filter, obs_type, module_id, exp_start_time, exp_time, detector_status_id, filename, file_full_path, qc0_status, prc_status,create_time))
+        level0_id_id = db.last_row_id()
+    else:
+        level0_id_id = level0Ids[0]
+        log.warning('%s has already been imported' %(file_path, ))
+
+    #level0-header
+    db.execute("delete from t_level0_header where id=?",(level0_id_id,))
+    db.execute("insert into t_level0_header \
+        (id, ra_obj, dec_obj, crpix1, crpix2, crval1, crval2, \
+        ctype1, ctype2, cd1_1, cd1_2, cd2_1, cd2_2, object_name, version, create_time) \
+        values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
+        (level0_id_id, ra_obj, dec_obj, crpix1, crpix2, crval1, crval2, ctype1, ctype2, cd1_1, cd1_2, cd2_1, cd2_2, object_name, version, create_time))
+    db.end()
+
+    rec = Level0Record(
+        id = level0_id_id,
+        level0_id = level0_id,
+        filter = filter,
+        module_id = module_id,
+        obs_id = obs_id,
+        detector_no = detector,
+        obs_type = obs_type,
+        obs_time = exp_start_time,
+        exp_time = exp_time,
+        detector_status_id = detector_status_id,
+        filename = filename,
+        file_path = file_full_path
+    )
+
+    print(f"{file_path} imported")
+    return rec
+
+if __name__ == "__main__":
+    ingest()
\ No newline at end of file
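The new `csst_dfs_api_local/facility/ingest.py` above replaces the per-module ingest scripts (cpic/ifs/mci) with one entry point that branches on the `INSTRUME` keyword. A minimal sketch of driving it from Python rather than the command line, assuming the package is importable; the FITS path below is a placeholder, and `CSST_LOCAL_FILE_ROOT` defaults to `/opt/temp/csst` exactly as in the code:

```python
# Sketch only: exercise facility.ingest.ingest_one directly.
# The .fits path is a placeholder, not a real product name.
import os
from csst_dfs_api_local.common.db import DBClient
from csst_dfs_api_local.facility.ingest import ingest_one

os.environ.setdefault("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")

db = DBClient()
try:
    # Reads the primary header (and, except for HSTDM, the first image
    # extension), registers the file in t_observation / t_level0_data /
    # t_level0_header, and returns a Level0Record; copyfiles=True would
    # also copy the file under CSST_LOCAL_FILE_ROOT/L0/...
    rec = ingest_one("/data/incoming/example_level0.fits", db, copyfiles=False)
    print(rec.level0_id, rec.file_path)
finally:
    db.close()
```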
diff --git a/csst_dfs_api_local/facility/level0.py b/csst_dfs_api_local/facility/level0.py
index 8d15ba03e003da10797800f45a467fe1e3761478..8e460c5bc67637866d8713f7d85d3b4303e5524d 100644
--- a/csst_dfs_api_local/facility/level0.py
+++ b/csst_dfs_api_local/facility/level0.py
@@ -171,7 +171,7 @@ class Level0DataApi(object):
             )
             self.db.end()
             return Result.ok_data()
-        
+
         except Exception as e:
             log.error(e)
             return Result.error(message=str(e))
@@ -201,14 +201,14 @@ class Level0DataApi(object):
             )
             self.db.end()
             return Result.ok_data()
-        
+
         except Exception as e:
             log.error(e)
             return Result.error(message=str(e))
 
     def write(self, **kwargs):
         ''' insert a level0 data record into database
-        
+
         parameter kwargs:
             file_path = [str],
             copyfiles = [boolean]
diff --git a/csst_dfs_api_local/ifs/level1.py b/csst_dfs_api_local/facility/level1.py
similarity index 75%
rename from csst_dfs_api_local/ifs/level1.py
rename to csst_dfs_api_local/facility/level1.py
index 812dbd76283055fcd0ae4bf5445d0039b635b221..c0783fb009e9cd850bbcba81d722ab4878760721 100644
--- a/csst_dfs_api_local/ifs/level1.py
+++ b/csst_dfs_api_local/facility/level1.py
@@ -8,15 +8,25 @@ from ..common.utils import *
 from csst_dfs_commons.models import Result
 from csst_dfs_commons.models.ifs import Level1Record
 from csst_dfs_commons.models.common import from_dict_list
+from csst_dfs_commons.utils.fits import get_header_value, get_healpix_id, get_healpix_ids
 
 log = logging.getLogger('csst')
 
 class Level1DataApi(object):
-    def __init__(self, sub_system = "ifs"):
-        self.sub_system = sub_system
+    def __init__(self):
         self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
         self.db = DBClient()
 
+    def make_sql_heapix(self, ra, dec, radius, ra_column, dec_column, hpix_column):
+        # coarse HEALPix-zone filter (nside=128) plus an exact spherical-distance cut
+        arcDec = (PI / 180) * dec
+        whereSql = f"abs((180./pi()) * ACOS(SIN(pi() * {dec_column}/180) * SIN({arcDec}) + COS(pi() * {dec_column}/180) * COS({arcDec}) * COS((pi()/180) * ({ra_column} - {ra})))) < {radius}"
+
+        heapix_ids = get_healpix_ids(ra, dec, radius, 128)
+        whereZoneSql = "%s in (%s)" % \
+            (hpix_column, ','.join([str(i) for i in heapix_ids]))
+        return f"{whereZoneSql} and {whereSql}"
+
     def find(self, **kwargs):
         ''' retrieve level1 records from database
 
@@ -41,8 +50,8 @@ class Level1DataApi(object):
             filename = get_parameter(kwargs, "filename")
             limit = get_parameter(kwargs, "limit", 0)
 
-            sql_count = "select count(*) as c from ifs_level1_data where 1=1"
-            sql_data = f"select * from ifs_level1_data where 1=1"
+            sql_count = "select count(*) as c from t_level1_data d left join t_level1_header h on d.id=h.id where 1=1"
+            sql_data = f"select d.* from t_level1_data d left join t_level1_header h on d.id=h.id where 1=1"
 
             sql_condition = ""
             if level0_id:
@@ -64,7 +73,15 @@ class Level1DataApi(object):
             sql_data = f"{sql_data} {sql_condition}"
 
+            ra_cen = get_parameter(kwargs, "ra_cen")
+            dec_cen = get_parameter(kwargs, "dec_cen")
+            radius_cen = get_parameter(kwargs, "radius_cen")
+            if ra_cen and dec_cen and radius_cen:
+                cone_sql = self.make_sql_heapix(float(ra_cen), float(dec_cen), float(radius_cen), 'h.ra_cen', 'h.dec_cen', 'h.cen_hpix')
+                sql_count = f"{sql_count} and {cone_sql}"
+                sql_data = f"{sql_data} and {cone_sql}"
+
             if limit > 0:
                 sql_data = f"{sql_data} limit {limit}"
 
             totalCount = self.db.select_one(sql_count)
             _, recs = self.db.select_many(sql_data)
@@ -84,7 +105,7 @@ class Level1DataApi(object):
         try:
             fits_id = get_parameter(kwargs, "id", -1)
             r = self.db.select_one(
-                "select * from ifs_level1_data where id=?", (fits_id,))
+                "select * from t_level1_data where id=?", (fits_id,))
 
             if r:
                 return Result.ok_data(data=Level1Record().from_dict(r))
@@ -107,14 +128,14 @@ class Level1DataApi(object):
         status = get_parameter(kwargs, "status")
         try:
             existed = self.db.exists(
-                "select * from ifs_level1_data where id=?",
+                "select * from t_level1_data where id=?",
                 (fits_id,)
             )
             if not existed:
                 log.warning('%s not found' %(fits_id, ))
                 return Result.error(message ='%s not found' %(fits_id, ))
             self.db.execute(
-                'update ifs_level1_data set prc_status=?, prc_time=? where id=?',
+                'update t_level1_data set prc_status=?, prc_time=? where id=?',
                 (status, format_time_ms(time.time()), fits_id)
             )
             self.db.end()
             return Result.ok_data()
@@ -135,19 +156,18 @@ class Level1DataApi(object):
         status = get_parameter(kwargs, "status")
         try:
             existed = self.db.exists(
-                "select * from ifs_level1_data where id=?",
+                "select * from t_level1_data where id=?",
                 (fits_id,)
             )
             if not existed:
                 log.warning('%s not found' %(fits_id, ))
                 return Result.error(message ='%s not found' %(fits_id, ))
             self.db.execute(
-                'update ifs_level1_data set qc1_status=?, qc1_time=? where id=?',
+                'update t_level1_data set qc1_status=?, qc1_time=? where id=?',
                 (status, format_time_ms(time.time()), fits_id)
             )
             self.db.end()
             return Result.ok_data()
-
         except Exception as e:
             log.error(e)
             return Result.error(message=str(e))
@@ -184,7 +204,7 @@ class Level1DataApi(object):
                 refs = get_parameter(kwargs, "refs", {})
             )
             existed = self.db.exists(
-                "select * from ifs_level1_data where filename=?",
+                "select * from t_level1_data where filename=?",
                 (rec.filename,)
             )
             if existed:
@@ -193,7 +213,7 @@ class Level1DataApi(object):
 
             now_str = format_time_ms(time.time())
             self.db.execute(
-                'INSERT INTO ifs_level1_data (level0_id,data_type,cor_sci_id,prc_params,filename,file_path,qc1_status,prc_status,prc_time, create_time,pipeline_id) \
+                'INSERT INTO t_level1_data (level0_id,data_type,cor_sci_id,prc_params,filename,file_path,qc1_status,prc_status,prc_time, create_time,pipeline_id) \
                 VALUES(?,?,?,?,?,?,?,?,?,?,?)',
                 (rec.level0_id, rec.data_type, rec.cor_sci_id, rec.prc_params, rec.filename, rec.file_path, -1, rec.prc_status,rec.prc_time, now_str, rec.pipeline_id,)
             )
@@ -201,7 +221,7 @@ class Level1DataApi(object):
             rec.id = self.db.last_row_id()
 
             if rec.refs.items():
-                sql_refs = "insert into ifs_level1_ref (level1_id,ref_type,cal_id) values "
-                values = ["(%s,'%s',%s)"%(rec.id,k,v) for k,v in rec.refs.items()]
+                sql_refs = "insert into t_level1_ref (level1_id,ref_type,cal_id) values "
+                values = ["(%s,'%s','%s')"%(rec.id,k,v) for k,v in rec.refs.items()]
                 _ = self.db.execute(sql_refs + ",".join(values))
                 self.db.end()
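Since level1 queries now join `t_level1_data` against `t_level1_header`, `find()` can take an optional cone search: a coarse HEALPix-zone filter on `cen_hpix` (nside 128, via `get_healpix_ids`) followed by the exact spherical-distance cut built in `make_sql_heapix()`. A usage sketch; the coordinates and the `data_type` value below are placeholders:

```python
# Sketch only: cone search over level1 products with the local API.
from csst_dfs_api_local.facility.level1 import Level1DataApi

api = Level1DataApi()
# ra_cen / dec_cen / radius_cen are in degrees and are matched against
# the t_level1_header columns (h.ra_cen, h.dec_cen, h.cen_hpix).
result = api.find(data_type="sci",
                  ra_cen=180.0, dec_cen=30.0, radius_cen=0.2,
                  limit=10)
if result.success:
    for rec in result.data:
        print(rec.id, rec.filename)
```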
diff --git a/csst_dfs_api_local/ifs/level1prc.py b/csst_dfs_api_local/facility/level1prc.py
similarity index 92%
rename from csst_dfs_api_local/ifs/level1prc.py
rename to csst_dfs_api_local/facility/level1prc.py
index 626192b4cb5bf6aaf4d4c77132294238fda5b31e..703e41a3cfb73563b0c0809de79e9ccc70f4759c 100644
--- a/csst_dfs_api_local/ifs/level1prc.py
+++ b/csst_dfs_api_local/facility/level1prc.py
@@ -33,7 +33,7 @@ class Level1PrcApi(object):
             prc_module = get_parameter(kwargs, "prc_module")
             prc_status = get_parameter(kwargs, "prc_status")
 
-            sql_data = f"select * from ifs_level1_prc"
+            sql_data = f"select * from t_level1_prc"
 
             sql_condition = f"where level1_id={level1_id}"
             if pipeline_id:
@@ -65,14 +65,14 @@ class Level1PrcApi(object):
 
         try:
             existed = self.db.exists(
-                "select * from ifs_level1_prc where id=?",
+                "select * from t_level1_prc where id=?",
                 (id,)
             )
             if not existed:
                 log.warning('%s not found' %(id, ))
                 return Result.error(message ='%s not found' %(id, ))
             self.db.execute(
-                'update ifs_level1_prc set prc_status=?, prc_time=? where id=?',
+                'update t_level1_prc set prc_status=?, prc_time=? 
where id=?', (status, format_time_ms(time.time()), id) ) self.db.end() @@ -108,7 +108,7 @@ class Level1PrcApi(object): ) try: self.db.execute( - 'INSERT INTO ifs_level1_prc (level1_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \ + 'INSERT INTO t_level1_prc (level1_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \ VALUES(?,?,?,?,?,?,?)', (rec.level1_id, rec.pipeline_id, rec.prc_module, rec.params_file_path, rec.prc_status, rec.prc_time, rec.result_file_path) ) diff --git a/csst_dfs_api_local/ifs/__init__.py b/csst_dfs_api_local/ifs/__init__.py index 15cff41489ffd6daffdbaa088141de7b881bb805..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 --- a/csst_dfs_api_local/ifs/__init__.py +++ b/csst_dfs_api_local/ifs/__init__.py @@ -1,5 +0,0 @@ -from .calmerge import CalMergeApi -from .level0 import Level0DataApi -from .level0prc import Level0PrcApi -from .level1 import Level1DataApi -from .level1prc import Level1PrcApi \ No newline at end of file diff --git a/csst_dfs_api_local/ifs/ingest.py b/csst_dfs_api_local/ifs/ingest.py deleted file mode 100644 index a40bb3a9519d75bd90336f779c7f2c0cdae328f1..0000000000000000000000000000000000000000 --- a/csst_dfs_api_local/ifs/ingest.py +++ /dev/null @@ -1,138 +0,0 @@ -import os, sys -import argparse -import logging -from astropy.io import fits -import datetime -import shutil - -from csst_dfs_api_local.common.db import DBClient -from csst_dfs_commons.utils.fits import get_header_value -from csst_dfs_commons.models.ifs import Level0Record - -log = logging.getLogger('csst-dfs-api-local') - -def ingest(): - - parser = argparse.ArgumentParser(prog=f"{sys.argv[0]}", description="ingest the local files") - parser.add_argument('-i','--infile', dest="infile", help="a file or a directory") - parser.add_argument('-m', '--copyfiles', dest="copyfiles", action='store_true', default=False, help="whether copy files after import") - args = parser.parse_args(sys.argv[1:]) - - import_root_dir = args.infile - if import_root_dir is None or (not os.path.isfile(import_root_dir) and not os.path.isdir(import_root_dir)): - parser.print_help() - sys.exit(0) - - db = DBClient() - if os.path.isfile(import_root_dir): - log.info(f"prepare import {import_root_dir}") - ingest_one(import_root_dir, db, args.copyfiles) - if os.path.isdir(import_root_dir): - for (path, _, file_names) in os.walk(import_root_dir): - for filename in file_names: - if filename.find(".fits") > 0: - file_full_path = os.path.join(path, filename) - log.info(f"prepare import {file_full_path}") - try: - ingest_one(file_full_path, db, args.copyfiles) - except Exception as e: - print(f"{file_full_path} import error!!!") - log.error(e) - - db.close() - -def ingest_one(file_path, db, copyfiles): - dest_root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") - - hdul = fits.open(file_path) - header = hdul[0].header - header1 = hdul[1].header - - obs_id = header["OBSID"] - exp_start_time = f"{header['DATE-OBS']}" - exp_time = header['EXPTIME'] - - module_id = header["INSTRUME"] - obs_type = header["OBSTYPE"] - object_name = get_header_value("OBJECT", header, "-") - - qc0_status = -1 - prc_status = -1 - time_now = datetime.datetime.now() - create_time = time_now.strftime('%Y-%m-%d %H:%M:%S') - - facility_status_id = 0 - module_status_id = 0 - - existed = db.exists("select * from t_observation where obs_id=?", (obs_id,)) - if not existed: - db.execute("insert into t_observation \ - (obs_id,obs_time,exp_time,module_id,obs_type,facility_status_id, 
module_status_id, qc0_status, prc_status,create_time) \ - values (?,?,?,?,?,?,?,?,?,?)", - (obs_id,exp_start_time,exp_time,module_id,obs_type,facility_status_id,module_status_id,qc0_status, prc_status,create_time)) - db.end() - #level0 - detector = get_header_value("DETNAM", header1, "-") - filename = get_header_value("FILENAME", header, os.path.basename(file_path)) - version = get_header_value("IMG_VER", header1, "-") - - existed = db.exists( - "select * from ifs_level0_data where filename=?", - (filename,) - ) - if existed: - log.warning('%s has already been imported' %(file_path, )) - db.end() - return - - detector_status_id = 0 - - file_full_path = file_path - - if copyfiles: - file_dir = f"{dest_root_dir}/{module_id}/{obs_type.upper()}/{header['EXPSTART']}/{obs_id}/MS" - if not os.path.exists(file_dir): - os.makedirs(file_dir) - file_full_path = f"{file_dir}/{filename}.fits" - - level0_id = f"{obs_id}{detector}" - - c = db.execute("insert into ifs_level0_data \ - (level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \ - values (?,?,?,?,?,?,?,?,?,?,?,?)", - (level0_id, obs_id, detector, obs_type, exp_start_time, exp_time, detector_status_id, filename, file_full_path, qc0_status, prc_status,create_time)) - db.end() - level0_id_id = db.last_row_id() - #level0-header - ra_obj = header["OBJ_RA"] - dec_obj = header["OBJ_DEC"] - db.execute("delete from ifs_level0_header where id=?",(level0_id_id,)) - db.execute("insert into ifs_level0_header \ - (id, ra_obj, dec_obj, object_name, version) \ - values (?,?,?,?,?)", - (level0_id_id, ra_obj, dec_obj, object_name, version)) - - if copyfiles: - #copy files - shutil.copyfile(file_path, file_full_path) - - db.end() - - rec = Level0Record( - id = level0_id_id, - level0_id = level0_id, - obs_id = obs_id, - detector_no = detector, - obs_type = obs_type, - obs_time = exp_start_time, - exp_time = exp_time, - detector_status_id = detector_status_id, - filename = filename, - file_path = file_full_path - ) - - print(f"{file_path} imported") - return rec - -if __name__ == "__main__": - ingest() \ No newline at end of file diff --git a/csst_dfs_api_local/mci/__init__.py b/csst_dfs_api_local/mci/__init__.py index 15cff41489ffd6daffdbaa088141de7b881bb805..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 --- a/csst_dfs_api_local/mci/__init__.py +++ b/csst_dfs_api_local/mci/__init__.py @@ -1,5 +0,0 @@ -from .calmerge import CalMergeApi -from .level0 import Level0DataApi -from .level0prc import Level0PrcApi -from .level1 import Level1DataApi -from .level1prc import Level1PrcApi \ No newline at end of file diff --git a/csst_dfs_api_local/mci/ingest.py b/csst_dfs_api_local/mci/ingest.py deleted file mode 100644 index f60436ccde8ca0dc15dd19ee7e4c9bd071949184..0000000000000000000000000000000000000000 --- a/csst_dfs_api_local/mci/ingest.py +++ /dev/null @@ -1,136 +0,0 @@ -import os, sys -import argparse -import logging -from astropy.io import fits -import datetime -import shutil - -from csst_dfs_api_local.common.db import DBClient -from csst_dfs_commons.utils.fits import get_header_value -from csst_dfs_commons.models.mci import Level0Record - -log = logging.getLogger('csst-dfs-api-local') - -def ingest(): - - parser = argparse.ArgumentParser(prog=f"{sys.argv[0]}", description="ingest the local files") - parser.add_argument('-i','--infile', dest="infile", help="a file or a directory") - parser.add_argument('-m', '--copyfiles', dest="copyfiles", action='store_true', default=False, 
help="whether copy files after import") - args = parser.parse_args(sys.argv[1:]) - - import_root_dir = args.infile - if import_root_dir is None or (not os.path.isfile(import_root_dir) and not os.path.isdir(import_root_dir)): - parser.print_help() - sys.exit(0) - - db = DBClient() - if os.path.isfile(import_root_dir): - log.info(f"prepare import {import_root_dir}") - ingest_one(import_root_dir, db, args.copyfiles) - if os.path.isdir(import_root_dir): - for (path, _, file_names) in os.walk(import_root_dir): - for filename in file_names: - if filename.find(".fits") > 0: - file_full_path = os.path.join(path, filename) - log.info(f"prepare import {file_full_path}") - try: - ingest_one(file_full_path, db, args.copyfiles) - except Exception as e: - print(f"{file_full_path} import error!!!") - log.error(e) - - db.close() - -def ingest_one(file_path, db, copyfiles): - dest_root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") - - hdul = fits.open(file_path) - header = hdul[0].header - header1 = hdul[1].header - - obs_id = header["OBSID"] - exp_start_time = f"{header['DATE-OBS']}" - exp_time = header['EXPTIME'] - - module_id = header["INSTRUME"] - obs_type = header["OBSTYPE"] - object_name = get_header_value("OBJECT", header, "-") - - qc0_status = -1 - prc_status = -1 - time_now = datetime.datetime.now() - create_time = time_now.strftime('%Y-%m-%d %H:%M:%S') - - facility_status_id = 0 - module_status_id = 0 - - existed = db.exists("select * from t_observation where obs_id=?", (obs_id,)) - if not existed: - db.execute("insert into t_observation \ - (obs_id,obs_time,exp_time,module_id,obs_type,facility_status_id, module_status_id, qc0_status, prc_status,create_time) \ - values (?,?,?,?,?,?,?,?,?,?)", - (obs_id,exp_start_time,exp_time,module_id,obs_type,facility_status_id,module_status_id,qc0_status, prc_status,create_time)) - db.end() - #level0 - detector = get_header_value("DETNAM", header1, "-") - filename = get_header_value("FILENAME", header, os.path.basename(file_path)) - version = get_header_value("IMG_VER", header1, "-") - existed = db.exists( - "select * from mci_level0_data where filename=?", - (filename,) - ) - if existed: - log.warning('%s has already been imported' %(file_path, )) - db.end() - return - - detector_status_id = 0 - - file_full_path = file_path - - if copyfiles: - file_dir = f"{dest_root_dir}/{module_id}/{obs_type.upper()}/{header['EXPSTART']}/{obs_id}/MS" - if not os.path.exists(file_dir): - os.makedirs(file_dir) - file_full_path = f"{file_dir}/{filename}.fits" - - level0_id = f"{obs_id}{detector}" - - c = db.execute("insert into mci_level0_data \ - (level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \ - values (?,?,?,?,?,?,?,?,?,?,?,?)", - (level0_id, obs_id, detector, obs_type, exp_start_time, exp_time, detector_status_id, filename, file_full_path, qc0_status, prc_status,create_time)) - db.end() - level0_id_id = db.last_row_id() - #level0-header - ra_obj = header["OBJ_RA"] - dec_obj = header["OBJ_DEC"] - db.execute("delete from mci_level0_header where id=?",(level0_id_id,)) - db.execute("insert into mci_level0_header \ - (id, ra_obj, dec_obj, object_name, version) \ - values (?,?,?,?,?)", - (level0_id_id, ra_obj, dec_obj, object_name, version)) - - if copyfiles: - #copy files - shutil.copyfile(file_path, file_full_path) - - db.end() - - rec = Level0Record( - id = level0_id_id, - level0_id = level0_id, - obs_id = obs_id, - detector_no = detector, - obs_type = obs_type, - obs_time = 
exp_start_time, - exp_time = exp_time, - detector_status_id = detector_status_id, - filename = filename, - file_path = file_full_path - ) - print(f"{file_path} imported") - return rec - -if __name__ == "__main__": - ingest() \ No newline at end of file diff --git a/csst_dfs_api_local/mci/level1.py b/csst_dfs_api_local/mci/level1.py deleted file mode 100644 index 05458cf21c5ee8fe4f4ac8fac9e44d06cf7ac659..0000000000000000000000000000000000000000 --- a/csst_dfs_api_local/mci/level1.py +++ /dev/null @@ -1,213 +0,0 @@ -import os -import logging -import time, datetime -import shutil - -from ..common.db import DBClient -from ..common.utils import * -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.mci import Level1Record -from csst_dfs_commons.models.common import from_dict_list - -log = logging.getLogger('csst') - -class Level1DataApi(object): - def __init__(self, sub_system = "mci"): - self.sub_system = sub_system - self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") - self.db = DBClient() - - def find(self, **kwargs): - ''' retrieve level1 records from database - - parameter kwargs: - level0_id: [str] - data_type: [str] - create_time : (start, end), - qc1_status : [int], - prc_status : [int], - filename: [str] - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - level0_id = get_parameter(kwargs, "level0_id") - data_type = get_parameter(kwargs, "data_type") - create_time_start = get_parameter(kwargs, "create_time", [None, None])[0] - create_time_end = get_parameter(kwargs, "create_time", [None, None])[1] - qc1_status = get_parameter(kwargs, "qc1_status") - prc_status = get_parameter(kwargs, "prc_status") - filename = get_parameter(kwargs, "filename") - limit = get_parameter(kwargs, "limit", 0) - - sql_count = "select count(*) as c from mci_level1_data where 1=1" - sql_data = f"select * from mci_level1_data where 1=1" - - sql_condition = "" - if level0_id: - sql_condition = f"{sql_condition} and level0_id='{level0_id}'" - if data_type: - sql_condition = f"{sql_condition} and data_type='{data_type}'" - if create_time_start: - sql_condition = f"{sql_condition} and create_time >='{create_time_start}'" - if create_time_end: - sql_condition = f"{sql_condition} and create_time <='{create_time_end}'" - if qc1_status: - sql_condition = f"{sql_condition} and qc1_status={qc1_status}" - if prc_status: - sql_condition = f"{sql_condition} and prc_status={prc_status}" - if filename: - sql_condition = f" and filename='{filename}'" - - sql_count = f"{sql_count} {sql_condition}" - sql_data = f"{sql_data} {sql_condition}" - - if limit > 0: - sql_data = f"{sql_data} limit {limit}" - - totalCount = self.db.select_one(sql_count) - _, recs = self.db.select_many(sql_data) - return Result.ok_data(data=from_dict_list(Level1Record, recs)).append("totalCount", totalCount['c']) - - except Exception as e: - return Result.error(message=str(e)) - - - def get(self, **kwargs): - ''' - parameter kwargs: - id = [int] - - return dict or None - ''' - try: - fits_id = get_parameter(kwargs, "id", -1) - r = self.db.select_one( - "select * from mci_level1_data where id=?", (fits_id,)) - - if r: - return Result.ok_data(data=Level1Record().from_dict(r)) - else: - return Result.error(message=f"id:{fits_id} not found") - except Exception as e: - log.error(e) - return Result.error(message=str(e)) - - def update_proc_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - status : [int] - - 
return csst_dfs_common.models.Result - ''' - fits_id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - try: - existed = self.db.exists( - "select * from mci_level1_data where id=?", - (fits_id,) - ) - if not existed: - log.warning('%s not found' %(fits_id, )) - return Result.error(message ='%s not found' %(fits_id, )) - self.db.execute( - 'update mci_level1_data set prc_status=?, prc_time=? where id=?', - (status, format_time_ms(time.time()), fits_id) - ) - self.db.end() - return Result.ok_data() - - except Exception as e: - log.error(e) - return Result.error(message=str(e)) - - def update_qc1_status(self, **kwargs): - ''' update the status of QC1 - - parameter kwargs: - id : [int], - status : [int] - ''' - fits_id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - try: - existed = self.db.exists( - "select * from mci_level1_data where id=?", - (fits_id,) - ) - if not existed: - log.warning('%s not found' %(fits_id, )) - return Result.error(message ='%s not found' %(fits_id, )) - self.db.execute( - 'update mci_level1_data set qc1_status=?, qc1_time=? where id=?', - (status, format_time_ms(time.time()), fits_id) - ) - self.db.end() - return Result.ok_data() - - except Exception as e: - log.error(e) - return Result.error(message=str(e)) - - def write(self, **kwargs): - ''' insert a level1 record into database - - parameter kwargs: - level0_id : [str] - data_type : [str] - cor_sci_id : [int] - prc_params : [str] - filename : [str] - file_path : [str] - prc_status : [int] - prc_time : [str] - pipeline_id : [str] - refs : [dict] - - return csst_dfs_common.models.Result - ''' - try: - rec = Level1Record( - id = 0, - level0_id = get_parameter(kwargs, "level0_id"), - data_type = get_parameter(kwargs, "data_type"), - cor_sci_id = get_parameter(kwargs, "cor_sci_id"), - prc_params = get_parameter(kwargs, "prc_params"), - filename = get_parameter(kwargs, "filename"), - file_path = get_parameter(kwargs, "file_path"), - prc_status = get_parameter(kwargs, "prc_status", -1), - prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), - pipeline_id = get_parameter(kwargs, "pipeline_id"), - refs = get_parameter(kwargs, "refs", {}) - ) - existed = self.db.exists( - "select * from mci_level1_data where filename=?", - (rec.filename,) - ) - if existed: - log.error(f'{rec.filename} has already been existed') - return Result.error(message=f'{rec.filename} has already been existed') - - now_str = format_time_ms(time.time()) - self.db.execute( - 'INSERT INTO mci_level1_data (level0_id,data_type,cor_sci_id,prc_params,filename,file_path,qc1_status,prc_status,prc_time, create_time,pipeline_id) \ - VALUES(?,?,?,?,?,?,?,?,?,?,?)', - (rec.level0_id, rec.data_type, rec.cor_sci_id, rec.prc_params, rec.filename, rec.file_path, -1, rec.prc_status,rec.prc_time, now_str, rec.pipeline_id,) - ) - self.db.end() - rec.id = self.db.last_row_id() - - if rec.refs.items(): - sql_refs = "insert into mci_level1_ref (level1_id,ref_type,cal_id) values " - values = ["(%s,'%s',%s)"%(rec.id,k,v) for k,v in rec.refs.items()] - _ = self.db.execute(sql_refs + ",".join(values)) - self.db.end() - - rec.create_time = now_str - return Result.ok_data(data=rec) - except Exception as e: - log.error(e) - return Result.error(message=str(e)) \ No newline at end of file diff --git a/csst_dfs_api_local/mci/level1prc.py b/csst_dfs_api_local/mci/level1prc.py deleted file mode 100644 index 74891b578fc469b1ea7eca50ea0d123e645b3a87..0000000000000000000000000000000000000000 --- 
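The per-module Level1 data APIs deleted in this change are superseded by a single facility-level Level1DataApi backed by the consolidated t_level1_data/t_level1_ref tables; the renamed tests further down import it from csst_dfs_api_local.facility. A minimal usage sketch, assuming the consolidated class keeps the keyword interface of the removed ones (argument values taken from the old per-module tests):

    from csst_dfs_api_local.facility import Level1DataApi

    api = Level1DataApi()
    # each refs entry becomes one (level1_id, ref_type, cal_id) reference row
    rec = api.write(
        level0_id = '0000223',
        data_type = "sci",
        prc_params = "/opt/dddasd.params",
        prc_status = 3,
        prc_time = '2021-06-05 11:12:13',
        filename = "dddasd223234.fits",
        file_path = "/opt/dddasd23.fits",
        pipeline_id = "P2",
        refs = {'dark': 1, 'bias': 2, 'flat': 3})
    print('write:', rec)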
diff --git a/csst_dfs_api_local/mci/level1prc.py b/csst_dfs_api_local/mci/level1prc.py
deleted file mode 100644
index 74891b578fc469b1ea7eca50ea0d123e645b3a87..0000000000000000000000000000000000000000
--- a/csst_dfs_api_local/mci/level1prc.py
+++ /dev/null
@@ -1,123 +0,0 @@
-import os
-import logging
-import time, datetime
-import shutil
-
-from ..common.db import DBClient
-from ..common.utils import *
-from csst_dfs_commons.models import Result
-from csst_dfs_commons.models.mci import Level1PrcRecord
-from csst_dfs_commons.models.common import from_dict_list
-
-log = logging.getLogger('csst')
-
-class Level1PrcApi(object):
-    def __init__(self):
-        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
-        self.db = DBClient()
-
-    def find(self, **kwargs):
-        ''' retrieve level1 procedure records from database
-
-        parameter kwargs:
-            level1_id: [int]
-            pipeline_id: [str]
-            prc_module: [str]
-            prc_status : [int]
-
-        return: csst_dfs_common.models.Result
-        '''
-        try:
-            level1_id = get_parameter(kwargs, "level1_id", 0)
-            pipeline_id = get_parameter(kwargs, "pipeline_id")
-            prc_module = get_parameter(kwargs, "prc_module")
-            prc_status = get_parameter(kwargs, "prc_status")
-
-            sql_data = f"select * from mci_level1_prc"
-
-            sql_condition = f"where level1_id={level1_id}"
-            if pipeline_id:
-                sql_condition = sql_condition + " and pipeline_id='" + pipeline_id + "'"
-            if prc_module:
-                sql_condition = sql_condition + " and prc_module ='" + prc_module + "'"
-            if prc_status:
-                sql_condition = f"{sql_condition} and prc_status={prc_status}"
-
-            sql_data = f"{sql_data} {sql_condition}"
-
-            _, records = self.db.select_many(sql_data)
-            return Result.ok_data(data=from_dict_list(Level1PrcRecord, records)).append("totalCount", len(records))
-
-        except Exception as e:
-            return Result.error(message=str(e))
-
-    def update_proc_status(self, **kwargs):
-        ''' update the status of reduction
-
-        parameter kwargs:
-            id : [int],
-            status : [int]
-
-        return csst_dfs_common.models.Result
-        '''
-        id = get_parameter(kwargs, "id")
-        status = get_parameter(kwargs, "status")
-
-        try:
-            existed = self.db.exists(
-                "select * from mci_level1_prc where id=?",
-                (id,)
-            )
-            if not existed:
-                log.warning('%s not found' %(id, ))
-                return Result.error(message ='%s not found' %(id, ))
-            self.db.execute(
-                'update mci_level1_prc set prc_status=?, prc_time=? where id=?',
-                (status, format_time_ms(time.time()), id)
-            )
-            self.db.end()
-            return Result.ok_data()
-
-        except Exception as e:
-            log.error(e)
-            return Result.error(message=str(e))
-
-    def write(self, **kwargs):
-        ''' insert a level1 procedure record into database
-
-        parameter kwargs:
-            level1_id : [int]
-            pipeline_id : [str]
-            prc_module : [str]
-            params_file_path : [str]
-            prc_status : [int]
-            prc_time : [str]
-            result_file_path : [str]
-        return csst_dfs_common.models.Result
-        '''
-
-        rec = Level1PrcRecord(
-            id = 0,
-            level1_id = get_parameter(kwargs, "level1_id", 0),
-            pipeline_id = get_parameter(kwargs, "pipeline_id"),
-            prc_module = get_parameter(kwargs, "prc_module"),
-            params_file_path = get_parameter(kwargs, "params_file_path"),
-            prc_status = get_parameter(kwargs, "prc_status", -1),
-            prc_time = get_parameter(kwargs, "prc_time"),
-            result_file_path = get_parameter(kwargs, "result_file_path")
-        )
-        try:
-            self.db.execute(
-                'INSERT INTO mci_level1_prc (level1_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \
-                VALUES(?,?,?,?,?,?,?)',
-                (rec.level1_id, rec.pipeline_id, rec.prc_module, rec.params_file_path, rec.prc_status, rec.prc_time, rec.result_file_path)
-            )
-            self.db.end()
-            rec.id = self.db.last_row_id()
-
-            return Result.ok_data(data=rec)
-
-        except Exception as e:
-            log.error(e)
-            return Result.error(message=str(e))
-
diff --git a/csst_dfs_api_local/msc/__init__.py b/csst_dfs_api_local/msc/__init__.py
index 79badfb73d6f6a8c51a4472693b21acfba3f244a..c3a09d1fd8474db31036e56193eeb2ff54a58a87 100644
--- a/csst_dfs_api_local/msc/__init__.py
+++ b/csst_dfs_api_local/msc/__init__.py
@@ -1,6 +1 @@
-from .calmerge import CalMergeApi
-from .level0 import Level0DataApi
-from .level0prc import Level0PrcApi
-from .level1 import Level1DataApi
-from .level1prc import Level1PrcApi
 from .level2 import Level2DataApi
\ No newline at end of file
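The ingest scripts in this diff read FITS keywords through get_header_value from csst_dfs_commons.utils.fits, which falls back to a default when a keyword is missing. The helper's source is not part of this diff; judging from the call sites, a minimal equivalent sketch (hypothetical, for illustration only) would be:

    def get_header_value(key, header, default=None):
        # return header[key] when the keyword is present, otherwise the supplied default
        try:
            return header[key]
        except KeyError:
            return default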
os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") - - hdul = fits.open(file_path) - fits_header = hdul[0].header - img_header = hdul[1].header - - obs_id = fits_header["OBSID"] - exp_start_time = f"{fits_header['DATE-OBS']} {fits_header['TIME-OBS']}" - exp_time = fits_header['EXPTIME'] - - module_id = fits_header["INSTRUME"] - obs_type = fits_header["FILETYPE"] - qc0_status = -1 - prc_status = -1 - time_now = datetime.datetime.now() - create_time = time_now.strftime('%Y-%m-%d %H:%M:%S') - - facility_status_id = 0 - module_status_id = 0 - - existed = db.exists("select * from t_observation where obs_id=?", (obs_id,)) - if not existed: - db.execute("insert into t_observation \ - (obs_id,obs_time,exp_time,module_id,obs_type,facility_status_id, module_status_id, qc0_status, prc_status,create_time) \ - values (?,?,?,?,?,?,?,?,?,?)", - (obs_id,exp_start_time,exp_time,module_id,obs_type,facility_status_id,module_status_id,qc0_status, prc_status,create_time)) - db.end() - #level0 - detector = get_header_value("DETECTOR", fits_header, "") - if len(detector) > 2: - detector = detector[-2:] - filename = get_header_value("FILENAME", fits_header, os.path.basename(file_path)) - - existed = db.exists( - "select * from msc_level0_data where filename=?", - (filename,) - ) - if existed: - log.warning('%s has already been imported' %(file_path, )) - db.end() - return - - detector_status_id = 0 - - file_full_path = file_path - - if copyfiles: - file_dir = f"{dest_root_dir}/{module_id}/{obs_type.upper()}/{fits_header['EXPSTART']}/{obs_id}/MS" - if not os.path.exists(file_dir): - os.makedirs(file_dir) - file_full_path = f"{file_dir}/{filename}.fits" - - level0_id = f"{obs_id}{detector}" - - c = db.execute("insert into msc_level0_data \ - (level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \ - values (?,?,?,?,?,?,?,?,?,?,?,?)", - (level0_id, obs_id, detector, obs_type, exp_start_time, exp_time, detector_status_id, filename, file_full_path, qc0_status, prc_status,create_time)) - db.end() - level0_id_id = db.last_row_id() - - #level0-header - ra_obj = get_header_value("RA_OBJ", fits_header, 0) - dec_obj = get_header_value("DEC_OBJ", fits_header, 0) - crpix1 = get_header_value("CRPIX1", img_header, 0) - crpix2 = get_header_value("CRPIX2", img_header, 0) - crval1 = get_header_value("CRVAL1", img_header, 0) - crval2 = get_header_value("CRVAL2", img_header, 0) - ctype1 = get_header_value("CTYPE1", img_header, "") - ctype2 = get_header_value("CTYPE2", img_header, "") - cd1_1 = get_header_value("CD1_1", img_header, 0) - cd1_2 = get_header_value("CD1_2", img_header, 0) - cd2_1 = get_header_value("CD2_1", img_header, 0) - cd2_2 = get_header_value("CD2_2", img_header, 0) - - db.execute("delete from msc_level0_header where id=?",(level0_id_id,)) - db.execute("insert into msc_level0_header \ - (id, ra_obj, dec_obj, crpix1, crpix2, crval1, crval2, \ - ctype1, ctype2, cd1_1, cd1_2, cd2_1, cd2_2, create_time) \ - values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)", - (level0_id_id, ra_obj, dec_obj, crpix1, crpix2, crval1, crval2, ctype1, ctype2, cd1_1, cd1_2, cd2_1, cd2_2, create_time)) - - if copyfiles: - #copy files - shutil.copyfile(file_path, file_full_path) - - db.end() - - print(f"{file_path} imported") - -if __name__ == "__main__": - ingest() \ No newline at end of file diff --git a/csst_dfs_api_local/msc/level1.py b/csst_dfs_api_local/msc/level1.py deleted file mode 100644 index 
9e1a9130d5b9d0e8b2268e5856e2eafdcb92f8d2..0000000000000000000000000000000000000000 --- a/csst_dfs_api_local/msc/level1.py +++ /dev/null @@ -1,208 +0,0 @@ -import os -import logging -import time, datetime -import shutil - -from ..common.db import DBClient -from ..common.utils import * -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.msc import Level1Record -from csst_dfs_commons.models.common import from_dict_list - -log = logging.getLogger('csst') - -class Level1DataApi(object): - def __init__(self, sub_system = "msc"): - self.sub_system = sub_system - self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst") - self.db = DBClient() - - def find(self, **kwargs): - ''' retrieve level1 records from database - - parameter kwargs: - level0_id: [str] - data_type: [str] - create_time : (start, end), - qc1_status : [int], - prc_status : [int], - filename: [str] - limit: limits returns the number of records,default 0:no-limit - - return: csst_dfs_common.models.Result - ''' - try: - level0_id = get_parameter(kwargs, "level0_id") - data_type = get_parameter(kwargs, "data_type") - create_time_start = get_parameter(kwargs, "create_time", [None, None])[0] - create_time_end = get_parameter(kwargs, "create_time", [None, None])[1] - qc1_status = get_parameter(kwargs, "qc1_status") - prc_status = get_parameter(kwargs, "prc_status") - filename = get_parameter(kwargs, "filename") - limit = get_parameter(kwargs, "limit", 0) - - sql_count = "select count(*) as c from msc_level1_data where 1=1" - sql_data = f"select * from msc_level1_data where 1=1" - - sql_condition = "" - if level0_id: - sql_condition = f"{sql_condition} and level0_id='{level0_id}'" - if data_type: - sql_condition = f"{sql_condition} and data_type='{data_type}'" - if create_time_start: - sql_condition = f"{sql_condition} and create_time >='{create_time_start}'" - if create_time_end: - sql_condition = f"{sql_condition} and create_time <='{create_time_end}'" - if qc1_status: - sql_condition = f"{sql_condition} and qc1_status={qc1_status}" - if prc_status: - sql_condition = f"{sql_condition} and prc_status={prc_status}" - if filename: - sql_condition = f" and filename='{filename}'" - - sql_count = f"{sql_count} {sql_condition}" - sql_data = f"{sql_data} {sql_condition}" - - if limit > 0: - sql_data = f"{sql_data} limit {limit}" - - totalCount = self.db.select_one(sql_count) - _, recs = self.db.select_many(sql_data) - return Result.ok_data(data=from_dict_list(Level1Record, recs)).append("totalCount", totalCount['c']) - - except Exception as e: - return Result.error(message=str(e)) - - def get(self, **kwargs): - ''' - parameter kwargs: - id = [int] - return csst_dfs_common.models.Result - ''' - try: - id = get_parameter(kwargs, "id", -1) - r = self.db.select_one( - "select * from msc_level1_data where id=?", (id,)) - if r: - return Result.ok_data(data=Level1Record().from_dict(r)) - else: - return Result.error(message=f"id:{id} not found") - except Exception as e: - log.error(e) - return Result.error(message=str(e)) - - def update_proc_status(self, **kwargs): - ''' update the status of reduction - - parameter kwargs: - id : [int], - status : [int] - - return csst_dfs_common.models.Result - ''' - fits_id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - try: - existed = self.db.exists( - "select * from msc_level1_data where id=?", - (fits_id,) - ) - if not existed: - log.warning('%s not found' %(fits_id, )) - return Result.error(message ='%s not found' %(fits_id, )) - self.db.execute( - 
'update msc_level1_data set prc_status=?, prc_time=? where id=?', - (status, format_time_ms(time.time()), fits_id) - ) - self.db.end() - return Result.ok_data() - - except Exception as e: - log.error(e) - return Result.error(message=str(e)) - - def update_qc1_status(self, **kwargs): - ''' update the status of QC1 - - parameter kwargs: - id : [int], - status : [int] - ''' - fits_id = get_parameter(kwargs, "id") - status = get_parameter(kwargs, "status") - try: - existed = self.db.exists( - "select * from msc_level1_data where id=?", - (fits_id,) - ) - if not existed: - log.warning('%s not found' %(fits_id, )) - return Result.error(message ='%s not found' %(fits_id, )) - self.db.execute( - 'update msc_level1_data set qc1_status=?, qc1_time=? where id=?', - (status, format_time_ms(time.time()), fits_id) - ) - self.db.end() - return Result.ok_data() - - except Exception as e: - log.error(e) - return Result.error(message=str(e)) - - def write(self, **kwargs): - ''' insert a level1 record into database - - parameter kwargs: - level0_id : [str] - data_type : [str] - cor_sci_id : [int] - prc_params : [str] - filename : [str] - file_path : [str] - prc_status : [int] - prc_time : [str] - pipeline_id : [str] - refs: [dict] - - return csst_dfs_common.models.Result - ''' - try: - rec = Level1Record( - id = 0, - level0_id = get_parameter(kwargs, "level0_id"), - data_type = get_parameter(kwargs, "data_type"), - cor_sci_id = get_parameter(kwargs, "cor_sci_id"), - prc_params = get_parameter(kwargs, "prc_params"), - filename = get_parameter(kwargs, "filename"), - file_path = get_parameter(kwargs, "file_path"), - prc_status = get_parameter(kwargs, "prc_status", -1), - prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), - pipeline_id = get_parameter(kwargs, "pipeline_id"), - refs = get_parameter(kwargs, "refs", {}) - ) - existed = self.db.exists( - "select * from msc_level1_data where filename=?", - (rec.filename,) - ) - if existed: - log.error(f'{rec.filename} has already been existed') - return Result.error(message=f'{rec.filename} has already been existed') - - self.db.execute( - 'INSERT INTO msc_level1_data (level0_id,data_type,cor_sci_id,prc_params,filename,file_path,qc1_status,prc_status,prc_time,create_time,pipeline_id) \ - VALUES(?,?,?,?,?,?,?,?,?,?,?)', - (rec.level0_id, rec.data_type, rec.cor_sci_id, rec.prc_params, rec.filename, rec.file_path, -1, rec.prc_status, rec.prc_time, format_time_ms(time.time()),rec.pipeline_id,) - ) - self.db.end() - rec.id = self.db.last_row_id() - - if rec.refs.items(): - sql_refs = "insert into msc_level1_ref (level1_id,ref_type,cal_id) values " - values = ["(%s,'%s',%s)"%(rec.id,k,v) for k,v in rec.refs.items()] - _ = self.db.execute(sql_refs + ",".join(values)) - - self.db.end() - return Result.ok_data(data=rec) - except Exception as e: - log.error(e) - return Result.error(message=str(e)) \ No newline at end of file diff --git a/csst_dfs_api_local/msc/level1prc.py b/csst_dfs_api_local/msc/level1prc.py deleted file mode 100644 index 1bba992f26cb49cbaa434a5fb18d2ae430177ac6..0000000000000000000000000000000000000000 --- a/csst_dfs_api_local/msc/level1prc.py +++ /dev/null @@ -1,124 +0,0 @@ -import os -import logging -import time, datetime -import shutil - -from ..common.db import DBClient -from ..common.utils import * -from csst_dfs_commons.models import Result -from csst_dfs_commons.models.msc import Level1PrcRecord -from csst_dfs_commons.models.common import from_dict_list - -log = logging.getLogger('csst') - -class Level1PrcApi(object): - 
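The same consolidation applies to the procedure-tracking API: the renamed test further down exercises csst_dfs_api_local.facility.level1prc.Level1PrcApi. A usage sketch with the argument values from the removed per-module tests:

    from csst_dfs_api_local.facility.level1prc import Level1PrcApi

    api = Level1PrcApi()
    rec = api.write(level1_id = 1,
        pipeline_id = "P1",
        prc_module = "QC0",
        params_file_path = "/opt/dddasd.params",
        prc_status = 3,
        prc_time = '2021-06-04 11:12:13',
        result_file_path = "/opt/dddasd.header")
    print('write:', rec)
    # query the procedure records written for a given level1 product
    recs = api.find(level1_id = 1, pipeline_id = "P1")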
diff --git a/csst_dfs_api_local/msc/level1prc.py b/csst_dfs_api_local/msc/level1prc.py
deleted file mode 100644
index 1bba992f26cb49cbaa434a5fb18d2ae430177ac6..0000000000000000000000000000000000000000
--- a/csst_dfs_api_local/msc/level1prc.py
+++ /dev/null
@@ -1,124 +0,0 @@
-import os
-import logging
-import time, datetime
-import shutil
-
-from ..common.db import DBClient
-from ..common.utils import *
-from csst_dfs_commons.models import Result
-from csst_dfs_commons.models.msc import Level1PrcRecord
-from csst_dfs_commons.models.common import from_dict_list
-
-log = logging.getLogger('csst')
-
-class Level1PrcApi(object):
-    def __init__(self, sub_system = "msc"):
-        self.sub_system = sub_system
-        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
-        self.db = DBClient()
-
-    def find(self, **kwargs):
-        ''' retrieve level1 procedure records from database
-
-        parameter kwargs:
-            level1_id: [int]
-            pipeline_id: [str]
-            prc_module: [str]
-            prc_status : [int]
-
-        return: csst_dfs_common.models.Result
-        '''
-        try:
-            level1_id = get_parameter(kwargs, "level1_id", 0)
-            pipeline_id = get_parameter(kwargs, "pipeline_id")
-            prc_module = get_parameter(kwargs, "prc_module")
-            prc_status = get_parameter(kwargs, "prc_status")
-
-            sql_data = f"select * from msc_level1_prc"
-
-            sql_condition = f"where level1_id={level1_id}"
-            if pipeline_id:
-                sql_condition = sql_condition + " and pipeline_id='" + pipeline_id + "'"
-            if prc_module:
-                sql_condition = sql_condition + " and prc_module ='" + prc_module + "'"
-            if prc_status:
-                sql_condition = f"{sql_condition} and prc_status={prc_status}"
-
-            sql_data = f"{sql_data} {sql_condition}"
-
-            _, records = self.db.select_many(sql_data)
-            return Result.ok_data(data=from_dict_list(Level1PrcRecord, records)).append("totalCount", len(records))
-
-        except Exception as e:
-            return Result.error(message=str(e))
-
-    def update_proc_status(self, **kwargs):
-        ''' update the status of reduction
-
-        parameter kwargs:
-            id : [int],
-            status : [int]
-
-        return csst_dfs_common.models.Result
-        '''
-        id = get_parameter(kwargs, "id")
-        status = get_parameter(kwargs, "status")
-
-        try:
-            existed = self.db.exists(
-                "select * from msc_level1_prc where id=?",
-                (id,)
-            )
-            if not existed:
-                log.warning('%s not found' %(id, ))
-                return Result.error(message ='%s not found' %(id, ))
-            self.db.execute(
-                'update msc_level1_prc set prc_status=?, prc_time=? where id=?',
-                (status, format_time_ms(time.time()), id)
-            )
-            self.db.end()
-            return Result.ok_data()
-
-        except Exception as e:
-            log.error(e)
-            return Result.error(message=str(e))
-
-    def write(self, **kwargs):
-        ''' insert a level1 procedure record into database
-
-        parameter kwargs:
-            level1_id : [int]
-            pipeline_id : [str]
-            prc_module : [str]
-            params_file_path : [str]
-            prc_status : [int]
-            prc_time : [str]
-            result_file_path : [str]
-        return csst_dfs_common.models.Result
-        '''
-
-        rec = Level1PrcRecord(
-            id = 0,
-            level1_id = get_parameter(kwargs, "level1_id", 0),
-            pipeline_id = get_parameter(kwargs, "pipeline_id"),
-            prc_module = get_parameter(kwargs, "prc_module"),
-            params_file_path = get_parameter(kwargs, "params_file_path"),
-            prc_status = get_parameter(kwargs, "prc_status", -1),
-            prc_time = get_parameter(kwargs, "prc_time"),
-            result_file_path = get_parameter(kwargs, "result_file_path")
-        )
-        try:
-            self.db.execute(
-                'INSERT INTO msc_level1_prc (level1_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \
-                VALUES(?,?,?,?,?,?,?)',
-                (rec.level1_id, rec.pipeline_id, rec.prc_module, rec.params_file_path, rec.prc_status, rec.prc_time, rec.result_file_path)
-            )
-            self.db.end()
-            rec.id = self.db.last_row_id()
-
-            return Result.ok_data(data=rec)
-
-        except Exception as e:
-            log.error(e)
-            return Result.error(message=str(e))
-
diff --git a/csst_dfs_api_local/msc/level2.py b/csst_dfs_api_local/msc/level2.py
index 896c118f7b316c6bce2ca9198dfe9a517406aced..e2d8d7255d0ecc60ea694c27c313740231abf6f2 100644
--- a/csst_dfs_api_local/msc/level2.py
+++ b/csst_dfs_api_local/msc/level2.py
@@ -154,7 +154,6 @@ class Level2DataApi(object):
             )
             self.db.end()
             return Result.ok_data()
-
         except Exception as e:
             log.error(e)
             return Result.error(message=str(e))
@@ -172,7 +171,6 @@ class Level2DataApi(object):
             prc_status : [int]
             prc_time : [str]
-
         return csst_dfs_common.models.Result
         '''
         try:
diff --git a/csst_dfs_api_local/sls/__init__.py b/csst_dfs_api_local/sls/__init__.py
index 819de80eeba5ef22911a8443de66f86fe63b39d8..a1d1cd89ec87b5d631236a701fa7a5d9fcda7a76 100644
--- a/csst_dfs_api_local/sls/__init__.py
+++ b/csst_dfs_api_local/sls/__init__.py
@@ -1,2 +1 @@
-from .level1 import Level1DataApi
 from .level2spectra import Level2SpectraApi
\ No newline at end of file
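A side note on the find() implementations in these modules: filter values are spliced into the SQL text with f-strings, while the write/update paths bind values through ? placeholders. A hedged sketch of the same WHERE-clause construction done with bound parameters (assuming DBClient.select_many accepts a parameter tuple the way select_one does; this is an alternative pattern, not code this diff touches):

    # build "... where 1=1 and level0_id=? and prc_status=?" plus a matching tuple
    conditions, params = ["1=1"], []
    if level0_id:
        conditions.append("level0_id=?")
        params.append(level0_id)
    if prc_status:
        conditions.append("prc_status=?")
        params.append(prc_status)
    sql_data = "select * from t_level1_data where " + " and ".join(conditions)
    _, recs = self.db.select_many(sql_data, tuple(params))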
diff --git a/csst_dfs_api_local/sls/ingest.py b/csst_dfs_api_local/sls/ingest.py
deleted file mode 100644
index b163698beedd800adbf5a42589b240f6e1180fc6..0000000000000000000000000000000000000000
--- a/csst_dfs_api_local/sls/ingest.py
+++ /dev/null
@@ -1,109 +0,0 @@
-import os, sys
-import argparse
-import logging
-from astropy.io import fits
-import datetime
-import shutil
-
-from csst_dfs_api_local.common.db import DBClient
-log = logging.getLogger('csst-dfs-api-local')
-
-def ingest():
-    db = DBClient()
-    parser = argparse.ArgumentParser(prog=f"{sys.argv[0]}", description="ingest the local files")
-    parser.add_argument('-i','--infile', dest="infile", help="a file or a directory")
-    parser.add_argument('-m', '--copyfiles', dest="copyfiles", action='store_true', default=False, help="copy files after import")
-    args = parser.parse_args(sys.argv[1:])
-
-    import_root_dir = args.infile
-
-    if os.path.isfile(import_root_dir):
-        log.info(f"prepare import {import_root_dir}")
-        ingesst_one(import_root_dir, db, args.copyfiles)
-    if os.path.isdir(import_root_dir):
-        for (path, _, file_names) in os.walk(import_root_dir):
-            for filename in file_names:
-                if filename.find(".fits") > 0:
-                    file_full_path = os.path.join(path, filename)
-                    log.info(f"prepare import {file_full_path}")
-                    ingesst_one(file_full_path, db, args.copyfiles)
-
-    db.close()
-
-def ingesst_one(file_path, db, copyfiles):
-    dest_root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
-
-    hdul = fits.open(file_path)
-    header = hdul[0].header
-
-    obs_id = header["OBSID"]
-    exp_start_time = f"{header['DATE-OBS']} {header['TIME-OBS']}"
-    exp_time = header['EXPTIME']
-
-    module_id = header["INSTRUME"]
-    obs_type = header["FILETYPE"]
-    qc0_status = -1
-    prc_status = -1
-    time_now = datetime.datetime.now()
-    create_time = time_now.strftime('%Y-%m-%d %H:%M:%S')
-
-    facility_status_id = 0
-    module_status_id = 0
-
-    existed = db.exists("select * from t_observation where obs_id=?", (obs_id,))
-    if not existed:
-        db.execute("insert into t_observation \
-            (obs_id,obs_time,exp_time,module_id,obs_type,facility_status_id, module_status_id, qc0_status, prc_status,create_time) \
-            values (?,?,?,?,?,?,?,?,?,?)",
-            (obs_id,exp_start_time,exp_time,module_id,obs_type,facility_status_id,module_status_id,qc0_status, prc_status,create_time))
-        db.end()
-    #level0
-    detector = header["DETECTOR"]
-    filename = header["FILENAME"]
-
-    existed = db.exists(
-        "select * from sls_level0_data where filename=?",
-        (filename,)
-    )
-    if existed:
-        log.warning('%s has already been imported' %(file_path, ))
-        db.end()
-        return
-
-    detector_status_id = 0
-
-    file_full_path = file_path
-
-    if copyfiles:
-        file_dir = f"{dest_root_dir}/{module_id}/{obs_type.upper()}/{header['EXPSTART']}/{obs_id}/MS"
-        if not os.path.exists(file_dir):
-            os.makedirs(file_dir)
-        file_full_path = f"{file_dir}/{filename}.fits"
-
-    level0_id = f"{obs_id}{detector}"
-
-    c = db.execute("insert into sls_level0_data \
-        (level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \
-        values (?,?,?,?,?,?,?,?,?,?,?,?)",
-        (level0_id, obs_id, detector, obs_type, exp_start_time, exp_time, detector_status_id, filename, file_full_path, qc0_status, prc_status,create_time))
-    db.end()
-    level0_id_id = db.last_row_id()
-    #level0-header
-    ra_obj = header["RA_OBJ"]
-    dec_obj = header["DEC_OBJ"]
-    db.execute("delete from sls_level0_header where id=?",(level0_id_id,))
-    db.execute("insert into sls_level0_header \
-        (id, ra_obj, dec_obj, create_time) \
-        values (?,?,?,?)",
-        (level0_id_id, ra_obj, dec_obj, create_time))
-
-    if copyfiles:
-        #copy files
-        shutil.copyfile(file_path, file_full_path)
-
-    db.end()
-
-    print(f"{file_path} imported")
-
-if __name__ == "__main__":
-    ingest()
\ No newline at end of file
diff --git a/csst_dfs_api_local/sls/level1.py b/csst_dfs_api_local/sls/level1.py
deleted file mode 100644
index 60d8e8d73209a6635e47042f8956f218f819c178..0000000000000000000000000000000000000000
--- a/csst_dfs_api_local/sls/level1.py
+++ /dev/null
@@ -1,205 +0,0 @@
-import os
-import logging
-import time, datetime
-import shutil
-
-from ..common.db import DBClient
-from ..common.utils import *
-from csst_dfs_commons.models import Result
-from csst_dfs_commons.models.sls import Level1Record
-from csst_dfs_commons.models.common import from_dict_list
-
-log = logging.getLogger('csst')
-
-class Level1DataApi(object):
-    def __init__(self, sub_system = "sls"):
-        self.sub_system = sub_system
-        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
-        self.db = DBClient()
-
-    def find(self, **kwargs):
-        ''' retrieve level1 records from database
-
-        parameter kwargs:
-            level0_id: [str]
-            data_type: [str]
-            create_time : (start, end),
-            qc1_status : [int],
-            prc_status : [int],
-            filename: [str]
-            limit: limits the number of returned records; default 0 means no limit
-
-        return: csst_dfs_common.models.Result
-        '''
-        try:
-            level0_id = get_parameter(kwargs, "level0_id")
-            data_type = get_parameter(kwargs, "data_type")
-            create_time_start = get_parameter(kwargs, "create_time", [None, None])[0]
-            create_time_end = get_parameter(kwargs, "create_time", [None, None])[1]
-            qc1_status = get_parameter(kwargs, "qc1_status")
-            prc_status = get_parameter(kwargs, "prc_status")
-            filename = get_parameter(kwargs, "filename")
-            limit = get_parameter(kwargs, "limit", 0)
-
-            sql_count = "select count(*) as c from sls_level1_data where 1=1"
-            sql_data = f"select * from sls_level1_data where 1=1"
-
-            sql_condition = ""
-            if level0_id:
-                sql_condition = f"{sql_condition} and level0_id='{level0_id}'"
-            if data_type:
-                sql_condition = f"{sql_condition} and data_type='{data_type}'"
-            if create_time_start:
-                sql_condition = f"{sql_condition} and create_time >='{create_time_start}'"
-            if create_time_end:
-                sql_condition = f"{sql_condition} and create_time <='{create_time_end}'"
-            if qc1_status:
-                sql_condition = f"{sql_condition} and qc1_status={qc1_status}"
-            if prc_status:
-                sql_condition = f"{sql_condition} and prc_status={prc_status}"
-            if filename:
-                sql_condition = f"{sql_condition} and filename='{filename}'"
-
-            sql_count = f"{sql_count} {sql_condition}"
-            sql_data = f"{sql_data} {sql_condition}"
-
-            if limit > 0:
-                sql_data = f"{sql_data} limit {limit}"
-
-            totalCount = self.db.select_one(sql_count)
-            _, recs = self.db.select_many(sql_data)
-            return Result.ok_data(data=from_dict_list(Level1Record, recs)).append("totalCount", totalCount['c'])
-
-        except Exception as e:
-            return Result.error(message=str(e))
-
-    def get(self, **kwargs):
-        '''
-        parameter kwargs:
-            id = [int]
-        return csst_dfs_common.models.Result
-        '''
-        try:
-            id = get_parameter(kwargs, "id", -1)
-            r = self.db.select_one(
-                "select * from sls_level1_data where id=?", (id,))
-            if r:
-                return Result.ok_data(data=Level1Record().from_dict(r))
-            else:
-                return Result.error(message=f"id:{id} not found")
-        except Exception as e:
-            log.error(e)
-            return Result.error(message=str(e))
-
-    def update_proc_status(self, **kwargs):
-        ''' update the status of reduction
-
-        parameter kwargs:
-            id : [int],
-            status : [int]
-
-        return csst_dfs_common.models.Result
-        '''
-        fits_id = get_parameter(kwargs, "id")
-        status = get_parameter(kwargs, "status")
-        try:
-            existed = self.db.exists(
-                "select * from sls_level1_data where id=?",
-                (fits_id,)
-            )
-            if not existed:
-                log.warning('%s not found' %(fits_id, ))
-                return Result.error(message ='%s not found' %(fits_id, ))
-            self.db.execute(
-                'update sls_level1_data set prc_status=?, prc_time=? where id=?',
-                (status, format_time_ms(time.time()), fits_id)
-            )
-            self.db.end()
-            return Result.ok_data()
-
-        except Exception as e:
-            log.error(e)
-            return Result.error(message=str(e))
-
-    def update_qc1_status(self, **kwargs):
-        ''' update the status of QC1
-
-        parameter kwargs:
-            id : [int],
-            status : [int]
-        '''
-        fits_id = get_parameter(kwargs, "id")
-        status = get_parameter(kwargs, "status")
-        try:
-            existed = self.db.exists(
-                "select * from sls_level1_data where id=?",
-                (fits_id,)
-            )
-            if not existed:
-                log.warning('%s not found' %(fits_id, ))
-                return Result.error(message ='%s not found' %(fits_id, ))
-            self.db.execute(
-                'update sls_level1_data set qc1_status=?, qc1_time=? where id=?',
-                (status, format_time_ms(time.time()), fits_id)
-            )
-            self.db.end()
-            return Result.ok_data()
-
-        except Exception as e:
-            log.error(e)
-            return Result.error(message=str(e))
-
-    def write(self, **kwargs):
-        ''' insert a level1 record into database
-
-        parameter kwargs:
-            level0_id : [str]
-            data_type : [str]
-            prc_params : [str]
-            filename : [str]
-            file_path : [str]
-            prc_status : [int]
-            prc_time : [str]
-            pipeline_id : [str]
-            refs: [dict]
-        return csst_dfs_common.models.Result
-        '''
-        try:
-            rec = Level1Record(
-                id = 0,
-                level0_id = get_parameter(kwargs, "level0_id"),
-                data_type = get_parameter(kwargs, "data_type"),
-                prc_params = get_parameter(kwargs, "prc_params"),
-                filename = get_parameter(kwargs, "filename"),
-                file_path = get_parameter(kwargs, "file_path"),
-                prc_status = get_parameter(kwargs, "prc_status", -1),
-                prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.datetime.now())),
-                pipeline_id = get_parameter(kwargs, "pipeline_id"),
-                refs = get_parameter(kwargs, "refs", {})
-            )
-            existed = self.db.exists(
-                "select * from sls_level1_data where filename=?",
-                (rec.filename,)
-            )
-            if existed:
-                log.error(f'{rec.filename} already exists')
-                return Result.error(message=f'{rec.filename} already exists')
-
-            self.db.execute(
-                'INSERT INTO sls_level1_data (level0_id,data_type,prc_params,filename,file_path,qc1_status,prc_status,prc_time,create_time,pipeline_id) \
-                VALUES(?,?,?,?,?,?,?,?,?,?)',
-                (rec.level0_id, rec.data_type, rec.prc_params, rec.filename, rec.file_path, -1, rec.prc_status, rec.prc_time, format_time_ms(time.time()),rec.pipeline_id,)
-            )
-            self.db.end()
-            rec.id = self.db.last_row_id()
-
-            if rec.refs.items():
-                sql_refs = "insert into sls_level1_ref (level1_id,ref_type,cal_id) values "
-                values = ["(%s,'%s',%s)"%(rec.id,k,v) for k,v in rec.refs.items()]
-                _ = self.db.execute(sql_refs + ",".join(values))
-
-            self.db.end()
-            return Result.ok_data(data=rec)
-        except Exception as e:
-            log.error(e)
-            return Result.error(message=str(e))
\ No newline at end of file
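For reference, the refs handling at the end of write() expands the dict into one multi-row insert via string formatting. For example, refs = {'dark': 1, 'bias': 2} with a new record id of 12 produces:

    insert into sls_level1_ref (level1_id,ref_type,cal_id) values (12,'dark',1),(12,'bias',2)

Note the %s formatting leaves cal_id unquoted, which only works for numeric ids; string calibration ids would need quoting.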
diff --git a/csst_dfs_api_local/sls/level1prc.py b/csst_dfs_api_local/sls/level1prc.py
deleted file mode 100644
index ad8ead360ecde77db10b5ff7fcee5d329f3e7ff8..0000000000000000000000000000000000000000
--- a/csst_dfs_api_local/sls/level1prc.py
+++ /dev/null
@@ -1,124 +0,0 @@
-import os
-import logging
-import time, datetime
-import shutil
-
-from ..common.db import DBClient
-from ..common.utils import *
-from csst_dfs_commons.models import Result
-from csst_dfs_commons.models.sls import Level1PrcRecord
-from csst_dfs_commons.models.common import from_dict_list
-
-log = logging.getLogger('csst')
-
-class Level1PrcApi(object):
-    def __init__(self, sub_system = "sls"):
-        self.sub_system = sub_system
-        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
-        self.db = DBClient()
-
-    def find(self, **kwargs):
-        ''' retrieve level1 procedure records from database
-
-        parameter kwargs:
-            level1_id: [int]
-            pipeline_id: [str]
-            prc_module: [str]
-            prc_status : [int]
-
-        return: csst_dfs_common.models.Result
-        '''
-        try:
-            level1_id = get_parameter(kwargs, "level1_id", 0)
-            pipeline_id = get_parameter(kwargs, "pipeline_id")
-            prc_module = get_parameter(kwargs, "prc_module")
-            prc_status = get_parameter(kwargs, "prc_status")
-
-            sql_data = f"select * from sls_level1_prc"
-
-            sql_condition = f"where level1_id={level1_id}"
-            if pipeline_id:
-                sql_condition = sql_condition + " and pipeline_id='" + pipeline_id + "'"
-            if prc_module:
-                sql_condition = sql_condition + " and prc_module ='" + prc_module + "'"
-            if prc_status:
-                sql_condition = f"{sql_condition} and prc_status={prc_status}"
-
-            sql_data = f"{sql_data} {sql_condition}"
-
-            _, records = self.db.select_many(sql_data)
-            return Result.ok_data(data=from_dict_list(Level1PrcRecord, records)).append("totalCount", len(records))
-
-        except Exception as e:
-            return Result.error(message=str(e))
-
-    def update_proc_status(self, **kwargs):
-        ''' update the status of reduction
-
-        parameter kwargs:
-            id : [int],
-            status : [int]
-
-        return csst_dfs_common.models.Result
-        '''
-        id = get_parameter(kwargs, "id")
-        status = get_parameter(kwargs, "status")
-
-        try:
-            existed = self.db.exists(
-                "select * from sls_level1_prc where id=?",
-                (id,)
-            )
-            if not existed:
-                log.warning('%s not found' %(id, ))
-                return Result.error(message ='%s not found' %(id, ))
-            self.db.execute(
-                'update sls_level1_prc set prc_status=?, prc_time=? where id=?',
-                (status, format_time_ms(time.time()), id)
-            )
-            self.db.end()
-            return Result.ok_data()
-
-        except Exception as e:
-            log.error(e)
-            return Result.error(message=str(e))
-
-    def write(self, **kwargs):
-        ''' insert a level1 procedure record into database
-
-        parameter kwargs:
-            level1_id : [int]
-            pipeline_id : [str]
-            prc_module : [str]
-            params_file_path : [str]
-            prc_status : [int]
-            prc_time : [str]
-            result_file_path : [str]
-        return csst_dfs_common.models.Result
-        '''
-
-        rec = Level1PrcRecord(
-            id = 0,
-            level1_id = get_parameter(kwargs, "level1_id", 0),
-            pipeline_id = get_parameter(kwargs, "pipeline_id"),
-            prc_module = get_parameter(kwargs, "prc_module"),
-            params_file_path = get_parameter(kwargs, "params_file_path"),
-            prc_status = get_parameter(kwargs, "prc_status", -1),
-            prc_time = get_parameter(kwargs, "prc_time"),
-            result_file_path = get_parameter(kwargs, "result_file_path")
-        )
-        try:
-            self.db.execute(
-                'INSERT INTO sls_level1_prc (level1_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \
-                VALUES(?,?,?,?,?,?,?)',
-                (rec.level1_id, rec.pipeline_id, rec.prc_module, rec.params_file_path, rec.prc_status, rec.prc_time, rec.result_file_path)
-            )
-            self.db.end()
-            rec.id = self.db.last_row_id()
-
-            return Result.ok_data(data=rec)
-
-        except Exception as e:
-            log.error(e)
-            return Result.error(message=str(e))
-
diff --git a/requirements.txt b/requirements.txt
index ac7c2900d9d64ab5468d10a454ed6e8cb60900b8..260ec3ec90d1a4498c65f3743f39349cf0cc8354 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,2 @@
 DBUtils==1.3
-astropy>=4.0
-git+https://github.com/astronomical-data-processing/csst-dfs-commons.git
\ No newline at end of file
+astropy>=4.0
\ No newline at end of file
diff --git a/setup.cfg b/setup.cfg
index dd3b200eeff3cc02e900b7eee7aad762b6e8d054..3b0228af07c69c26ccc429492d6c54a580d09d80 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -21,14 +21,9 @@ python_requires = >=3.7
 zip_safe = False
 setup_requires = setuptools_scm
 install_requires =
-    astropy>=4.0
     DBUtils==1.3
 [options.package_data]
 csst_dfs_api_local.common = *.sql
 [options.entry_points]
 console_scripts =
-    csst-msc-ingest-local = csst_dfs_api_local.msc.ingest:ingest
-    csst-ifs-ingest-local = csst_dfs_api_local.ifs.ingest:ingest
-    csst-mci-ingest-local = csst_dfs_api_local.mci.ingest:ingest
-    csst-sls-ingest-local = csst_dfs_api_local.sls.ingest:ingest
-    csst-cpic-ingest-local = csst_dfs_api_local.cpic.ingest:ingest
\ No newline at end of file
+    csst-dfs-ingest-local = csst_dfs_api_local.facility.ingest:ingest
\ No newline at end of file
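With this change a single console script replaces the five per-module entry points. Assuming the consolidated facility ingest keeps the -i/--infile and -m/--copyfiles options of the removed per-module scripts, ingesting a directory tree with file copying would look like:

    csst-dfs-ingest-local -i /path/to/local/fits -m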
diff --git a/tests/test_mci_level0.py b/tests/test_facility_level0.py
similarity index 90%
rename from tests/test_mci_level0.py
rename to tests/test_facility_level0.py
index 6584c6ac0adf6eae4472a0e016829141d42aaddb..79b7bd41aae2bb079ac1d88efd594d4c0a88e402 100644
--- a/tests/test_mci_level0.py
+++ b/tests/test_facility_level0.py
@@ -1,8 +1,8 @@
 import unittest
 
-from csst_dfs_api_local.mci import Level0DataApi
+from csst_dfs_api_local.facility import Level0DataApi
 
-class MCILevel0DataApiTestCase(unittest.TestCase):
+class Level0DataApiTestCase(unittest.TestCase):
 
     def setUp(self):
         self.api = Level0DataApi()
diff --git a/tests/test_mci_level0_prc.py b/tests/test_facility_level0_prc.py
similarity index 83%
rename from tests/test_mci_level0_prc.py
rename to tests/test_facility_level0_prc.py
index 3a926d51f1a688b21ebad0541dcf657aff665add..eee9cc58a30166cf3469d5e8d1b9465d24667b54 100644
--- a/tests/test_mci_level0_prc.py
+++ b/tests/test_facility_level0_prc.py
@@ -2,9 +2,9 @@ import os
 import unittest
 from astropy.io import fits
 
-from csst_dfs_api_local.mci.level0prc import Level0PrcApi
+from csst_dfs_api_local.facility.level0prc import Level0PrcApi
 
-class MCILevel0PrcTestCase(unittest.TestCase):
+class Level0PrcTestCase(unittest.TestCase):
 
     def setUp(self):
         self.api = Level0PrcApi()
diff --git a/tests/test_msc_level1.py b/tests/test_facility_level1.py
similarity index 89%
rename from tests/test_msc_level1.py
rename to tests/test_facility_level1.py
index ab0cbcf0fbab5235e220e3d9c0ef833ba7b3340f..9682b6804338e2965342216e7d24ad90b6826ed1 100644
--- a/tests/test_msc_level1.py
+++ b/tests/test_facility_level1.py
@@ -1,8 +1,8 @@
 import unittest
 
-from csst_dfs_api_local.msc import Level1DataApi
+from csst_dfs_api_local.facility import Level1DataApi
 
-class MSCLevel1DataTestCase(unittest.TestCase):
+class Level1DataTestCase(unittest.TestCase):
 
     def setUp(self):
         self.api = Level1DataApi()
diff --git a/tests/test_msc_level1_prc.py b/tests/test_facility_level1_prc.py
similarity index 83%
rename from tests/test_msc_level1_prc.py
rename to tests/test_facility_level1_prc.py
index 69b3300a5a1505cf00d908e291a4b781b6179305..299497d66d06a2177d7ba05bf1927ac02edcb304 100644
--- a/tests/test_msc_level1_prc.py
+++ b/tests/test_facility_level1_prc.py
@@ -2,9 +2,9 @@ import os
 import unittest
 from astropy.io import fits
 
-from csst_dfs_api_local.msc.level1prc import Level1PrcApi
+from csst_dfs_api_local.facility.level1prc import Level1PrcApi
 
-class MSCLevel1PrcTestCase(unittest.TestCase):
+class Level1PrcTestCase(unittest.TestCase):
 
     def setUp(self):
         self.api = Level1PrcApi()
diff --git a/tests/test_ifs_cal_merge.py b/tests/test_ifs_cal_merge.py
deleted file mode 100644
index 623a53df54e5c799d6a59cd90425a8f4b4dcf889..0000000000000000000000000000000000000000
--- a/tests/test_ifs_cal_merge.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import os
-import unittest
-from astropy.io import fits
-
-from csst_dfs_api_local.ifs.calmerge import CalMergeApi
-
-class IFSCalMergeApiTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.api = CalMergeApi()
-
-    def test_find(self):
-        recs = self.api.find(detector_no='01',
-            ref_type = "bias",
-            obs_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
-        print('find:', recs)
-
-    def test_get_latest_by_l0(self):
-        rec = self.api.get_latest_by_l0(level0_id='000001301', ref_type = "bias")
-        print('get:', rec)
-
-    def test_get(self):
-        rec = self.api.get(cal_id='0000231')
-        print('get:', rec)
-
-    def test_update_proc_status(self):
-        rec = self.api.update_proc_status(id = 100, status = 1)
-        print('update_proc_status:', rec)
-
-    def test_update_qc1_status(self):
-        rec = self.api.update_qc1_status(id = 100, status = 2)
-        print('update_qc1_status:', rec)
-
-    def test_write(self):
-        rec = self.api.write(
-            cal_id='0000231',
-            detector_no='01',
-            ref_type = "bias",
-            obs_time = "2021-06-04 11:12:13",
-            exp_time = 150,
-            filename = "/opt/dddasd.params",
-            file_path = "/opt/dddasd.fits",
-            prc_status = 3,
-            prc_time = '2021-06-04 11:12:13',
-            level0_ids = ['0000231','0000232','0000233','0000234'])
-        print('write:', rec)
\ No newline at end of file
diff --git a/tests/test_ifs_ingest.py b/tests/test_ifs_ingest.py
index cdec48b84c280cd22a64b846dff73eff92d75551..22d1d538ab0a330510a82a3d4d9d1cadf1befc89 100644
--- a/tests/test_ifs_ingest.py
+++ b/tests/test_ifs_ingest.py
@@ -1,6 +1,6 @@
 import unittest
 
-from csst_dfs_api_local.ifs.ingest import ingest, ingesst_one
+from csst_dfs_api_local.facility.ingest import ingest, ingesst_one
 
 class IFSLevel0DataApiTestCase(unittest.TestCase):
 
diff --git a/tests/test_ifs_level0.py b/tests/test_ifs_level0.py
deleted file mode 100644
index dc646fe6ddf26391767edbd8d9577d5ede8e4caa..0000000000000000000000000000000000000000
--- a/tests/test_ifs_level0.py
+++ /dev/null
@@ -1,36 +0,0 @@
-import unittest
-
-from csst_dfs_api_local.ifs import Level0DataApi
-
-class IFSLevel0DataApiTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.api = Level0DataApi()
-
-    def test_find(self):
-        recs = self.api.find(obs_id = '300000055', obs_type = 'sky', limit = 0)
-        print('find:', recs)
-
-    def test_get(self):
-        rec = self.api.get(id = 31)
-        print('get:', rec)
-
-    def test_update_proc_status(self):
-        rec = self.api.update_proc_status(id = 31, status = 6)
-        print('update_proc_status:', rec)
-
-    def test_update_qc0_status(self):
-        rec = self.api.update_qc0_status(id = 31, status = 7)
-        print('update_qc0_status:', rec)
-
-    def test_write(self):
-        rec = self.api.write(
-            obs_id = '0000013',
-            detector_no = "01",
-            obs_type = "sky",
-            obs_time = "2021-06-06 11:12:13",
-            exp_time = 150,
-            detector_status_id = 3,
-            filename = "MSC_00001234",
-            file_path = "/opt/MSC_00001234.fits")
-        print('write:', rec)
diff --git a/tests/test_ifs_level0_prc.py b/tests/test_ifs_level0_prc.py
deleted file mode 100644
index 929afca8557ec78fe2b988d892d8f7d3ba090637..0000000000000000000000000000000000000000
--- a/tests/test_ifs_level0_prc.py
+++ /dev/null
@@ -1,28 +0,0 @@
-import os
-import unittest
-from astropy.io import fits
-
-from csst_dfs_api_local.ifs.level0prc import Level0PrcApi
-
-class IFSLevel0PrcTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.api = Level0PrcApi()
-
-    def test_find(self):
-        recs = self.api.find(level0_id='300000055CCD231-c4')
-        print('find:', recs)
-
-    def test_update_proc_status(self):
-        rec = self.api.update_proc_status(id = 1, status = 4)
-        print('update_proc_status:', rec)
-
-    def test_write(self):
-        rec = self.api.write(level0_id='300000055CCD231-c4',
-            pipeline_id = "P1",
-            prc_module = "QC0",
-            params_file_path = "/opt/dddasd.params",
-            prc_status = 3,
-            prc_time = '2021-06-04 11:12:13',
-            result_file_path = "/opt/dddasd.header")
-        print('write:', rec)
\ No newline at end of file
diff --git a/tests/test_ifs_level1.py b/tests/test_ifs_level1.py
deleted file mode 100644
index 532f6d01576f740c21768af2c773a583dc2ca2c8..0000000000000000000000000000000000000000
--- a/tests/test_ifs_level1.py
+++ /dev/null
@@ -1,41 +0,0 @@
-import unittest
-
-from csst_dfs_api_local.ifs import Level1DataApi
-
-class IFSLevel1DataApiTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.api = Level1DataApi()
-
-    def test_find(self):
-        recs = self.api.find(
-            level0_id='0000223',
-            create_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13")
-        )
-        print('find:', recs)
-
-    def test_get(self):
-        rec = self.api.get(id = 1)
-        print('get:', rec)
-
-    def test_update_proc_status(self):
-        rec = self.api.update_proc_status(id = 1, status = 4)
-        print('update_proc_status:', rec)
-
-    def test_update_qc1_status(self):
-        rec = self.api.update_qc1_status(id = 1, status = 7)
-        print('update_qc1_status:', rec)
-
-    def test_write(self):
-        rec = self.api.write(
-            level0_id='0000223',
-            data_type = "sci",
-            cor_sci_id = 2,
-            prc_params = "/opt/dddasd.params",
-            prc_status = 3,
-            prc_time = '2021-06-05 11:12:13',
-            filename = "dddasd223234.fits",
-            file_path = "/opt/dddasd23.fits",
-            pipeline_id = "P2",
-            refs = {'dark': 1, 'bias': 2, 'flat': 3 })
-        print('write:', rec)
\ No newline at end of file
diff --git a/tests/test_ifs_level1_prc.py b/tests/test_ifs_level1_prc.py
deleted file mode 100644
index 30582d2db1b6ec643d7d39dd496a38bc450c2469..0000000000000000000000000000000000000000
--- a/tests/test_ifs_level1_prc.py
+++ /dev/null
@@ -1,28 +0,0 @@
-import os
-import unittest
-from astropy.io import fits
-
-from csst_dfs_api_local.ifs.level1prc import Level1PrcApi
-
-class IFSLevel1PrcTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.api = Level1PrcApi()
-
-    def test_find(self):
-        recs = self.api.find(level1_id=1)
-        print('find:', recs)
-
-    def test_update_proc_status(self):
-        rec = self.api.update_proc_status(id = 1, status = 4)
-        print('update_proc_status:', rec)
-
-    def test_write(self):
-        rec = self.api.write(level1_id=1,
-            pipeline_id = "P1",
-            prc_module = "QC0",
-            params_file_path = "/opt/dddasd.params",
-            prc_status = 3,
-            prc_time = '2021-06-04 11:12:13',
-            result_file_path = "/opt/dddasd.header")
-        print('write:', rec)
\ No newline at end of file
diff --git a/tests/test_mci_cal_merge.py b/tests/test_mci_cal_merge.py
deleted file mode 100644
index be8e44ae93bd492f91b611a9d6551ac01778b68b..0000000000000000000000000000000000000000
--- a/tests/test_mci_cal_merge.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import os
-import unittest
-from astropy.io import fits
-
-from csst_dfs_api_local.mci.calmerge import CalMergeApi
-
-class MCICalMergeApiTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.api = CalMergeApi()
-
-    def test_find(self):
-        recs = self.api.find(detector_no='01',
-            ref_type = "bias",
-            obs_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
-        print('find:', recs)
-
-    def test_get_latest_by_l0(self):
-        rec = self.api.get_latest_by_l0(level0_id='000001301', ref_type = "bias")
-        print('get:', rec)
-
-    def test_get(self):
-        rec = self.api.get(cal_id='0000231')
-        print('get:', rec)
-
-    def test_update_proc_status(self):
-        rec = self.api.update_proc_status(id = 100, status = 1)
-        print('update_proc_status:', rec)
-
-    def test_update_qc1_status(self):
-        rec = self.api.update_qc1_status(id = 100, status = 2)
-        print('update_qc1_status:', rec)
-
-    def test_write(self):
-        rec = self.api.write(
-            cal_id='0000231',
-            detector_no='01',
-            ref_type = "bias",
-            obs_time = "2021-06-04 11:12:13",
-            exp_time = 150,
-            filename = "/opt/dddasd.params",
-            file_path = "/opt/dddasd.fits",
-            prc_status = 3,
-            prc_time = '2021-06-04 11:12:13',
-            level0_ids = ['0000231','0000232','0000233','0000234'])
-        print('write:', rec)
\ No newline at end of file
diff --git a/tests/test_mci_level1.py b/tests/test_mci_level1.py
deleted file mode 100644
index ede8742dac1a04f44d3d135484af64ef5405d168..0000000000000000000000000000000000000000
--- a/tests/test_mci_level1.py
+++ /dev/null
@@ -1,41 +0,0 @@
-import unittest
-
-from csst_dfs_api_local.mci import Level1DataApi
-
-class MCILevel1DataApiTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.api = Level1DataApi()
-
-    def test_find(self):
-        recs = self.api.find(
-            level0_id='0000223',
-            create_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13")
-        )
-        print('find:', recs)
-
-    def test_get(self):
-        rec = self.api.get(id = 1)
-        print('get:', rec)
-
-    def test_update_proc_status(self):
-        rec = self.api.update_proc_status(id = 1, status = 4)
-        print('update_proc_status:', rec)
-
-    def test_update_qc1_status(self):
-        rec = self.api.update_qc1_status(id = 1, status = 7)
-        print('update_qc1_status:', rec)
-
-    def test_write(self):
-        rec = self.api.write(
-            level0_id='0000223',
-            data_type = "sci",
-            cor_sci_id = 2,
-            prc_params = "/opt/dddasd.params",
-            prc_status = 3,
-            prc_time = '2021-06-05 11:12:13',
-            filename = "dddasd223234.fits",
-            file_path = "/opt/dddasd23.fits",
-            pipeline_id = "P2",
-            refs = {'dark': 1, 'bias': 2, 'flat': 3 })
-        print('write:', rec)
\ No newline at end of file
diff --git a/tests/test_mci_level1_prc.py b/tests/test_mci_level1_prc.py
deleted file mode 100644
index 5d8628bb515b00fd1240929a7452e854e8b0807f..0000000000000000000000000000000000000000
--- a/tests/test_mci_level1_prc.py
+++ /dev/null
@@ -1,28 +0,0 @@
-import os
-import unittest
-from astropy.io import fits
-
-from csst_dfs_api_local.mci.level1prc import Level1PrcApi
-
-class MCILevel1PrcTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.api = Level1PrcApi()
-
-    def test_find(self):
-        recs = self.api.find(level1_id=1)
-        print('find:', recs)
-
-    def test_update_proc_status(self):
-        rec = self.api.update_proc_status(id = 1, status = 4)
-        print('update_proc_status:', rec)
-
-    def test_write(self):
-        rec = self.api.write(level1_id=1,
-            pipeline_id = "P1",
-            prc_module = "QC0",
-            params_file_path = "/opt/dddasd.params",
-            prc_status = 3,
-            prc_time = '2021-06-04 11:12:13',
-            result_file_path = "/opt/dddasd.header")
-        print('write:', rec)
\ No newline at end of file
diff --git a/tests/test_msc_cal_merge.py b/tests/test_msc_cal_merge.py
deleted file mode 100644
index e1616f6d619b29df055ec0cf88ab4752ebf04640..0000000000000000000000000000000000000000
--- a/tests/test_msc_cal_merge.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import os
-import unittest
-from astropy.io import fits
-
-from csst_dfs_api_local.msc.calmerge import CalMergeApi
-
-class MSCCalMergeApiTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.api = CalMergeApi()
-
-    def test_find(self):
-        recs = self.api.find(detector_no='01',
-            ref_type = "bias",
-            obs_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
-        print('find:', recs)
-
-    def test_get_latest_by_l0(self):
-        rec = self.api.get_latest_by_l0(level0_id='000001301', ref_type = "bias")
-        print('get:', rec)
-
-    def test_get(self):
-        rec = self.api.get(cal_id='0000231')
-        print('get:', rec)
-
-    def test_update_proc_status(self):
-        rec = self.api.update_proc_status(id = 100, status = 1)
-        print('update_proc_status:', rec)
-
-    def test_update_qc1_status(self):
-        rec = self.api.update_qc1_status(id = 100, status = 2)
-        print('update_qc1_status:', rec)
-
-    def test_write(self):
-        rec = self.api.write(
-            cal_id='0000231',
-            detector_no='01',
-            ref_type = "bias",
-            obs_time = "2021-06-04 11:12:13",
-            exp_time = 150,
-            filename = "/opt/dddasd.params",
-            file_path = "/opt/dddasd.fits",
-            prc_status = 3,
-            prc_time = '2021-06-04 11:12:13',
-            level0_ids = ['0000231','0000232','0000233','0000234'])
-        print('write:', rec)
\ No newline at end of file
diff --git a/tests/test_msc_level0.py b/tests/test_msc_level0.py
deleted file mode 100644
index a5181dc762a9c06327f90dd0d783507ce2e3695c..0000000000000000000000000000000000000000
--- a/tests/test_msc_level0.py
+++ /dev/null
@@ -1,36 +0,0 @@
-import unittest
-
-from csst_dfs_api_local.msc import Level0DataApi
-
-class MSCLevel0DataApiTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.api = Level0DataApi()
-
-    def test_find(self):
-        recs = self.api.find(obs_id = '300000055', obs_type = 'sky', limit = 0)
-        print('find:', recs)
-
-    def test_get(self):
-        rec = self.api.get(id = 31)
-        print('get:', rec)
-
-    def test_update_proc_status(self):
-        rec = self.api.update_proc_status(id = 31, status = 6)
-        print('update_proc_status:', rec)
-
-    def test_update_qc0_status(self):
-        rec = self.api.update_qc0_status(id = 31, status = 7)
-        print('update_qc0_status:', rec)
-
-    def test_write(self):
-        rec = self.api.write(
-            obs_id = '0000013',
-            detector_no = "01",
-            obs_type = "sky",
-            obs_time = "2021-06-06 11:12:13",
-            exp_time = 150,
-            detector_status_id = 3,
-            filename = "MSC_00001234",
-            file_path = "/opt/MSC_00001234.fits")
-        print('write:', rec)
diff --git a/tests/test_msc_level0_prc.py b/tests/test_msc_level0_prc.py
deleted file mode 100644
index d581f21e818cf82bfa6fb83f395524a886214b21..0000000000000000000000000000000000000000
--- a/tests/test_msc_level0_prc.py
+++ /dev/null
@@ -1,28 +0,0 @@
-import os
-import unittest
-from astropy.io import fits
-
-from csst_dfs_api_local.msc.level0prc import Level0PrcApi
-
-class MSCLevel0PrcTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.api = Level0PrcApi()
-
-    def test_find(self):
-        recs = self.api.find(level0_id='300000055CCD231-c4')
-        print('find:', recs)
-
-    def test_update_proc_status(self):
-        rec = self.api.update_proc_status(id = 1, status = 4)
-        print('update_proc_status:', rec)
-
-    def test_write(self):
-        rec = self.api.write(level0_id='300000055CCD231-c4',
-            pipeline_id = "P1",
-            prc_module = "QC0",
-            params_file_path = "/opt/dddasd.params",
-            prc_status = 3,
-            prc_time = '2021-06-04 11:12:13',
-            result_file_path = "/opt/dddasd.header")
-        print('write:', rec)
\ No newline at end of file
diff --git a/tests/test_sls_cal_merge.py b/tests/test_sls_cal_merge.py
deleted file mode 100644
index 4074daaa05905eb0002fd63a4c2e3c614e809ab5..0000000000000000000000000000000000000000
--- a/tests/test_sls_cal_merge.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import os
-import unittest
-from astropy.io import fits
-
-from csst_dfs_api_local.sls.calmerge import CalMergeApi
-
-class SLSCalMergeApiTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.api = CalMergeApi()
-
-    def test_find(self):
-        recs = self.api.find(detector_no='01',
-            ref_type = "bias",
-            obs_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
-        print('find:', recs)
-
-    def test_get_latest_by_l0(self):
-        rec = self.api.get_latest_by_l0(level0_id='000001301', ref_type = "bias")
-        print('get:', rec)
-
-    def test_get(self):
-        rec = self.api.get(cal_id='0000231')
-        print('get:', rec)
-
-    def test_update_proc_status(self):
-        rec = self.api.update_proc_status(id = 100, status = 1)
-        print('update_proc_status:', rec)
-
-    def test_update_qc1_status(self):
-        rec = self.api.update_qc1_status(id = 100, status = 2)
-        print('update_qc1_status:', rec)
-
-    def test_write(self):
-        rec = self.api.write(
-            cal_id='0000231',
-            detector_no='01',
-            ref_type = "bias",
-            obs_time = "2021-06-04 11:12:13",
-            exp_time = 150,
-            filename = "/opt/dddasd.params",
-            file_path = "/opt/dddasd.fits",
-            prc_status = 3,
-            prc_time = '2021-06-04 11:12:13',
-            level0_ids = ['0000231','0000232','0000233','0000234'])
-        print('write:', rec)
\ No newline at end of file
diff --git a/tests/test_sls_level0.py b/tests/test_sls_level0.py
deleted file mode 100644
index 03658e468f61fbb5a7662ddff52a970d342db244..0000000000000000000000000000000000000000
--- a/tests/test_sls_level0.py
+++ /dev/null
@@ -1,36 +0,0 @@
-import unittest
-
-from csst_dfs_api_local.sls import Level0DataApi
-
-class SLSLevel0DataApiTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.api = Level0DataApi()
-
-    def test_find(self):
-        recs = self.api.find(obs_id = '300000055', obs_type = 'sky', limit = 0)
-        print('find:', recs)
-
-    def test_get(self):
-        rec = self.api.get(id = 31)
-        print('get:', rec)
-
-    def test_update_proc_status(self):
-        rec = self.api.update_proc_status(id = 31, status = 6)
-        print('update_proc_status:', rec)
-
-    def test_update_qc0_status(self):
-        rec = self.api.update_qc0_status(id = 31, status = 7)
-        print('update_qc0_status:', rec)
-
-    def test_write(self):
-        rec = self.api.write(
-            obs_id = '0000013',
-            detector_no = "01",
-            obs_type = "sky",
-            obs_time = "2021-06-06 11:12:13",
-            exp_time = 150,
-            detector_status_id = 3,
-            filename = "MSC_00001234",
-            file_path = "/opt/MSC_00001234.fits")
-        print('write:', rec)
diff --git a/tests/test_sls_level0_prc.py b/tests/test_sls_level0_prc.py
deleted file mode 100644
index 4fe33a4c01ded3c162454330525abc47f11daaa8..0000000000000000000000000000000000000000
--- a/tests/test_sls_level0_prc.py
+++ /dev/null
@@ -1,28 +0,0 @@
-import os
-import unittest
-from astropy.io import fits
-
-from csst_dfs_api_local.sls.level0prc import Level0PrcApi
-
-class SLSLevel0PrcTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.api = Level0PrcApi()
-
-    def test_find(self):
-        recs = self.api.find(level0_id='300000055CCD231-c4')
-        print('find:', recs)
-
-    def test_update_proc_status(self):
-        rec = self.api.update_proc_status(id = 1, status = 4)
-        print('update_proc_status:', rec)
-
-    def test_write(self):
-        rec = self.api.write(level0_id='300000055CCD231-c4',
-            pipeline_id = "P1",
-            prc_module = "QC0",
-            params_file_path = "/opt/dddasd.params",
-            prc_status = 3,
-            prc_time = '2021-06-04 11:12:13',
-            result_file_path = "/opt/dddasd.header")
-        print('write:', rec)
\ No newline at end of file
diff --git a/tests/test_sls_level1.py b/tests/test_sls_level1.py
deleted file mode 100644
index cdaad8819f9711cd41f011163d0fbc5f008e53e1..0000000000000000000000000000000000000000
--- a/tests/test_sls_level1.py
+++ /dev/null
@@ -1,40 +0,0 @@
-import unittest
-
-from csst_dfs_api_local.sls import Level1DataApi
-
-class SLSResult1TestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.api = Level1DataApi()
-
-    def test_find(self):
-        recs = self.api.find(
-            level0_id='0000223',
-            create_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13")
-        )
-        print('find:', recs)
-
-    def test_get(self):
-        rec = self.api.get(id = 1)
-        print('get:', rec)
-
-    def test_update_proc_status(self):
-        rec = self.api.update_proc_status(id = 1, status = 4)
-        print('update_proc_status:', rec)
-
-    def test_update_qc1_status(self):
-        rec = self.api.update_qc1_status(id = 1, status = 7)
-        print('update_qc1_status:', rec)
-
-    def test_write(self):
-        rec = self.api.write(
-            level0_id='0000223',
-            data_type = "sci",
-            prc_params = "/opt/dddasd.params",
-            prc_status = 3,
-            prc_time = '2021-06-05 11:12:13',
-            filename = "dddasd223234.fits",
-            file_path = "/opt/dddasd23.fits",
-            pipeline_id = "P2",
-            refs = {'dark': 1, 'bias': 2, 'flat': 3 })
-        print('write:', rec)
\ No newline at end of file
diff --git a/tests/test_sls_level1_prc.py b/tests/test_sls_level1_prc.py
deleted file mode 100644
index 84a1f73c4959a3e4ea52bc08cac2075eca641f43..0000000000000000000000000000000000000000
--- a/tests/test_sls_level1_prc.py
+++ /dev/null
@@ -1,28 +0,0 @@
-import os
-import unittest
-from astropy.io import fits
-
-from csst_dfs_api_local.sls.level1prc import Level1PrcApi
-
-class SLSLevel1PrcTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.api = Level1PrcApi()
-
-    def test_find(self):
-        recs = self.api.find(level1_id=1)
-        print('find:', recs)
-
-    def test_update_proc_status(self):
-        rec = self.api.update_proc_status(id = 1, status = 4)
-        print('update_proc_status:', rec)
-
-    def test_write(self):
-        rec = self.api.write(level1_id=1,
-
pipeline_id = "P1", - prc_module = "QC0", - params_file_path = "/opt/dddasd.params", - prc_status = 3, - prc_time = '2021-06-04 11:12:13', - result_file_path = "/opt/dddasd.header") - print('write:', rec) \ No newline at end of file