calmerge.py 10.56 KiB
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.facility import CalMergeRecord
from csst_dfs_commons.models.common import from_dict_list
from .level0 import Level0DataApi
log = logging.getLogger('csst')
class CalMergeApi(object):
    def __init__(self):
        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
        self.db = DBClient()
        self.level0Api = Level0DataApi()
    def get_latest_by_l0(self, **kwargs):
        ''' retrieve calibration merge records from database by level0 data
        :param kwargs: Parameter dictionary, key items support:
            level0_id: [str],
            ref_type: [str]
        :returns: csst_dfs_common.models.Result
        '''
        try:
            level0_id = get_parameter(kwargs, "level0_id")
            ref_type = get_parameter(kwargs, "ref_type")
            level0_data = self.level0Api.get_by_level0_id(level0_id)
            if level0_data is None:
                return Result.error(message = "level0 data [%s]not found"%(level0_id))
            sql_data = f"select * from t_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time >= '{level0_data.data.obs_time}' order by obs_time ASC limit 1"
            r = self.db.select_one(sql_data)
            if r:
                rec = CalMergeRecord().from_dict(r)
                return Result.ok_data(data=rec)
            sql_data = f"select * from t_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time <= '{level0_data.data.obs_time}' order by obs_time DESC limit 1"
            r = self.db.select_one(sql_data)
            if r:
                rec = CalMergeRecord().from_dict(r)
                return Result.ok_data(data=rec)            
            return Result.error(message = "not found")
        except Exception as e:
            return Result.error(message=str(e))
    def find(self, **kwargs):
        ''' retrieve calibration merge records from database
        parameter kwargs:
            detector_no: [str]
            ref_type: [str]
            obs_time: (start,end)
            qc1_status : [int]
            prc_status : [int]
            file_name: [str]
            limit: limits returns the number of records,default 0:no-limit
        return: csst_dfs_common.models.Result
        '''
try: detector_no = get_parameter(kwargs, "detector_no") ref_type = get_parameter(kwargs, "ref_type") exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0] exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1] qc1_status = get_parameter(kwargs, "qc1_status") prc_status = get_parameter(kwargs, "prc_status") file_name = get_parameter(kwargs, "file_name") limit = get_parameter(kwargs, "limit", 0) sql_count = "select count(*) as c from t_cal_merge where 1=1" sql_data = f"select * from t_cal_merge where 1=1" sql_condition = "" if detector_no: sql_condition = f"{sql_condition} and detector_no='{detector_no}'" if ref_type: sql_condition = f"{sql_condition} and ref_type='{ref_type}'" if exp_time_start: sql_condition = f"{sql_condition} and obs_time >='{exp_time_start}'" if exp_time_end: sql_condition = f"{sql_condition} and obs_time <='{exp_time_end}'" if qc1_status: sql_condition = f"{sql_condition} and qc1_status={qc1_status}" if prc_status: sql_condition = f"{sql_condition} and prc_status={prc_status}" if file_name: sql_condition = f" and filename={file_name}" sql_count = f"{sql_count} {sql_condition}" sql_data = f"{sql_data} {sql_condition}" log.info(sql_count) log.info(sql_data) if limit > 0: sql_data = f"{sql_data} limit {limit}" totalCount = self.db.select_one(sql_count) _, records = self.db.select_many(sql_data) return Result.ok_data(data=from_dict_list(CalMergeRecord, records)).append("totalCount", totalCount['c']) except Exception as e: return Result.error(message=str(e)) def get(self, **kwargs): ''' fetch a record from database parameter kwargs: id : [int], cal_id : [str] return csst_dfs_common.models.Result ''' id = get_parameter(kwargs, "id", 0) cal_id = get_parameter(kwargs, "cal_id", "") if id == 0 and cal_id == "": return Result.error(message="at least define id or cal_id") if id != 0: return self.get_by_id(id) if cal_id != "": return self.get_by_cal_id(cal_id) def get_by_id(self, id: str): try: r = self.db.select_one(
"select * from t_cal_merge where id=?", (id,)) if r: sql_get_level0_id = f"select level0_id from t_cal2level0 where merge_id={r['id']}" _, records = self.db.select_many(sql_get_level0_id) level0_ids = [r["level0_id"] for r in records] rec = CalMergeRecord().from_dict(r) rec.level0_ids = level0_ids return Result.ok_data(data=rec) else: return Result.error(message=f"id:{id} not found") except Exception as e: log.error(e) return Result.error(message=str(e)) def get_by_cal_id(self, cal_id: str): try: r = self.db.select_one( "select * from t_cal_merge where cal_id=?", (cal_id,)) if r: sql_get_level0_id = f"select level0_id from t_cal2level0 where merge_id={r['id']}" _, records = self.db.select_many(sql_get_level0_id) level0_ids = [r["level0_id"] for r in records] rec = CalMergeRecord().from_dict(r) rec.level0_ids = level0_ids return Result.ok_data(data=rec) else: return Result.error(message=f"id:{id} not found") except Exception as e: log.error(e) return Result.error(message=str(e)) def update_qc1_status(self, **kwargs): ''' update the status of reduction parameter kwargs: id : [int], cal_id : [str], status : [int] return csst_dfs_common.models.Result ''' id = get_parameter(kwargs, "id", 0) cal_id = get_parameter(kwargs, "cal_id", "") result = self.get(id = id, cal_id = cal_id) if not result.success: return Result.error(message="not found") id = result.data.id status = get_parameter(kwargs, "status") try: self.db.execute( 'update t_cal_merge set qc1_status=?, qc1_time=? where id=?', (status, format_time_ms(time.time()), id) ) self.db.end() return Result.ok_data() except Exception as e: log.error(e) return Result.error(message=str(e)) def update_proc_status(self, **kwargs):
''' update the status of reduction parameter kwargs: id : [int], cal_id : [str], status : [int] return csst_dfs_common.models.Result ''' id = get_parameter(kwargs, "id", 0) cal_id = get_parameter(kwargs, "cal_id", "") result = self.get(id = id, cal_id = cal_id) if not result.success: return Result.error(message="not found") id = result.data.id status = get_parameter(kwargs, "status") try: existed = self.db.exists( "select * from t_cal_merge where id=?", (id,) ) if not existed: log.warning('%s not found' %(id, )) return Result.error(message ='%s not found' %(id, )) self.db.execute( 'update t_cal_merge set prc_status=?, prc_time=? where id=?', (status, format_time_ms(time.time()), id) ) self.db.end() return Result.ok_data() except Exception as e: log.error(e) return Result.error(message=str(e)) def write(self, **kwargs): ''' insert a calibration merge record into database parameter kwargs: cal_id : [str] detector_no : [str] ref_type : [str] obs_time : [str] exp_time : [float] prc_status : [int] prc_time : [str] filename : [str] file_path : [str] level0_ids : [list] return csst_dfs_common.models.Result ''' rec = CalMergeRecord( id = 0, cal_id = get_parameter(kwargs, "cal_id"), detector_no = get_parameter(kwargs, "detector_no"), ref_type = get_parameter(kwargs, "ref_type"), obs_time = get_parameter(kwargs, "obs_time"), exp_time = get_parameter(kwargs, "exp_time"), filename = get_parameter(kwargs, "filename"), file_path = get_parameter(kwargs, "file_path"), prc_status = get_parameter(kwargs, "prc_status", -1), prc_time = get_parameter(kwargs, "prc_time"), level0_ids = get_parameter(kwargs, "level0_ids", []) ) try: self.db.execute(
'INSERT INTO t_cal_merge (cal_id,detector_no,ref_type,obs_time,exp_time,filename,file_path,prc_status,prc_time,create_time) \ VALUES(?,?,?,?,?,?,?,?,?,?)', (rec.cal_id, rec.detector_no, rec.ref_type, rec.obs_time, rec.exp_time, rec.filename, rec.file_path,rec.prc_status,rec.prc_time,format_time_ms(time.time())) ) self.db.end() rec.id = self.db.last_row_id() sql_level0_ids = "insert into t_cal2level0 (merge_id,level0_id) values " values = ["(%s,%s)"%(rec.id,i) for i in rec.level0_ids] _ = self.db.execute(sql_level0_ids + ",".join(values)) self.db.end() return Result.ok_data(data=rec) except Exception as e: log.error(e) return Result.error(message=str(e))