import os
import logging
import time
import datetime
from astropy.io import fits
import shutil
from traceback import print_stack

from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.common import from_dict_list
from csst_dfs_commons.models.msc import Level2Record, Level2CatalogRecord
from csst_dfs_commons.utils.fits import get_header_value

log = logging.getLogger('csst')


class Level2DataApi(object):
    ''' Local-mode access to the ``msc_level2_data`` table.

    Every public method returns a ``csst_dfs_commons.models.Result``; database
    and I/O failures are reported through ``Result.error`` instead of being
    raised to the caller.
    '''

    def __init__(self, sub_system = "msc"):
        self.sub_system = sub_system
        # Root directory of the local file store; overridable via environment.
        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
        self.db = DBClient()

    def catalog_query(self, **kwargs):
        ''' Catalog queries are not available in local mode.

        :returns: csst_dfs_commons.models.Result carrying an error message
        '''
        return Result.error(message = 'level2 catalog not support in the local mode' )

    def find(self, **kwargs):
        ''' retrieve level2 records from database

        :param kwargs: Parameter dictionary, key items support:
            level0_id: [str]
            level1_id: [int]
            data_type: [str]
            create_time : (start, end),
            qc2_status : [int],
            prc_status : [int],
            filename: [str]
            limit: limits returns the number of records,default 0:no-limit

        :returns: csst_dfs_commons.models.Result holding the matching
            Level2Record list, plus a ``totalCount`` entry with the number of
            rows matching the filters (ignoring ``limit``)
        '''
        try:
            level0_id = get_parameter(kwargs, "level0_id")
            level1_id = get_parameter(kwargs, "level1_id")
            data_type = get_parameter(kwargs, "data_type")
            create_time = get_parameter(kwargs, "create_time", [None, None])
            create_time_start = create_time[0]
            create_time_end = create_time[1]
            # The documented key is "qc2_status"; it used to be read from
            # "qc1_status", so the filter was never applied.
            qc2_status = get_parameter(kwargs, "qc2_status")
            prc_status = get_parameter(kwargs, "prc_status")
            filename = get_parameter(kwargs, "filename")
            limit = int(get_parameter(kwargs, "limit", 0) or 0)

            # (clause, value) pairs; a filter is applied only when its value is
            # truthy.
            # NOTE(review): this means a status of 0 is treated as "no filter",
            # as in the previous implementation - confirm that is intended.
            filters = (
                ("level0_id=?", level0_id),
                ("level1_id=?", level1_id),
                ("data_type=?", data_type),
                ("create_time>=?", create_time_start),
                ("create_time<=?", create_time_end),
                ("qc2_status=?", qc2_status),
                ("prc_status=?", prc_status),
                ("filename=?", filename),
            )
            active = [(clause, value) for clause, value in filters if value]
            where = "".join(f" and {clause}" for clause, _ in active)
            params = tuple(value for _, value in active)

            # Values are bound, not interpolated, so quotes in a value can
            # neither break the statement nor inject SQL.
            sql_count = f"select count(*) as c from msc_level2_data where 1=1{where}"
            sql_data = f"select * from msc_level2_data where 1=1{where}"
            data_params = params
            if limit > 0:
                sql_data = f"{sql_data} limit ?"
                data_params = params + (limit,)

            totalCount = self.db.select_one(sql_count, params)
            # NOTE(review): select_many is assumed to accept bind parameters
            # the same way select_one/exists/execute do - confirm in DBClient.
            _, recs = self.db.select_many(sql_data, data_params)
            return Result.ok_data(data=from_dict_list(Level2Record, recs)).append("totalCount", totalCount['c'])

        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def get(self, **kwargs):
        ''' fetch a single level2 record by its primary key

        parameter kwargs:
            id = [int]

        return csst_dfs_commons.models.Result (error result when not found)
        '''
        try:
            the_id = get_parameter(kwargs, "id", -1)
            r = self.db.select_one(
                "select * from msc_level2_data where id=?", (the_id,))
            if r:
                return Result.ok_data(data=Level2Record().from_dict(r))
            else:
                return Result.error(message=f"id:{the_id} not found")
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def _update_status(self, kwargs, status_column, time_column):
        ''' Set ``status_column`` and stamp ``time_column`` for one record.

        The column names are interpolated into the SQL text, so they must be
        fixed identifiers supplied by this class, never caller input.

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_commons.models.Result
        '''
        fits_id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            existed = self.db.exists(
                "select * from msc_level2_data where id=?",
                (fits_id,)
            )
            if not existed:
                message = '%s not found' % (fits_id, )
                log.warning(message)
                return Result.error(message=message)
            self.db.execute(
                f'update msc_level2_data set {status_column}=?, {time_column}=? where id=?',
                (status, format_time_ms(time.time()), fits_id)
            )
            self.db.end()
            return Result.ok_data()

        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def update_proc_status(self, **kwargs):
        ''' update the status of reduction

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_commons.models.Result
        '''
        return self._update_status(kwargs, "prc_status", "prc_time")

    def update_qc2_status(self, **kwargs):
        ''' update the status of QC2

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_commons.models.Result
        '''
        return self._update_status(kwargs, "qc2_status", "qc2_time")

    def write(self, **kwargs):
        ''' insert a level2 record into database

        parameter kwargs:
            level1_id: [int]
            data_type : [str]
            filename : [str] defaults to the base name of file_path
            file_path : [str] FITS file that is opened to read the header
            prc_status : [int] default -1
            prc_time : [str] default: current time
            pipeline_id : [str] default ""

        level0_id and obs_time are not taken from kwargs: they are built from
        the primary header of the FITS file (OBSID + last two characters of
        DETECTOR, and DATE-OBS + TIME-OBS respectively).

        return csst_dfs_commons.models.Result (error result when a record
        with the same filename already exists)
        '''
        # Local import of the class: at module level the name ``datetime`` is
        # the module unless the star import above rebinds it.
        from datetime import datetime as _datetime
        try:
            rec = Level2Record(
                id = 0,
                level1_id = get_parameter(kwargs, "level1_id"),
                data_type = get_parameter(kwargs, "data_type"),
                filename = get_parameter(kwargs, "filename"),
                file_path = get_parameter(kwargs, "file_path"),
                prc_status = get_parameter(kwargs, "prc_status", -1),
                prc_time = get_parameter(kwargs, "prc_time", format_datetime(_datetime.now())),
                pipeline_id = get_parameter(kwargs, "pipeline_id", "")
            )
            # Resolve the default filename BEFORE the duplicate check,
            # otherwise an omitted filename would bypass it.
            if not rec.filename:
                rec.filename = os.path.basename(rec.file_path)

            existed = self.db.exists(
                "select * from msc_level2_data where filename=?",
                (rec.filename,)
            )
            if existed:
                log.error(f'{rec.filename} has already been existed')
                return Result.error(message=f'{rec.filename} has already been existed')

            # Context manager guarantees the FITS file handle is released.
            with fits.open(rec.file_path) as hdul:
                header = hdul[0].header
                obs_id = get_header_value("OBSID", header, "")
                detector = get_header_value("DETECTOR", header, "")
                date_obs = get_header_value('DATE-OBS', header, '')
                time_obs = get_header_value('TIME-OBS', header, '')

            # Integer observation ids are zero-padded to 9 digits.
            obs_id_str = "%09d" % (obs_id) if isinstance(obs_id, int) else obs_id
            # Only the last two characters of the detector name are kept.
            if len(detector) > 2:
                detector = detector[-2:]
            obs_time = f"{date_obs} {time_obs}"
            level0_id = f"{obs_id_str}{detector}"

            self.db.execute(
                'INSERT INTO msc_level2_data (level0_id, level1_id,data_type,filename,file_path,obs_time,qc2_status,prc_status,prc_time,create_time,pipeline_id) '
                'VALUES(?,?,?,?,?,?,?,?,?,?,?)',
                (level0_id, rec.level1_id, rec.data_type, rec.filename, rec.file_path, obs_time, -1, rec.prc_status, rec.prc_time, format_time_ms(time.time()), rec.pipeline_id)
            )
            self.db.end()
            rec.id = self.db.last_row_id()

            return Result.ok_data(data=rec)
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))