Commit 5402e595 authored by Wei Shoulin's avatar Wei Shoulin
Browse files

cpic and bug

parent 132c1fec
...@@ -203,8 +203,6 @@ create table msc_level0_data ...@@ -203,8 +203,6 @@ create table msc_level0_data
create table msc_level0_header create table msc_level0_header
( (
id int(20) not null, id int(20) not null,
obs_time datetime,
exp_time float,
ra float, ra float,
"dec" float, "dec" float,
create_time datetime, create_time datetime,
...@@ -247,8 +245,6 @@ create table msc_level1_ref ( ...@@ -247,8 +245,6 @@ create table msc_level1_ref (
create table msc_level1_header create table msc_level1_header
( (
id int(20) not null, id int(20) not null,
obs_time datetime,
exp_time float,
ra float, ra float,
"dec" float, "dec" float,
create_time datetime, create_time datetime,
...@@ -264,8 +260,6 @@ create table msc_cal2level0 ...@@ -264,8 +260,6 @@ create table msc_cal2level0
create table msc_cal_header create table msc_cal_header
( (
id int(20) not null, id int(20) not null,
obs_time datetime,
exp_time float,
ra float, ra float,
"dec" float, "dec" float,
create_time datetime, create_time datetime,
...@@ -370,8 +364,6 @@ create table ifs_cal2level0 ...@@ -370,8 +364,6 @@ create table ifs_cal2level0
create table ifs_cal_header create table ifs_cal_header
( (
id int(20) not null, id int(20) not null,
obs_time datetime,
exp_time float,
ra float, ra float,
"dec" float, "dec" float,
create_time datetime, create_time datetime,
...@@ -422,8 +414,6 @@ create table ifs_level1_ref ( ...@@ -422,8 +414,6 @@ create table ifs_level1_ref (
create table ifs_level1_header create table ifs_level1_header
( (
id int(20) not null, id int(20) not null,
obs_time datetime,
exp_time float,
ra float, ra float,
"dec" float, "dec" float,
create_time datetime, create_time datetime,
...@@ -541,8 +531,6 @@ create table mci_level1_ref ( ...@@ -541,8 +531,6 @@ create table mci_level1_ref (
create table mci_level1_header create table mci_level1_header
( (
id int(20) not null, id int(20) not null,
obs_time datetime,
exp_time float,
ra float, ra float,
"dec" float, "dec" float,
create_time datetime, create_time datetime,
...@@ -582,8 +570,6 @@ create table sls_level0_data ...@@ -582,8 +570,6 @@ create table sls_level0_data
create table sls_level0_header create table sls_level0_header
( (
id int(20) not null, id int(20) not null,
obs_time datetime,
exp_time float,
ra float, ra float,
"dec" float, "dec" float,
create_time datetime, create_time datetime,
...@@ -611,8 +597,6 @@ create table sls_cal2level0 ...@@ -611,8 +597,6 @@ create table sls_cal2level0
create table sls_cal_header create table sls_cal_header
( (
id int(20) not null, id int(20) not null,
obs_time datetime,
exp_time float,
ra float, ra float,
"dec" float, "dec" float,
create_time datetime, create_time datetime,
...@@ -662,8 +646,6 @@ create table sls_level1_ref ( ...@@ -662,8 +646,6 @@ create table sls_level1_ref (
create table sls_level1_header create table sls_level1_header
( (
id int(20) not null, id int(20) not null,
obs_time datetime,
exp_time float,
ra float, ra float,
"dec" float, "dec" float,
create_time datetime, create_time datetime,
...@@ -700,218 +682,124 @@ create table sls_level2_spectra ...@@ -700,218 +682,124 @@ create table sls_level2_spectra
create table sls_level2_spectra_header create table sls_level2_spectra_header
( (
id int(20) not null, id int(20) not null,
ra float,
"dec" float,
create_time datetime,
primary key (id)
);
/*===========================cpic===================================*/
create table cpic_level0_data
(
id integer PRIMARY KEY autoincrement,
level0_id varchar(20) not null,
obs_id varchar(10) not null,
detector_no varchar(10) not null,
obs_type varchar(16),
obs_time datetime, obs_time datetime,
exp_time float, exp_time float,
detector_status_id int(20),
filename varchar(128),
file_path varchar(256),
qc0_status tinyint(1),
qc0_time datetime,
prc_status tinyint(1),
prc_time datetime,
create_time datetime
);
create table cpic_level0_header
(
id int(20) not null,
ra float, ra float,
"dec" float, "dec" float,
create_time datetime, create_time datetime,
primary key (id) primary key (id)
); );
/*------------------------------------------------*/ create table cpic_level0_prc
CREATE TABLE msc_level2_catalog ( (
seq int(20), id integer PRIMARY KEY autoincrement,
flux_aper_1 double, level0_id varchar(20) not null,
flux_aper_2 double, pipeline_id varchar(64) not null,
flux_aper_3 double, prc_module varchar(32) not null,
flux_aper_4 double, params_file_path varchar(256),
flux_aper_5 double, prc_status int(2),
flux_aper_6 double, prc_time datetime,
flux_aper_7 double, result_file_path varchar(256)
flux_aper_8 double, );
flux_aper_9 double, create table cpic_cal2level0
flux_aper_10 double, (
flux_aper_11 double, merge_id int(20) not null,
flux_aper_12 double, level0_id varchar(20) not null,
fluxerr_aper_1 double, primary key (merge_id, level0_id)
fluxerr_aper_2 double, );
fluxerr_aper_3 double,
fluxerr_aper_4 double, create table cpic_cal_header
fluxerr_aper_5 double, (
fluxerr_aper_6 double, id int(20) not null,
fluxerr_aper_7 double, ra float,
fluxerr_aper_8 double, "dec" float,
fluxerr_aper_9 double, create_time datetime,
fluxerr_aper_10 double, primary key (id)
fluxerr_aper_11 double, );
fluxerr_aper_12 double,
mag_aper_1 double, create table cpic_cal_merge
mag_aper_2 double, (
mag_aper_3 double, id integer PRIMARY KEY autoincrement,
mag_aper_4 double, cal_id varchar(20) not null,
mag_aper_5 double, detector_no varchar(10) not null,
mag_aper_6 double, ref_type varchar(16),
mag_aper_7 double, obs_time datetime,
mag_aper_8 double, exp_time float,
mag_aper_9 double, filename varchar(128),
mag_aper_10 double, file_path varchar(256),
mag_aper_11 double, qc1_status tinyint(1),
mag_aper_12 double, qc1_time datetime,
magerr_aper_1 double, prc_status tinyint(1),
magerr_aper_2 double, prc_time datetime,
magerr_aper_3 double, create_time datetime
magerr_aper_4 double, );
magerr_aper_5 double,
magerr_aper_6 double, create table cpic_level1_data
magerr_aper_7 double, (
magerr_aper_8 double, id integer PRIMARY KEY autoincrement,
magerr_aper_9 double, level0_id varchar(20) not null,
magerr_aper_10 double, data_type varchar(64) not null,
magerr_aper_11 double, prc_params varchar(1024),
magerr_aper_12 double, filename varchar(128),
flux_auto double, file_path varchar(256),
fluxerr_auto double, prc_status tinyint(1),
mag_auto double, prc_time datetime,
magerr_auto double, qc1_status tinyint(1),
kron_radius double, qc1_time datetime,
background double, create_time datetime,
x_image double, pipeline_id varchar(60)
y_image double, );
alpha_j2000 double,
delta_j2000 double, create table cpic_level1_ref (
a_image double, level1_id int(20) not null,
b_image double, ref_type varchar(64) not null,
theta_image double, cal_id int(20) not null,
a_world double, primary key (level1_id, ref_type)
b_world double, );
theta_world double,
theta_j2000 double, create table cpic_level1_header
errx2_image double, (
erry2_image double, id int(20) not null,
erra_image double, ra float,
errb_image double, "dec" float,
errtheta_image double, create_time datetime,
erra_world double, primary key (id)
errb_world double, );
errtheta_world double,
errtheta_j2000 double, create table cpic_level1_prc
xwin_image double, (
ywin_image double, id integer PRIMARY KEY autoincrement,
alphawin_j2000 double, level1_id int(20) not null,
deltawin_j2000 double, pipeline_id varchar(64) not null,
errx2win_image double, prc_module varchar(32) not null,
erry2win_image double, params_file_path varchar(256),
flags int(20), prc_status int(2),
flags_weight int(20), prc_time datetime,
imaflags_iso double, result_file_path varchar(256)
nimaflags_iso double, );
fwhm_image double, \ No newline at end of file
fwhm_world double,
elongation double,
ellipticity double,
class_star double,
flux_radius double,
fwhmpsf_image double,
fwhmpsf_world double,
xpsf_image double,
ypsf_image double,
alphapsf_j2000 double,
deltapsf_j2000 double,
flux_psf double,
fluxerr_psf double,
mag_psf double,
magerr_psf double,
niter_psf int(20),
chi2_psf double,
errx2psf_image double,
erry2psf_image double,
chi2_model double,
flags_model tinyint(1),
niter_model int(20),
flux_model double,
fluxerr_model double,
mag_model double,
magerr_model double,
flux_hybrid double,
fluxerr_hybrid double,
mag_hybrid double,
magerr_hybrid double,
flux_max_model double,
mu_max_model double,
flux_eff_model double,
mu_eff_model double,
flux_mean_model double,
mu_mean_model double,
xmodel_image double,
ymodel_image double,
alphamodel_j2000 double,
deltamodel_j2000 double,
erry2model_image double,
erramodel_image double,
errbmodel_image double,
errthetamodel_image double,
erramodel_world double,
errbmodel_world double,
errthetamodel_world double,
errthetamodel_j2000 double,
amodel_image double,
bmodel_image double,
thetamodel_image double,
amodel_world double,
bmodel_world double,
thetamodel_world double,
thetamodel_j2000 double,
spread_model double,
spreaderr_model double,
noisearea_model double,
flux_spheroid double,
fluxerr_spheroid double,
mag_spheroid double,
magerr_spheroid double,
flux_max_spheroid double,
mu_max_spheroid double,
flux_eff_spheroid double,
mu_eff_spheroid double,
flux_mean_spheroid double,
mu_mean_spheroid double,
fluxratio_spheroid double,
fluxratioerr_spheroid double,
spheroid_reff_image double,
spheroid_refferr_image double,
spheroid_reff_world double,
spheroid_refferr_world double,
spheroid_aspect_image double,
spheroid_aspecterr_image double,
spheroid_aspect_world double,
spheroid_aspecterr_world double,
spheroid_theta_image double,
spheroid_thetaerr_image double,
spheroid_theta_world double,
spheroid_thetaerr_world double,
spheroid_theta_j2000 double,
spheroid_sersicn double,
spheroid_sersicnerr double,
flux_disk double,
fluxerr_disk double,
mag_disk double,
magerr_disk double,
flux_max_disk double,
mu_max_disk double,
flux_eff_disk double,
mu_eff_disk double,
flux_mean_disk double,
mu_mean_disk double,
fluxratio_disk double,
fluxratioerr_disk double,
disk_scale_image double,
disk_scaleerr_image double,
disk_scale_world double,
disk_scaleerr_world double,
disk_aspect_image double,
disk_aspecterr_image double,
disk_aspect_world double,
disk_aspecterr_world double,
disk_inclination double,
disk_inclinationerr double,
disk_theta_image double,
disk_thetaerr_image double,
disk_theta_world double,
disk_thetaerr_world double,
disk_theta_j2000 double,
level2_id integer,
NS8HIdx integer,
NS16HIdx integer,
NS32HIdx integer,
NS64HIdx integer,
create_time datetime
) ;
\ No newline at end of file
from .calmerge import CalMergeApi
from .level0 import Level0DataApi
from .level0prc import Level0PrcApi
from .level1 import Level1DataApi
from .level1prc import Level1PrcApi
\ No newline at end of file
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.cpic import CalMergeRecord
from csst_dfs_commons.models.common import from_dict_list
from .level0 import Level0DataApi
log = logging.getLogger('csst')
class CalMergeApi(object):
def __init__(self, sub_system = "cpic"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
self.level0Api = Level0DataApi()
def get_latest_by_l0(self, **kwargs):
''' retrieve calibration merge records from database by level0 data
:param kwargs: Parameter dictionary, key items support:
level0_id: [str],
ref_type: [str]
:returns: csst_dfs_common.models.Result
'''
try:
level0_id = get_parameter(kwargs, "level0_id")
ref_type = get_parameter(kwargs, "ref_type")
level0_data = self.level0Api.get_by_level0_id(level0_id)
if level0_data.data is None:
return Result.error(message = "level0 data [%s]not found"%(level0_id))
sql_data = f"select * from cpic_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time >= '{level0_data.data.obs_time}' order by obs_time ASC limit 1"
r = self.db.select_one(sql_data)
if r:
rec = CalMergeRecord().from_dict(r)
return Result.ok_data(data=rec)
sql_data = f"select * from cpic_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time <= '{level0_data.data.obs_time}' order by obs_time DESC limit 1"
r = self.db.select_one(sql_data)
if r:
rec = CalMergeRecord().from_dict(r)
return Result.ok_data(data=rec)
return Result.error(message = "not found")
except Exception as e:
return Result.error(message=str(e))
def find(self, **kwargs):
''' retrieve calibration merge records from database
parameter kwargs:
detector_no: [str]
ref_type: [str]
obs_time: (start,end)
qc1_status : [int]
prc_status : [int]
file_name: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try:
detector_no = get_parameter(kwargs, "detector_no")
ref_type = get_parameter(kwargs, "ref_type")
exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0]
exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1]
qc1_status = get_parameter(kwargs, "qc1_status")
prc_status = get_parameter(kwargs, "prc_status")
file_name = get_parameter(kwargs, "file_name")
limit = get_parameter(kwargs, "limit", 0)
sql_count = "select count(*) as c from cpic_cal_merge where 1=1"
sql_data = f"select * from cpic_cal_merge where 1=1"
sql_condition = ""
if detector_no:
sql_condition = f"{sql_condition} and detector_no='{detector_no}'"
if ref_type:
sql_condition = f"{sql_condition} and ref_type='{ref_type}'"
if exp_time_start:
sql_condition = f"{sql_condition} and obs_time >='{exp_time_start}'"
if exp_time_end:
sql_condition = f"{sql_condition} and obs_time <='{exp_time_end}'"
if qc1_status:
sql_condition = f"{sql_condition} and qc1_status={qc1_status}"
if prc_status:
sql_condition = f"{sql_condition} and prc_status={prc_status}"
if file_name:
sql_condition = f" and filename={file_name}"
sql_count = f"{sql_count} {sql_condition}"
sql_data = f"{sql_data} {sql_condition}"
log.info(sql_count)
log.info(sql_data)
if limit > 0:
sql_data = f"{sql_data} limit {limit}"
totalCount = self.db.select_one(sql_count)
_, records = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(CalMergeRecord, records)).append("totalCount", totalCount['c'])
except Exception as e:
return Result.error(message=str(e))
def get(self, **kwargs):
''' fetch a record from database
parameter kwargs:
id : [int],
cal_id : [str]
return csst_dfs_common.models.Result
'''
id = get_parameter(kwargs, "id", 0)
cal_id = get_parameter(kwargs, "cal_id", "")
if id == 0 and cal_id == "":
return Result.error(message="at least define id or cal_id")
if id != 0:
return self.get_by_id(id)
if cal_id != "":
return self.get_by_cal_id(cal_id)
def get_by_id(self, id: str):
try:
r = self.db.select_one(
"select * from cpic_cal_merge where id=?", (id,))
if r:
sql_get_level0_id = f"select level0_id from cpic_cal2level0 where merge_id={r['id']}"
_, records = self.db.select_many(sql_get_level0_id)
level0_ids = [r["level0_id"] for r in records]
rec = CalMergeRecord().from_dict(r)
rec.level0_ids = level0_ids
return Result.ok_data(data=rec)
else:
return Result.error(message=f"id:{id} not found")
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def get_by_cal_id(self, cal_id: str):
try:
r = self.db.select_one(
"select * from cpic_cal_merge where cal_id=?", (cal_id,))
if r:
sql_get_level0_id = f"select level0_id from cpic_cal2level0 where merge_id={r['id']}"
_, records = self.db.select_many(sql_get_level0_id)
level0_ids = [r["level0_id"] for r in records]
rec = CalMergeRecord().from_dict(r)
rec.level0_ids = level0_ids
return Result.ok_data(data=rec)
else:
return Result.error(message=f"id:{cal_id} not found")
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_qc1_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
cal_id : [str],
status : [int]
return csst_dfs_common.models.Result
'''
id = get_parameter(kwargs, "id", 0)
cal_id = get_parameter(kwargs, "cal_id", "")
result = self.get(id = id, cal_id = cal_id)
if not result.success:
return Result.error(message="not found")
id = result.data.id
status = get_parameter(kwargs, "status")
try:
self.db.execute(
'update cpic_cal_merge set qc1_status=?, qc1_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
cal_id : [str],
status : [int]
return csst_dfs_common.models.Result
'''
id = get_parameter(kwargs, "id", 0)
cal_id = get_parameter(kwargs, "cal_id", "")
result = self.get(id = id, cal_id = cal_id)
if not result.success:
return Result.error(message="not found")
id = result.data.id
status = get_parameter(kwargs, "status")
try:
existed = self.db.exists(
"select * from cpic_cal_merge where id=?",
(id,)
)
if not existed:
log.warning('%s not found' %(id, ))
return Result.error(message ='%s not found' %(id, ))
self.db.execute(
'update cpic_cal_merge set prc_status=?, prc_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def write(self, **kwargs):
''' insert a calibration merge record into database
parameter kwargs:
cal_id : [str]
detector_no : [str]
ref_type : [str]
obs_time : [str]
exp_time : [float]
prc_status : [int]
prc_time : [str]
filename : [str]
file_path : [str]
level0_ids : [list]
return csst_dfs_common.models.Result
'''
rec = CalMergeRecord(
id = 0,
cal_id = get_parameter(kwargs, "cal_id"),
detector_no = get_parameter(kwargs, "detector_no"),
ref_type = get_parameter(kwargs, "ref_type"),
obs_time = get_parameter(kwargs, "obs_time"),
exp_time = get_parameter(kwargs, "exp_time"),
filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path"),
prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time"),
level0_ids = get_parameter(kwargs, "level0_ids", [])
)
try:
self.db.execute(
'INSERT INTO cpic_cal_merge (cal_id,detector_no,ref_type,obs_time,exp_time,filename,file_path,prc_status,prc_time,create_time) \
VALUES(?,?,?,?,?,?,?,?,?,?)',
(rec.cal_id, rec.detector_no, rec.ref_type, rec.obs_time, rec.exp_time, rec.filename, rec.file_path,rec.prc_status,rec.prc_time,format_time_ms(time.time()))
)
self.db.end()
rec.id = self.db.last_row_id()
sql_level0_ids = "insert into cpic_cal2level0 (merge_id,level0_id) values "
values = ["(%s,%s)"%(rec.id,i) for i in rec.level0_ids]
_ = self.db.execute(sql_level0_ids + ",".join(values))
self.db.end()
return Result.ok_data(data=rec)
except Exception as e:
log.error(e)
return Result.error(message=str(e))
\ No newline at end of file
import os, sys
import argparse
import logging
from astropy.io import fits
import datetime
import shutil
from csst_dfs_api_local.common.db import DBClient
log = logging.getLogger('csst-dfs-api-local')
def ingest():
db = DBClient()
parser = argparse.ArgumentParser(prog=f"{sys.argv[0]}", description="ingest the local files")
parser.add_argument('-i','--infile', dest="infile", help="a file or a directory")
parser.add_argument('-m', '--copyfiles', dest="copyfiles", action='store_true', default=False, help="copy files after import")
args = parser.parse_args(sys.argv[1:])
import_root_dir = args.infile
if os.path.isfile(import_root_dir):
log.info(f"prepare import {import_root_dir}")
ingesst_one(import_root_dir, db, args.copyfiles)
if os.path.isdir(import_root_dir):
for (path, _, file_names) in os.walk(import_root_dir):
for filename in file_names:
if filename.find(".fits") > 0:
file_full_path = os.path.join(path, filename)
log.info(f"prepare import {file_full_path}")
ingesst_one(file_full_path, db, args.copyfiles)
db.close()
def ingesst_one(file_path, db, copyfiles):
dest_root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
hdul = fits.open(file_path)
header = hdul[0].header
obs_id = header["OBSID"]
exp_start_time = f"{header['DATE-OBS']} {header['TIME-OBS']}"
exp_time = header['EXPTIME']
module_id = header["INSTRUME"]
obs_type = header["FILETYPE"]
qc0_status = -1
prc_status = -1
time_now = datetime.datetime.now()
create_time = time_now.strftime('%Y-%m-%d %H:%M:%S')
facility_status_id = 0
module_status_id = 0
existed = db.exists("select * from t_observation where obs_id=?", (obs_id,))
if not existed:
db.execute("insert into t_observation \
(obs_id,obs_time,exp_time,module_id,obs_type,facility_status_id, module_status_id, qc0_status, prc_status,create_time) \
values (?,?,?,?,?,?,?,?,?,?)",
(obs_id,exp_start_time,exp_time,module_id,obs_type,facility_status_id,module_status_id,qc0_status, prc_status,create_time))
db.end()
#level0
detector = header["DETECTOR"]
filename = header["FILENAME"]
existed = db.exists(
"select * from cpic_level0_data where filename=?",
(filename,)
)
if existed:
log.warning('%s has already been imported' %(file_path, ))
db.end()
return
detector_status_id = 0
file_full_path = file_path
if copyfiles:
file_dir = f"{dest_root_dir}/{module_id}/{obs_type.upper()}/{header['EXPSTART']}/{obs_id}/MS"
if not os.path.exists(file_dir):
os.makedirs(file_dir)
file_full_path = f"{file_dir}/{filename}.fits"
level0_id = f"{obs_id}{detector}"
c = db.execute("insert into cpic_level0_data \
(level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \
values (?,?,?,?,?,?,?,?,?,?,?,?)",
(level0_id, obs_id, detector, obs_type, exp_start_time, exp_time, detector_status_id, filename, file_full_path, qc0_status, prc_status,create_time))
db.end()
level0_id_id = db.last_row_id()
#level0-header
ra_obj = header["RA_OBJ"]
dec_obj = header["DEC_OBJ"]
db.execute("delete from cpic_level0_header where id=?",(level0_id_id,))
db.execute("insert into cpic_level0_header \
(id, ra, `dec`, create_time) \
values (?,?,?,?)",
(level0_id_id, ra_obj, dec_obj, create_time))
if copyfiles:
#copy files
shutil.copyfile(file_path, file_full_path)
db.end()
print(f"{file_path} imported")
if __name__ == "__main__":
ingest()
\ No newline at end of file
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.cpic import Level0Record
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level0DataApi(object):
def __init__(self, sub_system = "cpic"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
def find(self, **kwargs):
''' retrieve level0 records from database
parameter kwargs:
obs_id: [str]
detector_no: [str]
obs_type: [str]
obs_time : (start, end),
qc0_status : [int],
prc_status : [int],
file_name: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try:
obs_id = get_parameter(kwargs, "obs_id")
detector_no = get_parameter(kwargs, "detector_no")
obs_type = get_parameter(kwargs, "obs_type")
exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0]
exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1]
qc0_status = get_parameter(kwargs, "qc0_status")
prc_status = get_parameter(kwargs, "prc_status")
file_name = get_parameter(kwargs, "file_name")
limit = get_parameter(kwargs, "limit", 0)
sql_count = "select count(*) as c from cpic_level0_data where 1=1"
sql_data = f"select * from cpic_level0_data where 1=1"
sql_condition = ""
if obs_id:
sql_condition = f"{sql_condition} and obs_id='{obs_id}'"
if detector_no:
sql_condition = f"{sql_condition} and detector_no='{detector_no}'"
if obs_type:
sql_condition = f"{sql_condition} and obs_type='{obs_type}'"
if exp_time_start:
sql_condition = f"{sql_condition} and obs_time >='{exp_time_start}'"
if exp_time_end:
sql_condition = f"{sql_condition} and obs_time <='{exp_time_end}'"
if qc0_status:
sql_condition = f"{sql_condition} and qc0_status={qc0_status}"
if prc_status:
sql_condition = f"{sql_condition} and prc_status={prc_status}"
if file_name:
sql_condition = f" and filename='{file_name}'"
sql_count = f"{sql_count} {sql_condition}"
sql_data = f"{sql_data} {sql_condition}"
if limit > 0:
sql_data = f"{sql_data} limit {limit}"
totalCount = self.db.select_one(sql_count)
_, records = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(Level0Record, records)).append("totalCount", totalCount['c'])
except Exception as e:
return Result.error(message=str(e))
def get(self, **kwargs):
''' fetch a record from database
parameter kwargs:
id : [int],
level0_id : [str]
return csst_dfs_common.models.Result
'''
id = get_parameter(kwargs, "id", 0)
level0_id = get_parameter(kwargs, "level0_id", "")
if id == 0 and level0_id == "":
return Result.error(message="at least define id or level0_id")
if id != 0:
return self.get_by_id(id)
if level0_id != "":
return self.get_by_level0_id(level0_id)
def get_by_id(self, id: int):
try:
r = self.db.select_one(
"select * from cpic_level0_data where id=?", (id,))
if r:
return Result.ok_data(data=Level0Record().from_dict(r))
else:
return Result.error(message=f"id:{id} not found")
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def get_by_level0_id(self, level0_id: str):
try:
r = self.db.select_one(
"select * from cpic_level0_data where level0_id=?", (level0_id,))
if r:
return Result.ok_data(data=Level0Record().from_dict(r))
else:
return Result.error(message=f"level0_id:{level0_id} not found")
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
level0_id : [str],
status : [int]
return csst_dfs_common.models.Result
'''
id = get_parameter(kwargs, "id")
level0_id = get_parameter(kwargs, "level0_id")
result = self.get(id = id, level0_id = level0_id)
if not result.success:
return Result.error(message="not found")
id = result.data.id
status = get_parameter(kwargs, "status")
try:
self.db.execute(
'update cpic_level0_data set prc_status=?, prc_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_qc0_status(self, **kwargs):
''' update the status of QC0
parameter kwargs:
id : [int],
level0_id : [str],
status : [int]
'''
id = get_parameter(kwargs, "id")
level0_id = get_parameter(kwargs, "level0_id")
result = self.get(id = id, level0_id = level0_id)
if not result.success:
return Result.error(message="not found")
id = result.data.id
status = get_parameter(kwargs, "status")
try:
self.db.execute(
'update cpic_level0_data set qc0_status=?, qc0_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def write(self, **kwargs):
''' insert a level0 data record into database
parameter kwargs:
obs_id = [str]
detector_no = [str]
obs_type = [str]
obs_time = [str]
exp_time = [int]
detector_status_id = [int]
filename = [str]
file_path = [str]
return: csst_dfs_common.models.Result
'''
rec = Level0Record(
obs_id = get_parameter(kwargs, "obs_id"),
detector_no = get_parameter(kwargs, "detector_no"),
obs_type = get_parameter(kwargs, "obs_type"),
obs_time = get_parameter(kwargs, "obs_time"),
exp_time = get_parameter(kwargs, "exp_time"),
detector_status_id = get_parameter(kwargs, "detector_status_id"),
filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path")
)
rec.level0_id = f"{rec.obs_id}{rec.detector_no}"
try:
existed = self.db.exists(
"select * from cpic_level0_data where filename=?",
(rec.filename,)
)
if existed:
log.warning('%s existed' %(rec.filename, ))
return Result.error(message ='%s existed' %(rec.filename, ))
self.db.execute(
'INSERT INTO cpic_level0_data (level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \
VALUES(?,?,?,?,?,?,?,?,?,?,?,?)',
(rec.level0_id, rec.obs_id, rec.detector_no, rec.obs_type, rec.obs_time, rec.exp_time, rec.detector_status_id, rec.filename, rec.file_path,-1,-1,format_time_ms(time.time()))
)
self.db.end()
rec.id = self.db.last_row_id()
return Result.ok_data(data=rec)
except Exception as e:
log.error(e)
return Result.error(message=str(e))
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.cpic import Level0PrcRecord
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level0PrcApi(object):
def __init__(self, sub_system = "cpic"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
def find(self, **kwargs):
''' retrieve level0 procedure records from database
parameter kwargs:
level0_id: [str]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
return: csst_dfs_common.models.Result
'''
try:
level0_id = get_parameter(kwargs, "level0_id")
pipeline_id = get_parameter(kwargs, "pipeline_id")
prc_module = get_parameter(kwargs, "prc_module")
prc_status = get_parameter(kwargs, "prc_status")
sql_data = f"select * from cpic_level0_prc"
sql_condition = f"where level0_id='{level0_id}'"
if pipeline_id:
sql_condition = sql_condition + " and pipeline_id='" + pipeline_id + "'"
if prc_module:
sql_condition = sql_condition + " and prc_module ='" + prc_module + "'"
if prc_status:
sql_condition = f"{sql_condition} and prc_status={prc_status}"
sql_data = f"{sql_data} {sql_condition}"
_, records = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(Level0PrcRecord, records)).append("totalCount", len(records))
except Exception as e:
return Result.error(message=str(e))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
existed = self.db.exists(
"select * from cpic_level0_prc where id=?",
(id,)
)
if not existed:
log.warning('%s not found' %(id, ))
return Result.error(message ='%s not found' %(id, ))
self.db.execute(
'update cpic_level0_prc set prc_status=?, prc_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def write(self, **kwargs):
''' insert a level0 procedure record into database
parameter kwargs:
level0_id : [str]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
prc_status : [int]
prc_time : [str]
result_file_path : [str]
return csst_dfs_common.models.Result
'''
rec = Level0PrcRecord(
id = 0,
level0_id = get_parameter(kwargs, "level0_id"),
pipeline_id = get_parameter(kwargs, "pipeline_id"),
prc_module = get_parameter(kwargs, "prc_module"),
params_file_path = get_parameter(kwargs, "params_file_path"),
prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time"),
result_file_path = get_parameter(kwargs, "result_file_path")
)
try:
self.db.execute(
'INSERT INTO cpic_level0_prc (level0_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \
VALUES(?,?,?,?,?,?,?)',
(rec.level0_id, rec.pipeline_id, rec.prc_module, rec.params_file_path, rec.prc_status, rec.prc_time, rec.result_file_path)
)
self.db.end()
rec.id = self.db.last_row_id()
return Result.ok_data(data=rec)
except Exception as e:
log.error(e)
return Result.error(message=str(e))
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.cpic import Level1Record
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level1DataApi(object):
def __init__(self, sub_system = "cpic"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
def find(self, **kwargs):
''' retrieve level1 records from database
parameter kwargs:
level0_id: [str]
data_type: [str]
create_time : (start, end),
qc1_status : [int],
prc_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try:
level0_id = get_parameter(kwargs, "level0_id")
data_type = get_parameter(kwargs, "data_type")
create_time_start = get_parameter(kwargs, "create_time", [None, None])[0]
create_time_end = get_parameter(kwargs, "create_time", [None, None])[1]
qc1_status = get_parameter(kwargs, "qc1_status")
prc_status = get_parameter(kwargs, "prc_status")
filename = get_parameter(kwargs, "filename")
limit = get_parameter(kwargs, "limit", 0)
sql_count = "select count(*) as c from cpic_level1_data where 1=1"
sql_data = f"select * from cpic_level1_data where 1=1"
sql_condition = ""
if level0_id:
sql_condition = f"{sql_condition} and level0_id='{level0_id}'"
if data_type:
sql_condition = f"{sql_condition} and data_type='{data_type}'"
if create_time_start:
sql_condition = f"{sql_condition} and create_time >='{create_time_start}'"
if create_time_end:
sql_condition = f"{sql_condition} and create_time <='{create_time_end}'"
if qc1_status:
sql_condition = f"{sql_condition} and qc1_status={qc1_status}"
if prc_status:
sql_condition = f"{sql_condition} and prc_status={prc_status}"
if filename:
sql_condition = f" and filename='{filename}'"
sql_count = f"{sql_count} {sql_condition}"
sql_data = f"{sql_data} {sql_condition}"
if limit > 0:
sql_data = f"{sql_data} limit {limit}"
totalCount = self.db.select_one(sql_count)
_, recs = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(Level1Record, recs)).append("totalCount", totalCount['c'])
except Exception as e:
return Result.error(message=str(e))
def get(self, **kwargs):
'''
parameter kwargs:
id = [int]
return csst_dfs_common.models.Result
'''
try:
id = get_parameter(kwargs, "id", -1)
r = self.db.select_one(
"select * from cpic_level1_data where id=?", (id,))
if r:
return Result.ok_data(data=Level1Record().from_dict(r))
else:
return Result.error(message=f"id:{id} not found")
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
fits_id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
existed = self.db.exists(
"select * from cpic_level1_data where id=?",
(fits_id,)
)
if not existed:
log.warning('%s not found' %(fits_id, ))
return Result.error(message ='%s not found' %(fits_id, ))
self.db.execute(
'update cpic_level1_data set prc_status=?, prc_time=? where id=?',
(status, format_time_ms(time.time()), fits_id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_qc1_status(self, **kwargs):
''' update the status of QC1
parameter kwargs:
id : [int],
status : [int]
'''
fits_id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
existed = self.db.exists(
"select * from cpic_level1_data where id=?",
(fits_id,)
)
if not existed:
log.warning('%s not found' %(fits_id, ))
return Result.error(message ='%s not found' %(fits_id, ))
self.db.execute(
'update cpic_level1_data set qc1_status=?, qc1_time=? where id=?',
(status, format_time_ms(time.time()), fits_id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def write(self, **kwargs):
''' insert a level1 record into database
parameter kwargs:
level0_id : [str]
data_type : [str]
prc_params : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
pipeline_id : [str]
refs: [dict]
return csst_dfs_common.models.Result
'''
try:
rec = Level1Record(
id = 0,
level0_id = get_parameter(kwargs, "level0_id"),
data_type = get_parameter(kwargs, "data_type"),
prc_params = get_parameter(kwargs, "prc_params"),
filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path"),
prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())),
pipeline_id = get_parameter(kwargs, "pipeline_id"),
refs = get_parameter(kwargs, "refs", {})
)
existed = self.db.exists(
"select * from cpic_level1_data where filename=?",
(rec.filename,)
)
if existed:
log.error(f'{rec.filename} has already been existed')
return Result.error(message=f'{rec.filename} has already been existed')
self.db.execute(
'INSERT INTO cpic_level1_data (level0_id,data_type,prc_params,filename,file_path,qc1_status,prc_status,prc_time,create_time,pipeline_id) \
VALUES(?,?,?,?,?,?,?,?,?,?)',
(rec.level0_id, rec.data_type, rec.prc_params, rec.filename, rec.file_path, -1, rec.prc_status, rec.prc_time, format_time_ms(time.time()),rec.pipeline_id,)
)
self.db.end()
rec.id = self.db.last_row_id()
if rec.refs.items():
sql_refs = "insert into cpic_level1_ref (level1_id,ref_type,cal_id) values "
values = ["(%s,'%s',%s)"%(rec.id,k,v) for k,v in rec.refs.items()]
_ = self.db.execute(sql_refs + ",".join(values))
self.db.end()
return Result.ok_data(data=rec)
except Exception as e:
log.error(e)
return Result.error(message=str(e))
\ No newline at end of file
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.cpic import Level1PrcRecord
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level1PrcApi(object):
def __init__(self, sub_system = "cpic"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
def find(self, **kwargs):
''' retrieve level1 procedure records from database
parameter kwargs:
level1_id: [int]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
return: csst_dfs_common.models.Result
'''
try:
level1_id = get_parameter(kwargs, "level1_id", 0)
pipeline_id = get_parameter(kwargs, "pipeline_id")
prc_module = get_parameter(kwargs, "prc_module")
prc_status = get_parameter(kwargs, "prc_status")
sql_data = f"select * from cpic_level1_prc"
sql_condition = f"where level1_id={level1_id}"
if pipeline_id:
sql_condition = sql_condition + " and pipeline_id='" + pipeline_id + "'"
if prc_module:
sql_condition = sql_condition + " and prc_module ='" + prc_module + "'"
if prc_status:
sql_condition = f"{sql_condition} and prc_status={prc_status}"
sql_data = f"{sql_data} {sql_condition}"
_, records = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(Level1PrcRecord, records)).append("totalCount", len(records))
except Exception as e:
return Result.error(message=str(e))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
existed = self.db.exists(
"select * from cpic_level1_prc where id=?",
(id,)
)
if not existed:
log.warning('%s not found' %(id, ))
return Result.error(message ='%s not found' %(id, ))
self.db.execute(
'update cpic_level1_prc set prc_status=?, prc_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def write(self, **kwargs):
''' insert a level1 procedure record into database
parameter kwargs:
level1_id : [int]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
prc_status : [int]
prc_time : [str]
result_file_path : [str]
return csst_dfs_common.models.Result
'''
rec = Level1PrcRecord(
id = 0,
level1_id = get_parameter(kwargs, "level1_id", 0),
pipeline_id = get_parameter(kwargs, "pipeline_id"),
prc_module = get_parameter(kwargs, "prc_module"),
params_file_path = get_parameter(kwargs, "params_file_path"),
prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time"),
result_file_path = get_parameter(kwargs, "result_file_path")
)
try:
self.db.execute(
'INSERT INTO cpic_level1_prc (level1_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \
VALUES(?,?,?,?,?,?,?)',
(rec.level1_id, rec.pipeline_id, rec.prc_module, rec.params_file_path, rec.prc_status, rec.prc_time, rec.result_file_path)
)
self.db.end()
rec.id = self.db.last_row_id()
return Result.ok_data(data=rec)
except Exception as e:
log.error(e)
return Result.error(message=str(e))
...@@ -33,7 +33,7 @@ class CalMergeApi(object): ...@@ -33,7 +33,7 @@ class CalMergeApi(object):
ref_type = get_parameter(kwargs, "ref_type") ref_type = get_parameter(kwargs, "ref_type")
level0_data = self.level0Api.get_by_level0_id(level0_id) level0_data = self.level0Api.get_by_level0_id(level0_id)
if level0_data is None: if level0_data.data is None:
return Result.error(message = "level0 data [%s]not found"%(level0_id)) return Result.error(message = "level0 data [%s]not found"%(level0_id))
sql_data = f"select * from ifs_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time >= '{level0_data.data.obs_time}' order by obs_time ASC limit 1" sql_data = f"select * from ifs_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time >= '{level0_data.data.obs_time}' order by obs_time ASC limit 1"
......
...@@ -33,7 +33,7 @@ class CalMergeApi(object): ...@@ -33,7 +33,7 @@ class CalMergeApi(object):
ref_type = get_parameter(kwargs, "ref_type") ref_type = get_parameter(kwargs, "ref_type")
level0_data = self.level0Api.get_by_level0_id(level0_id) level0_data = self.level0Api.get_by_level0_id(level0_id)
if level0_data is None: if level0_data.data is None:
return Result.error(message = "level0 data [%s]not found"%(level0_id)) return Result.error(message = "level0 data [%s]not found"%(level0_id))
sql_data = f"select * from mci_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time >= '{level0_data.data.obs_time}' order by obs_time ASC limit 1" sql_data = f"select * from mci_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time >= '{level0_data.data.obs_time}' order by obs_time ASC limit 1"
......
...@@ -3,4 +3,4 @@ from .level0 import Level0DataApi ...@@ -3,4 +3,4 @@ from .level0 import Level0DataApi
from .level0prc import Level0PrcApi from .level0prc import Level0PrcApi
from .level1 import Level1DataApi from .level1 import Level1DataApi
from .level1prc import Level1PrcApi from .level1prc import Level1PrcApi
from .level2catalog import Level2CatalogApi from .level2 import Level2DataApi
\ No newline at end of file \ No newline at end of file
...@@ -33,7 +33,7 @@ class CalMergeApi(object): ...@@ -33,7 +33,7 @@ class CalMergeApi(object):
ref_type = get_parameter(kwargs, "ref_type") ref_type = get_parameter(kwargs, "ref_type")
level0_data = self.level0Api.get_by_level0_id(level0_id) level0_data = self.level0Api.get_by_level0_id(level0_id)
if level0_data is None: if level0_data.data is None:
return Result.error(message = "level0 data [%s]not found"%(level0_id)) return Result.error(message = "level0 data [%s]not found"%(level0_id))
sql_data = f"select * from msc_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time >= '{level0_data.data.obs_time}' order by obs_time ASC limit 1" sql_data = f"select * from msc_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time >= '{level0_data.data.obs_time}' order by obs_time ASC limit 1"
......
...@@ -99,9 +99,9 @@ def ingest_one(file_path, db, copyfiles): ...@@ -99,9 +99,9 @@ def ingest_one(file_path, db, copyfiles):
dec_obj = header["DEC_OBJ"] dec_obj = header["DEC_OBJ"]
db.execute("delete from msc_level0_header where id=?",(level0_id_id,)) db.execute("delete from msc_level0_header where id=?",(level0_id_id,))
db.execute("insert into msc_level0_header \ db.execute("insert into msc_level0_header \
(id, obs_time, exp_time, ra, `dec`, create_time) \ (id, ra, `dec`, create_time) \
values (?,?,?,?,?,?)", values (?,?,?,?)",
(level0_id_id, exp_start_time, exp_time, ra_obj, dec_obj, create_time)) (level0_id_id, ra_obj, dec_obj, create_time))
if copyfiles: if copyfiles:
#copy files #copy files
......
import os
import logging
import time, datetime
import shutil
from traceback import print_stack
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.common import from_dict_list
from csst_dfs_commons.models.msc import Level2Record, Level2CatalogRecord
log = logging.getLogger('csst')
class Level2DataApi(object):
def __init__(self, sub_system = "msc"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
def catalog_query(self, **kwargs):
return Result.error(message = 'level2 catalog not support in the local mode' )
def find(self, **kwargs):
''' retrieve level2 records from database
:param kwargs: Parameter dictionary, key items support:
level1_id: [int]
data_type: [str]
create_time : (start, end),
qc2_status : [int],
prc_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
try:
level1_id = get_parameter(kwargs, "level1_id")
data_type = get_parameter(kwargs, "data_type")
create_time_start = get_parameter(kwargs, "create_time", [None, None])[0]
create_time_end = get_parameter(kwargs, "create_time", [None, None])[1]
qc2_status = get_parameter(kwargs, "qc1_status")
prc_status = get_parameter(kwargs, "prc_status")
filename = get_parameter(kwargs, "filename")
limit = get_parameter(kwargs, "limit", 0)
sql_count = "select count(*) as c from msc_level2_data where 1=1"
sql_data = f"select * from msc_level2_data where 1=1"
sql_condition = ""
if level1_id:
sql_condition = f"{sql_condition} and level1_id='{level1_id}'"
if data_type:
sql_condition = f"{sql_condition} and data_type='{data_type}'"
if create_time_start:
sql_condition = f"{sql_condition} and create_time >='{create_time_start}'"
if create_time_end:
sql_condition = f"{sql_condition} and create_time <='{create_time_end}'"
if qc2_status:
sql_condition = f"{sql_condition} and qc2_status={qc2_status}"
if prc_status:
sql_condition = f"{sql_condition} and prc_status={prc_status}"
if filename:
sql_condition = f" and filename='{filename}'"
sql_count = f"{sql_count} {sql_condition}"
sql_data = f"{sql_data} {sql_condition}"
if limit > 0:
sql_data = f"{sql_data} limit {limit}"
totalCount = self.db.select_one(sql_count)
_, recs = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(Level2Record, recs)).append("totalCount", totalCount['c'])
except Exception as e:
return Result.error(message=str(e))
def get(self, **kwargs):
'''
parameter kwargs:
id = [int]
return csst_dfs_common.models.Result
'''
try:
the_id = get_parameter(kwargs, "id", -1)
r = self.db.select_one(
"select * from msc_level2_data where id=?", (the_id,))
if r:
return Result.ok_data(data=Level2Record().from_dict(r))
else:
return Result.error(message=f"id:{the_id} not found")
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
fits_id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
existed = self.db.exists(
"select * from msc_level2_data where id=?",
(fits_id,)
)
if not existed:
log.warning('%s not found' %(fits_id, ))
return Result.error(message ='%s not found' %(fits_id, ))
self.db.execute(
'update msc_level2_data set prc_status=?, prc_time=? where id=?',
(status, format_time_ms(time.time()), fits_id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_qc2_status(self, **kwargs):
''' update the status of QC2
parameter kwargs:
id : [int],
status : [int]
'''
fits_id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
existed = self.db.exists(
"select * from msc_level2_data where id=?",
(fits_id,)
)
if not existed:
log.warning('%s not found' %(fits_id, ))
return Result.error(message ='%s not found' %(fits_id, ))
self.db.execute(
'update msc_level2_data set qc1_status=?, qc1_time=? where id=?',
(status, format_time_ms(time.time()), fits_id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def write(self, **kwargs):
''' insert a level2 record into database
parameter kwargs:
level1_id: [int]
data_type : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
return csst_dfs_common.models.Result
'''
try:
rec = Level2Record(
id = 0,
level1_id = get_parameter(kwargs, "level1_id"),
data_type = get_parameter(kwargs, "data_type"),
filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path"),
prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now()))
)
existed = self.db.exists(
"select * from msc_level2_data where filename=?",
(rec.filename,)
)
if existed:
log.error(f'{rec.filename} has already been existed')
return Result.error(message=f'{rec.filename} has already been existed')
self.db.execute(
'INSERT INTO msc_level2_data (level1_id,data_type,filename,file_path,qc2_status,prc_status,prc_time,create_time) \
VALUES(?,?,?,?,?,?,?,?)',
(rec.level1_id, rec.data_type, rec.filename, rec.file_path, -1, rec.prc_status, rec.prc_time, format_time_ms(time.time()),)
)
self.db.end()
rec.id = self.db.last_row_id()
return Result.ok_data(data=rec)
except Exception as e:
log.error(e)
return Result.error(message=str(e))
\ No newline at end of file
import os
import logging
import time, datetime
import shutil
from traceback import print_stack
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.common import from_dict_list
from csst_dfs_commons.models.msc import MSCLevel2CatalogRecord
log = logging.getLogger('csst')
class Level2CatalogApi(object):
def __init__(self, sub_system = "msc"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
def find(self, **kwargs):
''' retrieve level1 records from database
parameter kwargs:
obs_id: [str]
detector_no: [str]
obs_time : (start, end),
filename: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try:
obs_id = get_parameter(kwargs, "obs_id")
detector_no = get_parameter(kwargs, "detector_no")
obs_time_start = get_parameter(kwargs, "obs_time", [None, None])[0]
obs_time_end = get_parameter(kwargs, "obs_time", [None, None])[1]
limit = get_parameter(kwargs, "limit", 0)
sql_count = "select count(*) as c from msc_level2_catalog where 1=1"
sql_data = f"select * from msc_level2_catalog where 1=1"
sql_condition = ""
if obs_id:
sql_condition = f"{sql_condition} and obs_id='{obs_id}'"
if detector_no:
sql_condition = f"{sql_condition} and detector_no='{detector_no}'"
if obs_time_start:
sql_condition = f"{sql_condition} and obs_time >='{obs_time_start}'"
if obs_time_end:
sql_condition = f"{sql_condition} and obs_time <='{obs_time_end}'"
sql_count = f"{sql_count} {sql_condition}"
sql_data = f"{sql_data} {sql_condition}"
if limit > 0:
sql_data = f"{sql_data} limit {limit}"
totalCount = self.db.select_one(sql_count)
_, recs = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(MSCLevel2CatalogRecord, recs)).append("totalCount", totalCount['c'])
except Exception as e:
return Result.error(message=str(e))
def write(self, records):
try:
recordStrs = []
for recordStr in records:
recordStrs.append(f"({recordStr})")
if recordStrs:
self.db.execute(
f"insert into msc_level2_catalog values {','.join(recordStrs)}"
)
self.db.end()
return Result.ok_data()
except Exception as e:
print_stack()
log.error(e)
return Result.error(message=str(e))
\ No newline at end of file
...@@ -33,7 +33,7 @@ class CalMergeApi(object): ...@@ -33,7 +33,7 @@ class CalMergeApi(object):
ref_type = get_parameter(kwargs, "ref_type") ref_type = get_parameter(kwargs, "ref_type")
level0_data = self.level0Api.get_by_level0_id(level0_id) level0_data = self.level0Api.get_by_level0_id(level0_id)
if level0_data is None: if level0_data.data is None:
return Result.error(message = "level0 data [%s]not found"%(level0_id)) return Result.error(message = "level0 data [%s]not found"%(level0_id))
sql_data = f"select * from sls_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time >= '{level0_data.data.obs_time}' order by obs_time ASC limit 1" sql_data = f"select * from sls_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time >= '{level0_data.data.obs_time}' order by obs_time ASC limit 1"
......
...@@ -62,7 +62,7 @@ def ingesst_one(file_path, db, copyfiles): ...@@ -62,7 +62,7 @@ def ingesst_one(file_path, db, copyfiles):
filename = header["FILENAME"] filename = header["FILENAME"]
existed = db.exists( existed = db.exists(
"select * from t_level0_data where filename=?", "select * from sls_level0_data where filename=?",
(filename,) (filename,)
) )
if existed: if existed:
...@@ -82,7 +82,7 @@ def ingesst_one(file_path, db, copyfiles): ...@@ -82,7 +82,7 @@ def ingesst_one(file_path, db, copyfiles):
level0_id = f"{obs_id}{detector}" level0_id = f"{obs_id}{detector}"
c = db.execute("insert into t_level0_data \ c = db.execute("insert into sls_level0_data \
(level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \ (level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \
values (?,?,?,?,?,?,?,?,?,?,?,?)", values (?,?,?,?,?,?,?,?,?,?,?,?)",
(level0_id, obs_id, detector, obs_type, exp_start_time, exp_time, detector_status_id, filename, file_full_path, qc0_status, prc_status,create_time)) (level0_id, obs_id, detector, obs_type, exp_start_time, exp_time, detector_status_id, filename, file_full_path, qc0_status, prc_status,create_time))
...@@ -91,11 +91,11 @@ def ingesst_one(file_path, db, copyfiles): ...@@ -91,11 +91,11 @@ def ingesst_one(file_path, db, copyfiles):
#level0-header #level0-header
ra_obj = header["RA_OBJ"] ra_obj = header["RA_OBJ"]
dec_obj = header["DEC_OBJ"] dec_obj = header["DEC_OBJ"]
db.execute("delete from t_level0_header where id=?",(level0_id_id,)) db.execute("delete from sls_level0_header where id=?",(level0_id_id,))
db.execute("insert into t_level0_header \ db.execute("insert into sls_level0_header \
(id, obs_time, exp_time, ra, `dec`, create_time) \ (id, ra, `dec`, create_time) \
values (?,?,?,?,?,?)", values (?,?,?,?)",
(level0_id_id, exp_start_time, exp_time, ra_obj, dec_obj, create_time)) (level0_id_id, ra_obj, dec_obj, create_time))
if copyfiles: if copyfiles:
#copy files #copy files
......
...@@ -29,4 +29,6 @@ csst_dfs_api_local.common = *.sql ...@@ -29,4 +29,6 @@ csst_dfs_api_local.common = *.sql
console_scripts = console_scripts =
csst-msc-ingest-local = csst_dfs_api_local.msc.ingest:ingest csst-msc-ingest-local = csst_dfs_api_local.msc.ingest:ingest
csst-ifs-ingest-local = csst_dfs_api_local.ifs.ingest:ingest csst-ifs-ingest-local = csst_dfs_api_local.ifs.ingest:ingest
csst-mci-ingest-local = csst_dfs_api_local.mci.ingest:ingest csst-mci-ingest-local = csst_dfs_api_local.mci.ingest:ingest
\ No newline at end of file csst-sls-ingest-local = csst_dfs_api_local.sls.ingest:ingest
csst-cpic-ingest-local = csst_dfs_api_local.cpic.ingest:ingest
\ No newline at end of file
...@@ -2,7 +2,7 @@ import os ...@@ -2,7 +2,7 @@ import os
import unittest import unittest
from astropy.io import fits from astropy.io import fits
from csst_dfs_api_local.msc.level2catalog import Level2CatalogApi from csst_dfs_api_local.msc.level2 import Level2CatalogApi
class MSCLevel2CatalogApiTestCase(unittest.TestCase): class MSCLevel2CatalogApiTestCase(unittest.TestCase):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment