Commit bc4842a9 authored by Wei Shoulin

split level0data

parent 7f8aff32
@@ -41,3 +41,4 @@ test_reports.xml
 # tmp test
 .tmp/*
+.hypothesis
\ No newline at end of file
/*----------------facility------------------------------*/
drop table if exists t_module_status;
drop table if exists t_observation;
drop table if exists t_detector;
drop table if exists t_detector_status;
drop table if exists t_facility_status;
drop table if exists t_guiding;
/*----------------msc------------------------------*/
drop table if exists msc_level0_data;
drop table if exists msc_level0_header;
drop table if exists msc_level0_prc;
drop table if exists msc_cal2level0;
drop table if exists msc_cal_header;
drop table if exists msc_cal_merge;
drop table if exists msc_level1_data;
drop table if exists msc_level1_header;
drop table if exists msc_level1_prc;
drop table if exists msc_level1_ref;
/*----------------ifs------------------------------*/
drop table if exists ifs_level0_data;
drop table if exists ifs_level0_header;
drop table if exists ifs_level0_prc;
drop table if exists ifs_cal2level0;
drop table if exists ifs_cal_header;
drop table if exists ifs_cal_merge;
drop table if exists ifs_level1_data;
drop table if exists ifs_level1_header;
drop table if exists ifs_level1_prc;
drop table if exists ifs_level1_ref;
/*----------------sls------------------------------*/
drop table if exists sls_level0_data;
drop table if exists sls_level0_header;
drop table if exists sls_level0_prc;
drop table if exists sls_cal2level0;
drop table if exists sls_cal_header;
drop table if exists sls_cal_merge;
drop table if exists sls_level1_data;
drop table if exists sls_level1_header;
drop table if exists sls_level1_prc;
drop table if exists sls_level1_ref;
drop table if exists sls_level2_spectra;
/*===========================facility===================================*/
create table t_detector
(
   no                   varchar(10)    not null,
   detector_name        varchar(256)   not null,
   module_id            varchar(20),
   filter_id            varchar(20),
   create_time          datetime,
   update_time          datetime,
   primary key (no)
);
/*==============================================================*/
/* Table: t_detector_status                                     */
/*==============================================================*/
create table t_detector_status
(
   id                   integer PRIMARY KEY autoincrement,
   detector_no          varchar(10)    not null,
   status               varchar(256)   not null,
   status_time          datetime,
   create_time          datetime
);
/*==============================================================*/
/* Table: t_facility_status                                     */
/*==============================================================*/
create table t_facility_status
(
   id                   integer PRIMARY KEY autoincrement,
   status               varchar(256)   not null,
   status_time          datetime,
   create_time          datetime
);
/*==============================================================*/
/* Table: t_guiding                                             */
/*==============================================================*/
create table t_guiding
(
   id                   integer PRIMARY KEY autoincrement,
   filename             varbinary(128),
   guiding_file_path    varchar(256)   not null,
   guiding_no           varchar(256),
   create_time          datetime
);
/*==============================================================*/
/* Table: t_module_status                                       */
/*==============================================================*/
create table t_module_status
(
   id                   integer PRIMARY KEY autoincrement,
   module_id            varbinary(20),
   status               varchar(256)   not null,
   status_time          datetime,
   create_time          datetime
);
/*==============================================================*/
/* Table: t_observation                                         */
/*==============================================================*/
create table t_observation
(
   id                   integer PRIMARY KEY autoincrement,
   obs_id               varchar(10),
   obs_time             datetime,
   exp_time             float,
   module_id            varchar(20),
   obs_type             varchar(16),
   facility_status_id   int(20),
   module_status_id     int(20),
   qc0_status           tinyint(1),
   qc0_time             datetime,
   prc_status           tinyint(1),
   prc_time             datetime,
   create_time          datetime,
   import_status        tinyint(1)
);
/*===========================msc===================================*/
create table msc_level0_data
(
   id                   integer PRIMARY KEY autoincrement,
   level0_id            varchar(20)    not null,
   obs_id               varchar(10)    not null,
   detector_no          varchar(10)    not null,
   obs_type             varchar(16),
   obs_time             datetime,
   exp_time             float,
   detector_status_id   int(20),
   filename             varchar(128),
   file_path            varchar(256),
   qc0_status           tinyint(1),
   qc0_time             datetime,
   prc_status           tinyint(1),
   prc_time             datetime,
   create_time          datetime
);
create table msc_level0_header
(
   id                   int(20)        not null,
   obs_time             datetime,
@@ -129,9 +185,17 @@ create table ifs_level1_header
   primary key (id)
);
create table msc_level0_prc
(
   id                   integer PRIMARY KEY autoincrement,
   level0_id            varchar(20)    not null,
   pipeline_id          varchar(64)    not null,
   prc_module           varchar(32)    not null,
   params_file_path     varchar(256),
   prc_status           int(2),
   prc_time             datetime,
   result_file_path     varchar(256)
);
create table msc_level1_data
(
   id                   integer PRIMARY KEY autoincrement,
@@ -148,18 +212,12 @@ create table msc_level1_data
   create_time          datetime,
   pipeline_id          varchar(60)
);
create table msc_level1_ref (
   level1_id            int(20)        not null,
   ref_type             varchar(64)    not null,
   cal_id               int(20)        not null,
   primary key (level1_id, ref_type)
);
create table msc_level1_header
(
   id                   int(20)        not null,
@@ -170,21 +228,14 @@ create table msc_level1_header
   create_time          datetime,
   primary key (id)
);
create table msc_cal2level0
(
   merge_id             int(20)        not null,
   level0_id            varchar(20)    not null,
   primary key (merge_id, level0_id)
);
create table msc_cal_header
(
   id                   int(20)        not null,
   obs_time             datetime,
@@ -195,10 +246,7 @@ create table t_cal_header
   primary key (id)
);
create table msc_cal_merge
(
   id                   integer PRIMARY KEY autoincrement,
   cal_id               varchar(20)    not null,
@@ -214,60 +262,141 @@ create table t_cal_merge
   prc_time             datetime,
   create_time          datetime
);
create table msc_level1_prc
(
   id                   integer PRIMARY KEY autoincrement,
   level1_id            int(20)        not null,
   pipeline_id          varchar(64)    not null,
   prc_module           varchar(32)    not null,
   params_file_path     varchar(256),
   prc_status           int(2),
   prc_time             datetime,
   result_file_path     varchar(256)
);
/*===========================ifs===================================*/
create table ifs_level0_data
(
   id                   integer PRIMARY KEY autoincrement,
   level0_id            varchar(20)    not null,
   obs_id               varchar(10)    not null,
   detector_no          varchar(10)    not null,
   obs_type             varchar(16),
   obs_time             datetime,
   exp_time             float,
   detector_status_id   int(20),
   filename             varchar(128),
   file_path            varchar(256),
   qc0_status           tinyint(1),
   qc0_time             datetime,
   prc_status           tinyint(1),
   prc_time             datetime,
   create_time          datetime
);
create table ifs_level0_header
(
   id                   int(20)        not null,
   obs_time             datetime,
   exp_time             float,
   ra                   float,
   "dec"                float,
   create_time          datetime,
   primary key (id)
);
create table ifs_level0_prc
(
   id                   integer PRIMARY KEY autoincrement,
   level0_id            varchar(20)    not null,
   pipeline_id          varchar(64)    not null,
   prc_module           varchar(32)    not null,
   params_file_path     varchar(256),
   prc_status           int(2),
   prc_time             datetime,
   result_file_path     varchar(256)
);
create table ifs_cal2level0
(
   merge_id             int(20)        not null,
   level0_id            varchar(20)    not null,
   primary key (merge_id, level0_id)
);
create table ifs_cal_header
(
   id                   int(20)        not null,
   obs_time             datetime,
   exp_time             float,
   ra                   float,
   "dec"                float,
   create_time          datetime,
   primary key (id)
);
create table ifs_cal_merge
(
   id                   integer PRIMARY KEY autoincrement,
   cal_id               varchar(20)    not null,
   detector_no          varchar(10)    not null,
   ref_type             varchar(16),
   obs_time             datetime,
   exp_time             float,
   filename             varchar(128),
   file_path            varchar(256),
   qc1_status           tinyint(1),
   qc1_time             datetime,
   prc_status           tinyint(1),
   prc_time             datetime,
   create_time          datetime
);
create table ifs_level1_data
(
   id                   integer PRIMARY KEY autoincrement,
   level0_id            varchar(20)    not null,
   data_type            varchar(64)    not null,
   cor_sci_id           int(20),
   prc_params           varchar(1024),
   filename             varchar(128),
   file_path            varchar(256),
   prc_status           tinyint(1),
   prc_time             datetime,
   qc1_status           tinyint(1),
   qc1_time             datetime,
   create_time          datetime,
   pipeline_id          varchar(60)
);
create table ifs_level1_ref (
   level1_id            int(20)        not null,
   ref_type             varchar(64)    not null,
   cal_id               int(20)        not null,
   primary key (level1_id, ref_type)
);
create table ifs_level1_header
(
   id                   int(20)        not null,
   obs_time             datetime,
   exp_time             float,
   ra                   float,
   "dec"                float,
   create_time          datetime,
   primary key (id)
);
create table ifs_level1_prc
(
   id                   integer PRIMARY KEY autoincrement,
   level1_id            int(20)        not null,
   pipeline_id          varchar(64)    not null,
   prc_module           varchar(32)    not null,
   params_file_path     varchar(256),
   prc_status           int(2),
   prc_time             datetime,
   result_file_path     varchar(256)
);
/*===========================sls===================================*/
create table sls_level0_data
(
   id                   integer PRIMARY KEY autoincrement,
   level0_id            varchar(20)    not null,
@@ -286,10 +415,7 @@ create table t_level0_data
   create_time          datetime
);
create table sls_level0_header
(
   id                   int(20)        not null,
   obs_time             datetime,
@@ -300,10 +426,7 @@ create table t_level0_header
   primary key (id)
);
create table sls_level0_prc
(
   id                   integer PRIMARY KEY autoincrement,
   level0_id            varchar(20)    not null,
@@ -314,10 +437,76 @@ create table t_level0_prc
   prc_time             datetime,
   result_file_path     varchar(256)
);
create table sls_cal2level0
(
   merge_id             int(20)        not null,
   level0_id            varchar(20)    not null,
   primary key (merge_id, level0_id)
);
create table sls_cal_header
(
   id                   int(20)        not null,
   obs_time             datetime,
   exp_time             float,
   ra                   float,
   "dec"                float,
   create_time          datetime,
   primary key (id)
);
create table sls_cal_merge
(
   id                   integer PRIMARY KEY autoincrement,
   cal_id               varchar(20)    not null,
   detector_no          varchar(10)    not null,
   ref_type             varchar(16),
   obs_time             datetime,
   exp_time             float,
   filename             varchar(128),
   file_path            varchar(256),
   qc1_status           tinyint(1),
   qc1_time             datetime,
   prc_status           tinyint(1),
   prc_time             datetime,
   create_time          datetime
);
create table sls_level1_data
(
   id                   integer PRIMARY KEY autoincrement,
   level0_id            varchar(20)    not null,
   data_type            varchar(64)    not null,
   prc_params           varchar(1024),
   filename             varchar(128),
   file_path            varchar(256),
   prc_status           tinyint(1),
   prc_time             datetime,
   qc1_status           tinyint(1),
   qc1_time             datetime,
   create_time          datetime,
   pipeline_id          varchar(60)
);
create table sls_level1_ref (
   level1_id            int(20)        not null,
   ref_type             varchar(64)    not null,
   cal_id               int(20)        not null,
   primary key (level1_id, ref_type)
);
create table sls_level1_header
(
   id                   int(20)        not null,
   obs_time             datetime,
   exp_time             float,
   ra                   float,
   "dec"                float,
   create_time          datetime,
   primary key (id)
);
create table sls_level1_prc
(
   id                   integer PRIMARY KEY autoincrement,
   level1_id            int(20)        not null,
@@ -328,36 +517,235 @@ create table t_level1_prc
   prc_time             datetime,
   result_file_path     varchar(256)
);
create table sls_level2_spectra
(
   id                   integer PRIMARY KEY autoincrement,
   spectra_id           varchar(40),
   level1_id            int(20)        not null,
   region               varchar(128),
   filename             varchar(128),
   file_path            varchar(256),
   prc_status           tinyint(1),
   prc_time             datetime,
   qc1_status           tinyint(1),
   qc1_time             datetime,
   create_time          datetime,
   pipeline_id          varchar(60)
);
create table sls_level2_spectra_header
(
   id                   int(20)        not null,
   obs_time             datetime,
   exp_time             float,
   ra                   float,
   "dec"                float,
   create_time          datetime,
   primary key (id)
);
-- csst.msc_level2_catalog definition
CREATE TABLE msc_level2_catalog (
source_id integer PRIMARY KEY autoincrement,
obs_id varchar(12),
detector_no char(2),
seq int(20),
flux_aper_1 double,
flux_aper_2 double,
flux_aper_3 double,
flux_aper_4 double,
flux_aper_5 double,
flux_aper_6 double,
flux_aper_7 double,
flux_aper_8 double,
flux_aper_9 double,
flux_aper_10 double,
flux_aper_11 double,
flux_aper_12 double,
fluxerr_aper_1 double,
fluxerr_aper_2 double,
fluxerr_aper_3 double,
fluxerr_aper_4 double,
fluxerr_aper_5 double,
fluxerr_aper_6 double,
fluxerr_aper_7 double,
fluxerr_aper_8 double,
fluxerr_aper_9 double,
fluxerr_aper_10 double,
fluxerr_aper_11 double,
fluxerr_aper_12 double,
mag_aper_1 double,
mag_aper_2 double,
mag_aper_3 double,
mag_aper_4 double,
mag_aper_5 double,
mag_aper_6 double,
mag_aper_7 double,
mag_aper_8 double,
mag_aper_9 double,
mag_aper_10 double,
mag_aper_11 double,
mag_aper_12 double,
magerr_aper_1 double,
magerr_aper_2 double,
magerr_aper_3 double,
magerr_aper_4 double,
magerr_aper_5 double,
magerr_aper_6 double,
magerr_aper_7 double,
magerr_aper_8 double,
magerr_aper_9 double,
magerr_aper_10 double,
magerr_aper_11 double,
magerr_aper_12 double,
flux_auto double,
fluxerr_auto double,
mag_auto double,
magerr_auto double,
kron_radius double,
background double,
x_image double,
y_image double,
alpha_j2000 double,
delta_j2000 double,
a_image double,
b_image double,
theta_image double,
a_world double,
b_world double,
theta_world double,
theta_j2000 double,
errx2_image double,
erry2_image double,
erra_image double,
errb_image double,
errtheta_image double,
erra_world double,
errb_world double,
errtheta_world double,
errtheta_j2000 double,
xwin_image double,
ywin_image double,
alphawin_j2000 double,
deltawin_j2000 double,
errx2win_image double,
erry2win_image double,
flags int(20),
flags_weight int(20),
imaflags_iso double,
nimaflags_iso double,
fwhm_image double,
fwhm_world double,
elongation double,
ellipticity double,
class_star double,
flux_radius double,
fwhmpsf_image double,
fwhmpsf_world double,
xpsf_image double,
ypsf_image double,
alphapsf_j2000 double,
deltapsf_j2000 double,
flux_psf double,
fluxerr_psf double,
mag_psf double,
magerr_psf double,
niter_psf int(20),
chi2_psf double,
errx2psf_image double,
erry2psf_image double,
chi2_model double,
flags_model tinyint(1),
niter_model int(20),
flux_model double,
fluxerr_model double,
mag_model double,
magerr_model double,
flux_hybrid double,
fluxerr_hybrid double,
mag_hybrid double,
magerr_hybrid double,
flux_max_model double,
mu_max_model double,
flux_eff_model double,
mu_eff_model double,
flux_mean_model double,
mu_mean_model double,
xmodel_image double,
ymodel_image double,
alphamodel_j2000 double,
deltamodel_j2000 double,
erry2model_image double,
erramodel_image double,
errbmodel_image double,
errthetamodel_image double,
erramodel_world double,
errbmodel_world double,
errthetamodel_world double,
errthetamodel_j2000 double,
amodel_image double,
bmodel_image double,
thetamodel_image double,
amodel_world double,
bmodel_world double,
thetamodel_world double,
thetamodel_j2000 double,
spread_model double,
spreaderr_model double,
noisearea_model double,
flux_spheroid double,
fluxerr_spheroid double,
mag_spheroid double,
magerr_spheroid double,
flux_max_spheroid double,
mu_max_spheroid double,
flux_eff_spheroid double,
mu_eff_spheroid double,
flux_mean_spheroid double,
mu_mean_spheroid double,
fluxratio_spheroid double,
fluxratioerr_spheroid double,
spheroid_reff_image double,
spheroid_refferr_image double,
spheroid_reff_world double,
spheroid_refferr_world double,
spheroid_aspect_image double,
spheroid_aspecterr_image double,
spheroid_aspect_world double,
spheroid_aspecterr_world double,
spheroid_theta_image double,
spheroid_thetaerr_image double,
spheroid_theta_world double,
spheroid_thetaerr_world double,
spheroid_theta_j2000 double,
spheroid_sersicn double,
spheroid_sersicnerr double,
flux_disk double,
fluxerr_disk double,
mag_disk double,
magerr_disk double,
flux_max_disk double,
mu_max_disk double,
flux_eff_disk double,
mu_eff_disk double,
flux_mean_disk double,
mu_mean_disk double,
fluxratio_disk double,
fluxratioerr_disk double,
disk_scale_image double,
disk_scaleerr_image double,
disk_scale_world double,
disk_scaleerr_world double,
disk_aspect_image double,
disk_aspecterr_image double,
disk_aspect_world double,
disk_aspecterr_world double,
disk_inclination double,
disk_inclinationerr double,
disk_theta_image double,
disk_thetaerr_image double,
disk_theta_world double,
disk_thetaerr_world double,
disk_theta_j2000 double,
obs_time datetime
);
\ No newline at end of file
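The DDL above is SQLite-flavoured (integer PRIMARY KEY autoincrement), so the split schema can be smoke-tested by loading it into an in-memory database. A minimal sketch, assuming the complete schema file from the repository (the listing above elides unchanged lines at the @@ markers) is saved under the placeholder name csst_local.sql:

import sqlite3

# Sketch only: load the full schema (placeholder file name) into an
# in-memory SQLite database and confirm each subsystem owns its own
# level0 table after the split.
conn = sqlite3.connect(":memory:")
with open("csst_local.sql") as f:
    conn.executescript(f.read())

for sub_system in ("msc", "ifs", "sls"):
    count = conn.execute(f"select count(*) from {sub_system}_level0_data").fetchone()[0]
    print(sub_system, "level0 rows:", count)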
-from .calmerge import CalMergeApi
-from .level0 import Level0DataApi
 from .detector import DetectorApi
-from .level0prc import Level0PrcApi
 from .observation import ObservationApi
\ No newline at end of file
+from .calmerge import CalMergeApi
+from .level0 import Level0DataApi
+from .level0prc import Level0PrcApi
 from .level1 import Level1DataApi
+from .level1prc import Level1PrcApi
\ No newline at end of file
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.ifs import CalMergeRecord
from csst_dfs_commons.models.common import from_dict_list
from .level0 import Level0DataApi
log = logging.getLogger('csst')
class CalMergeApi(object):
def __init__(self, sub_system = "ifs"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
self.level0Api = Level0DataApi()
def get_latest_by_l0(self, **kwargs):
''' retrieve calibration merge records from database by level0 data
:param kwargs: Parameter dictionary, key items support:
level0_id: [str],
ref_type: [str]
:returns: csst_dfs_common.models.Result
'''
try:
level0_id = get_parameter(kwargs, "level0_id")
ref_type = get_parameter(kwargs, "ref_type")
level0_data = self.level0Api.get_by_level0_id(level0_id)
if level0_data is None:
return Result.error(message = "level0 data [%s]not found"%(level0_id))
sql_data = f"select * from ifs_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time >= '{level0_data.data.obs_time}' order by obs_time ASC limit 1"
r = self.db.select_one(sql_data)
if r:
rec = CalMergeRecord().from_dict(r)
return Result.ok_data(data=rec)
sql_data = f"select * from ifs_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time <= '{level0_data.data.obs_time}' order by obs_time DESC limit 1"
r = self.db.select_one(sql_data)
if r:
rec = CalMergeRecord().from_dict(r)
return Result.ok_data(data=rec)
return Result.error(message = "not found")
except Exception as e:
return Result.error(message=str(e))
def find(self, **kwargs):
''' retrieve calibration merge records from database
parameter kwargs:
detector_no: [str]
ref_type: [str]
obs_time: (start,end)
qc1_status : [int]
prc_status : [int]
file_name: [str]
limit: limit the number of returned records; default 0 means no limit
return: csst_dfs_common.models.Result
'''
try:
detector_no = get_parameter(kwargs, "detector_no")
ref_type = get_parameter(kwargs, "ref_type")
exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0]
exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1]
qc1_status = get_parameter(kwargs, "qc1_status")
prc_status = get_parameter(kwargs, "prc_status")
file_name = get_parameter(kwargs, "file_name")
limit = get_parameter(kwargs, "limit", 0)
sql_count = "select count(*) as c from ifs_cal_merge where 1=1"
sql_data = f"select * from ifs_cal_merge where 1=1"
sql_condition = ""
if detector_no:
sql_condition = f"{sql_condition} and detector_no='{detector_no}'"
if ref_type:
sql_condition = f"{sql_condition} and ref_type='{ref_type}'"
if exp_time_start:
sql_condition = f"{sql_condition} and obs_time >='{exp_time_start}'"
if exp_time_end:
sql_condition = f"{sql_condition} and obs_time <='{exp_time_end}'"
if qc1_status:
sql_condition = f"{sql_condition} and qc1_status={qc1_status}"
if prc_status:
sql_condition = f"{sql_condition} and prc_status={prc_status}"
if file_name:
    sql_condition = f"{sql_condition} and filename='{file_name}'"
sql_count = f"{sql_count} {sql_condition}"
sql_data = f"{sql_data} {sql_condition}"
log.info(sql_count)
log.info(sql_data)
if limit > 0:
sql_data = f"{sql_data} limit {limit}"
totalCount = self.db.select_one(sql_count)
_, records = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(CalMergeRecord, records)).append("totalCount", totalCount['c'])
except Exception as e:
return Result.error(message=str(e))
def get(self, **kwargs):
''' fetch a record from database
parameter kwargs:
id : [int],
cal_id : [str]
return csst_dfs_common.models.Result
'''
id = get_parameter(kwargs, "id", 0)
cal_id = get_parameter(kwargs, "cal_id", "")
if id == 0 and cal_id == "":
return Result.error(message="at least define id or cal_id")
if id != 0:
return self.get_by_id(id)
if cal_id != "":
return self.get_by_cal_id(cal_id)
def get_by_id(self, iid: int):
try:
r = self.db.select_one(
"select * from ifs_cal_merge where id=?", (iid,))
if r:
sql_get_level0_id = f"select level0_id from ifs_cal2level0 where merge_id={r['id']}"
_, records = self.db.select_many(sql_get_level0_id)
level0_ids = [r["level0_id"] for r in records]
rec = CalMergeRecord().from_dict(r)
rec.level0_ids = level0_ids
return Result.ok_data(data=rec)
else:
return Result.error(message=f"id:{iid} not found")
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def get_by_cal_id(self, cal_id: str):
try:
r = self.db.select_one(
"select * from ifs_cal_merge where cal_id=?", (cal_id,))
if r:
sql_get_level0_id = f"select level0_id from ifs_cal2level0 where merge_id={r['id']}"
_, records = self.db.select_many(sql_get_level0_id)
level0_ids = [r["level0_id"] for r in records]
rec = CalMergeRecord().from_dict(r)
rec.level0_ids = level0_ids
return Result.ok_data(data=rec)
else:
return Result.error(message=f"id:{id} not found")
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_qc1_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
cal_id : [str],
status : [int]
return csst_dfs_common.models.Result
'''
id = get_parameter(kwargs, "id", 0)
cal_id = get_parameter(kwargs, "cal_id", "")
result = self.get(id = id, cal_id = cal_id)
if not result.success:
return Result.error(message="not found")
id = result.data.id
status = get_parameter(kwargs, "status")
try:
self.db.execute(
'update ifs_cal_merge set qc1_status=?, qc1_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
cal_id : [str],
status : [int]
return csst_dfs_common.models.Result
'''
id = get_parameter(kwargs, "id", 0)
cal_id = get_parameter(kwargs, "cal_id", "")
result = self.get(id = id, cal_id = cal_id)
if not result.success:
return Result.error(message="not found")
id = result.data.id
status = get_parameter(kwargs, "status")
try:
existed = self.db.exists(
"select * from ifs_cal_merge where id=?",
(id,)
)
if not existed:
log.warning('%s not found' %(id, ))
return Result.error(message ='%s not found' %(id, ))
self.db.execute(
'update ifs_cal_merge set prc_status=?, prc_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def write(self, **kwargs):
''' insert a calibration merge record into database
parameter kwargs:
cal_id : [str]
detector_no : [str]
ref_type : [str]
obs_time : [str]
exp_time : [float]
prc_status : [int]
prc_time : [str]
filename : [str]
file_path : [str]
level0_ids : [list]
return csst_dfs_common.models.Result
'''
rec = CalMergeRecord(
id = 0,
cal_id = get_parameter(kwargs, "cal_id"),
detector_no = get_parameter(kwargs, "detector_no"),
ref_type = get_parameter(kwargs, "ref_type"),
obs_time = get_parameter(kwargs, "obs_time"),
exp_time = get_parameter(kwargs, "exp_time"),
filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path"),
prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time"),
level0_ids = get_parameter(kwargs, "level0_ids", [])
)
try:
self.db.execute(
'INSERT INTO ifs_cal_merge (cal_id,detector_no,ref_type,obs_time,exp_time,filename,file_path,prc_status,prc_time,create_time) \
VALUES(?,?,?,?,?,?,?,?,?,?)',
(rec.cal_id, rec.detector_no, rec.ref_type, rec.obs_time, rec.exp_time, rec.filename, rec.file_path,rec.prc_status,rec.prc_time,format_time_ms(time.time()))
)
self.db.end()
rec.id = self.db.last_row_id()
sql_level0_ids = "insert into ifs_cal2level0 (merge_id,level0_id) values "
values = ["(%s,%s)"%(rec.id,i) for i in rec.level0_ids]
_ = self.db.execute(sql_level0_ids + ",".join(values))
self.db.end()
return Result.ok_data(data=rec)
except Exception as e:
log.error(e)
return Result.error(message=str(e))
\ No newline at end of file
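A hedged usage sketch of the CalMergeApi above; the package path csst_dfs_api_local.ifs is assumed from the repository layout, and all IDs, times and paths are illustrative:

# Illustrative only: exercise write() and get_latest_by_l0() from the class above.
# Assumes CSST_LOCAL_FILE_ROOT points at a database initialised with the ifs_* tables
# and that the referenced level0 exposure was ingested beforehand.
from csst_dfs_api_local.ifs import CalMergeApi  # assumed import path

api = CalMergeApi()

# Register a merged calibration file together with the level0 exposures it came from.
result = api.write(
    cal_id="0000231", detector_no="01", ref_type="bias",
    obs_time="2021-06-04 11:12:13", exp_time=150.0,
    filename="CCD01_bias.fits", file_path="/opt/temp/csst/CCD01_bias.fits",
    prc_status=3, prc_time="2021-06-05 11:12:13",
    level0_ids=["0000231CCD01", "0000232CCD01"],
)
if result.success:
    print("merge id:", result.data.id)

# Fetch the merge record closest in obs_time to a given level0 exposure.
latest = api.get_latest_by_l0(level0_id="0000231CCD01", ref_type="bias")
if latest.success:
    print(latest.data.filename)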
import os, sys
import argparse
import logging
from astropy.io import fits
import datetime
import shutil
from csst_dfs_api_local.common.db import DBClient
log = logging.getLogger('csst-dfs-api-local')
def ingest():
parser = argparse.ArgumentParser(prog=f"{sys.argv[0]}", description="ingest the local files")
parser.add_argument('-i','--infile', dest="infile", help="a file or a directory")
parser.add_argument('-m', '--copyfiles', dest="copyfiles", action='store_true', default=False, help="whether copy files after import")
args = parser.parse_args(sys.argv[1:])
import_root_dir = args.infile
if import_root_dir is None or (not os.path.isfile(import_root_dir) and not os.path.isdir(import_root_dir)):
parser.print_help()
sys.exit(0)
db = DBClient()
if os.path.isfile(import_root_dir):
log.info(f"prepare import {import_root_dir}")
ingest_one(import_root_dir, db, args.copyfiles)
if os.path.isdir(import_root_dir):
for (path, _, file_names) in os.walk(import_root_dir):
for filename in file_names:
if filename.find(".fits") > 0:
file_full_path = os.path.join(path, filename)
log.info(f"prepare import {file_full_path}")
try:
ingest_one(file_full_path, db, args.copyfiles)
except Exception as e:
print(f"{file_full_path} import error!!!")
log.error(e)
db.close()
def ingest_one(file_path, db, copyfiles):
dest_root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
hdul = fits.open(file_path)
header = hdul[0].header
header1 = hdul[1].header
obs_id = header["OBSID"]
exp_start_time = f"{header['DATE-OBS']}"
exp_time = header['EXPTIME']
module_id = header["INSTRUME"]
obs_type = header["OBSTYPE"]
qc0_status = -1
prc_status = -1
time_now = datetime.datetime.now()
create_time = time_now.strftime('%Y-%m-%d %H:%M:%S')
facility_status_id = 0
module_status_id = 0
existed = db.exists("select * from t_observation where obs_id=?", (obs_id,))
if not existed:
db.execute("insert into t_observation \
(obs_id,obs_time,exp_time,module_id,obs_type,facility_status_id, module_status_id, qc0_status, prc_status,create_time) \
values (?,?,?,?,?,?,?,?,?,?)",
(obs_id,exp_start_time,exp_time,module_id,obs_type,facility_status_id,module_status_id,qc0_status, prc_status,create_time))
db.end()
#level0
detector = header1["DETNAM"]
filename = header["FILENAME"]
existed = db.exists(
"select * from ifs_level0_data where filename=?",
(filename,)
)
if existed:
log.warning('%s has already been imported' %(file_path, ))
db.end()
return
detector_status_id = 0
file_full_path = file_path
if copyfiles:
file_dir = f"{dest_root_dir}/{module_id}/{obs_type.upper()}/{header['EXPSTART']}/{obs_id}/MS"
if not os.path.exists(file_dir):
os.makedirs(file_dir)
file_full_path = f"{file_dir}/{filename}.fits"
level0_id = f"{obs_id}{detector}"
c = db.execute("insert into ifs_level0_data \
(level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \
values (?,?,?,?,?,?,?,?,?,?,?,?)",
(level0_id, obs_id, detector, obs_type, exp_start_time, exp_time, detector_status_id, filename, file_full_path, qc0_status, prc_status,create_time))
db.end()
level0_id_id = db.last_row_id()
#level0-header
ra_obj = header["OBJ_RA"]
dec_obj = header["OBJ_DEC"]
db.execute("delete from ifs_level0_header where id=?",(level0_id_id,))
db.execute("insert into ifs_level0_header \
(id, obs_time, exp_time, ra, `dec`, create_time) \
values (?,?,?,?,?,?)",
(level0_id_id, exp_start_time, exp_time, ra_obj, dec_obj, create_time))
if copyfiles:
#copy files
shutil.copyfile(file_path, file_full_path)
db.end()
print(f"{file_path} imported")
if __name__ == "__main__":
ingest()
\ No newline at end of file
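For completeness, a minimal sketch of driving the argparse-based ingest() entry point above programmatically; the module path is an assumption and the data directory is a placeholder:

import sys
from csst_dfs_api_local.ifs.ingest import ingest  # assumed module path

# Equivalent to: ifs-ingest -i /data/ifs/raw -m
# (-i: a .fits file or a directory to walk; -m: copy files into CSST_LOCAL_FILE_ROOT)
sys.argv = ["ifs-ingest", "-i", "/data/ifs/raw", "-m"]
ingest()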
@@ -6,13 +6,14 @@ import shutil
 from ..common.db import DBClient
 from ..common.utils import *
 from csst_dfs_commons.models import Result
-from csst_dfs_commons.models.facility import Level0Record
+from csst_dfs_commons.models.ifs import Level0Record
 from csst_dfs_commons.models.common import from_dict_list
 
 log = logging.getLogger('csst')
 
 class Level0DataApi(object):
-    def __init__(self):
+    def __init__(self, sub_system = "ifs"):
+        self.sub_system = sub_system
         self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
         self.db = DBClient()
@@ -42,8 +43,8 @@ class Level0DataApi(object):
         file_name = get_parameter(kwargs, "file_name")
         limit = get_parameter(kwargs, "limit", 0)
 
-        sql_count = "select count(*) as c from t_level0_data where 1=1"
-        sql_data = f"select * from t_level0_data where 1=1"
+        sql_count = "select count(*) as c from ifs_level0_data where 1=1"
+        sql_data = f"select * from ifs_level0_data where 1=1"
 
         sql_condition = ""
         if obs_id:
@@ -100,7 +101,7 @@ class Level0DataApi(object):
     def get_by_id(self, id: int):
         try:
             r = self.db.select_one(
-                "select * from t_level0_data where id=?", (id,))
+                "select * from ifs_level0_data where id=?", (id,))
             if r:
                 return Result.ok_data(data=Level0Record().from_dict(r))
             else:
@@ -112,7 +113,7 @@ class Level0DataApi(object):
     def get_by_level0_id(self, level0_id: str):
         try:
             r = self.db.select_one(
-                "select * from t_level0_data where level0_id=?", (level0_id,))
+                "select * from ifs_level0_data where level0_id=?", (level0_id,))
             if r:
                 return Result.ok_data(data=Level0Record().from_dict(r))
             else:
@@ -142,7 +143,7 @@ class Level0DataApi(object):
         status = get_parameter(kwargs, "status")
         try:
             self.db.execute(
-                'update t_level0_data set prc_status=?, prc_time=? where id=?',
+                'update ifs_level0_data set prc_status=?, prc_time=? where id=?',
                 (status, format_time_ms(time.time()), id)
             )
             self.db.end()
@@ -171,7 +172,7 @@ class Level0DataApi(object):
         status = get_parameter(kwargs, "status")
         try:
             self.db.execute(
-                'update t_level0_data set qc0_status=?, qc0_time=? where id=?',
+                'update ifs_level0_data set qc0_status=?, qc0_time=? where id=?',
                 (status, format_time_ms(time.time()), id)
             )
             self.db.end()
@@ -208,7 +209,7 @@ class Level0DataApi(object):
         rec.level0_id = f"{rec.obs_id}{rec.detector_no}"
         try:
             existed = self.db.exists(
-                "select * from t_level0_data where filename=?",
+                "select * from ifs_level0_data where filename=?",
                 (rec.filename,)
             )
             if existed:
@@ -216,7 +217,7 @@ class Level0DataApi(object):
                 return Result.error(message ='%s existed' %(rec.filename, ))
             self.db.execute(
-                'INSERT INTO t_level0_data (level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \
+                'INSERT INTO ifs_level0_data (level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \
                 VALUES(?,?,?,?,?,?,?,?,?,?,?,?)',
                 (rec.level0_id, rec.obs_id, rec.detector_no, rec.obs_type, rec.obs_time, rec.exp_time, rec.detector_status_id, rec.filename, rec.file_path,-1,-1,format_time_ms(time.time()))
             )
...
@@ -6,13 +6,14 @@ import shutil
 from ..common.db import DBClient
 from ..common.utils import *
 from csst_dfs_commons.models import Result
-from csst_dfs_commons.models.facility import Level0PrcRecord
+from csst_dfs_commons.models.ifs import Level0PrcRecord
 from csst_dfs_commons.models.common import from_dict_list
 
 log = logging.getLogger('csst')
 
 class Level0PrcApi(object):
-    def __init__(self):
+    def __init__(self, sub_system = "ifs"):
+        self.sub_system = sub_system
         self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
         self.db = DBClient()
@@ -33,7 +34,7 @@ class Level0PrcApi(object):
         prc_module = get_parameter(kwargs, "prc_module")
         prc_status = get_parameter(kwargs, "prc_status")
 
-        sql_data = f"select * from t_level0_prc"
+        sql_data = f"select * from ifs_level0_prc"
 
         sql_condition = f"where level0_id='{level0_id}'"
         if pipeline_id:
@@ -65,14 +66,14 @@ class Level0PrcApi(object):
         try:
             existed = self.db.exists(
-                "select * from t_level0_prc where id=?",
+                "select * from ifs_level0_prc where id=?",
                 (id,)
             )
             if not existed:
                 log.warning('%s not found' %(id, ))
                 return Result.error(message ='%s not found' %(id, ))
             self.db.execute(
-                'update t_level0_prc set prc_status=?, prc_time=? where id=?',
+                'update ifs_level0_prc set prc_status=?, prc_time=? where id=?',
                 (status, format_time_ms(time.time()), id)
             )
             self.db.end()
@@ -108,7 +109,7 @@ class Level0PrcApi(object):
         )
         try:
             self.db.execute(
-                'INSERT INTO t_level0_prc (level0_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \
+                'INSERT INTO ifs_level0_prc (level0_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \
                 VALUES(?,?,?,?,?,?,?)',
                 (rec.level0_id, rec.pipeline_id, rec.prc_module, rec.params_file_path, rec.prc_status, rec.prc_time, rec.result_file_path)
             )
...
@@ -6,7 +6,7 @@ import shutil
 from ..common.db import DBClient
 from ..common.utils import *
 from csst_dfs_commons.models import Result
-from csst_dfs_commons.models.facility import Level1PrcRecord
+from csst_dfs_commons.models.ifs import Level1PrcRecord
 from csst_dfs_commons.models.common import from_dict_list
 
 log = logging.getLogger('csst')
@@ -33,7 +33,7 @@ class Level1PrcApi(object):
         prc_module = get_parameter(kwargs, "prc_module")
         prc_status = get_parameter(kwargs, "prc_status")
 
-        sql_data = f"select * from t_level1_prc"
+        sql_data = f"select * from ifs_level1_prc"
 
         sql_condition = f"where level1_id={level1_id}"
         if pipeline_id:
@@ -65,14 +65,14 @@ class Level1PrcApi(object):
         try:
             existed = self.db.exists(
-                "select * from t_level1_prc where id=?",
+                "select * from ifs_level1_prc where id=?",
                 (id,)
             )
             if not existed:
                 log.warning('%s not found' %(id, ))
                 return Result.error(message ='%s not found' %(id, ))
             self.db.execute(
-                'update t_level1_prc set prc_status=?, prc_time=? where id=?',
+                'update ifs_level1_prc set prc_status=?, prc_time=? where id=?',
                 (status, format_time_ms(time.time()), id)
             )
             self.db.end()
@@ -108,7 +108,7 @@ class Level1PrcApi(object):
         )
         try:
             self.db.execute(
-                'INSERT INTO t_level1_prc (level1_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \
+                'INSERT INTO ifs_level1_prc (level1_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \
                 VALUES(?,?,?,?,?,?,?)',
                 (rec.level1_id, rec.pipeline_id, rec.prc_module, rec.params_file_path, rec.prc_status, rec.prc_time, rec.result_file_path)
             )
...
+from .calmerge import CalMergeApi
+from .level0 import Level0DataApi
+from .level0prc import Level0PrcApi
 from .level1 import Level1DataApi
+from .level1prc import Level1PrcApi
+from .level2catalog import Level2CatalogApi
\ No newline at end of file
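A short sketch of what the split means for callers (package paths assumed): each subsystem package now exposes its own API classes, each backed by its own tables:

# Assumed package paths; after this commit each instrument module has its own APIs.
from csst_dfs_api_local.msc import Level0DataApi as MscLevel0DataApi
from csst_dfs_api_local.ifs import Level0DataApi as IfsLevel0DataApi

msc_api = MscLevel0DataApi()  # queries msc_level0_data
ifs_api = IfsLevel0DataApi()  # queries ifs_level0_data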
@@ -6,14 +6,15 @@ import shutil
 from ..common.db import DBClient
 from ..common.utils import *
 from csst_dfs_commons.models import Result
-from csst_dfs_commons.models.facility import CalMergeRecord
+from csst_dfs_commons.models.msc import CalMergeRecord
 from csst_dfs_commons.models.common import from_dict_list
 from .level0 import Level0DataApi
 
 log = logging.getLogger('csst')
 
 class CalMergeApi(object):
-    def __init__(self):
+    def __init__(self, sub_system = "msc"):
+        self.sub_system = sub_system
         self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
         self.db = DBClient()
         self.level0Api = Level0DataApi()
@@ -35,14 +36,14 @@ class CalMergeApi(object):
         if level0_data is None:
             return Result.error(message = "level0 data [%s]not found"%(level0_id))
 
-        sql_data = f"select * from t_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time >= '{level0_data.data.obs_time}' order by obs_time ASC limit 1"
+        sql_data = f"select * from msc_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time >= '{level0_data.data.obs_time}' order by obs_time ASC limit 1"
 
         r = self.db.select_one(sql_data)
         if r:
             rec = CalMergeRecord().from_dict(r)
             return Result.ok_data(data=rec)
 
-        sql_data = f"select * from t_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time <= '{level0_data.data.obs_time}' order by obs_time DESC limit 1"
+        sql_data = f"select * from msc_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time <= '{level0_data.data.obs_time}' order by obs_time DESC limit 1"
 
         r = self.db.select_one(sql_data)
         if r:
@@ -78,8 +79,8 @@ class CalMergeApi(object):
         file_name = get_parameter(kwargs, "file_name")
         limit = get_parameter(kwargs, "limit", 0)
 
-        sql_count = "select count(*) as c from t_cal_merge where 1=1"
-        sql_data = f"select * from t_cal_merge where 1=1"
+        sql_count = "select count(*) as c from msc_cal_merge where 1=1"
+        sql_data = f"select * from msc_cal_merge where 1=1"
 
         sql_condition = ""
         if detector_no:
@@ -138,10 +139,10 @@ class CalMergeApi(object):
     def get_by_id(self, id: str):
         try:
             r = self.db.select_one(
-                "select * from t_cal_merge where id=?", (id,))
+                "select * from msc_cal_merge where id=?", (id,))
             if r:
-                sql_get_level0_id = f"select level0_id from t_cal2level0 where merge_id={r['id']}"
+                sql_get_level0_id = f"select level0_id from msc_cal2level0 where merge_id={r['id']}"
                 _, records = self.db.select_many(sql_get_level0_id)
                 level0_ids = [r["level0_id"] for r in records]
@@ -158,10 +159,10 @@ class CalMergeApi(object):
     def get_by_cal_id(self, cal_id: str):
         try:
             r = self.db.select_one(
-                "select * from t_cal_merge where cal_id=?", (cal_id,))
+                "select * from msc_cal_merge where cal_id=?", (cal_id,))
             if r:
-                sql_get_level0_id = f"select level0_id from t_cal2level0 where merge_id={r['id']}"
+                sql_get_level0_id = f"select level0_id from msc_cal2level0 where merge_id={r['id']}"
                 _, records = self.db.select_many(sql_get_level0_id)
                 level0_ids = [r["level0_id"] for r in records]
@@ -197,7 +198,7 @@ class CalMergeApi(object):
         try:
             self.db.execute(
-                'update t_cal_merge set qc1_status=?, qc1_time=? where id=?',
+                'update msc_cal_merge set qc1_status=?, qc1_time=? where id=?',
                 (status, format_time_ms(time.time()), id)
             )
             self.db.end()
@@ -229,14 +230,14 @@ class CalMergeApi(object):
         try:
             existed = self.db.exists(
-                "select * from t_cal_merge where id=?",
+                "select * from msc_cal_merge where id=?",
                 (id,)
             )
             if not existed:
                 log.warning('%s not found' %(id, ))
                 return Result.error(message ='%s not found' %(id, ))
             self.db.execute(
-                'update t_cal_merge set prc_status=?, prc_time=? where id=?',
+                'update msc_cal_merge set prc_status=?, prc_time=? where id=?',
                 (status, format_time_ms(time.time()), id)
             )
             self.db.end()
@@ -278,14 +279,14 @@ class CalMergeApi(object):
         )
         try:
             self.db.execute(
-                'INSERT INTO t_cal_merge (cal_id,detector_no,ref_type,obs_time,exp_time,filename,file_path,prc_status,prc_time,create_time) \
+                'INSERT INTO msc_cal_merge (cal_id,detector_no,ref_type,obs_time,exp_time,filename,file_path,prc_status,prc_time,create_time) \
                 VALUES(?,?,?,?,?,?,?,?,?,?)',
                 (rec.cal_id, rec.detector_no, rec.ref_type, rec.obs_time, rec.exp_time, rec.filename, rec.file_path,rec.prc_status,rec.prc_time,format_time_ms(time.time()))
             )
             self.db.end()
             rec.id = self.db.last_row_id()
 
-            sql_level0_ids = "insert into t_cal2level0 (merge_id,level0_id) values "
+            sql_level0_ids = "insert into msc_cal2level0 (merge_id,level0_id) values "
             values = ["(%s,%s)"%(rec.id,i) for i in rec.level0_ids]
             _ = self.db.execute(sql_level0_ids + ",".join(values))
...
import os, sys
import argparse
import logging
from astropy.io import fits
import datetime
import shutil
from csst_dfs_api_local.common.db import DBClient
log = logging.getLogger('csst-dfs-api-local')
def ingest():
parser = argparse.ArgumentParser(prog=f"{sys.argv[0]}", description="ingest the local files")
parser.add_argument('-i','--infile', dest="infile", help="a file or a directory")
parser.add_argument('-m', '--copyfiles', dest="copyfiles", action='store_true', default=False, help="whether copy files after import")
args = parser.parse_args(sys.argv[1:])
import_root_dir = args.infile
if import_root_dir is None or (not os.path.isfile(import_root_dir) and not os.path.isdir(import_root_dir)):
parser.print_help()
sys.exit(0)
db = DBClient()
if os.path.isfile(import_root_dir):
log.info(f"prepare import {import_root_dir}")
ingest_one(import_root_dir, db, args.copyfiles)
if os.path.isdir(import_root_dir):
for (path, _, file_names) in os.walk(import_root_dir):
for filename in file_names:
if filename.find(".fits") > 0:
file_full_path = os.path.join(path, filename)
log.info(f"prepare import {file_full_path}")
ingest_one(file_full_path, db, args.copyfiles)
db.close()
def ingest_one(file_path, db, copyfiles):
dest_root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
hdul = fits.open(file_path)
header = hdul[0].header
obs_id = header["OBSID"]
exp_start_time = f"{header['DATE-OBS']} {header['TIME-OBS']}"
exp_time = header['EXPTIME']
module_id = header["INSTRUME"]
obs_type = header["FILETYPE"]
qc0_status = -1
prc_status = -1
time_now = datetime.datetime.now()
create_time = time_now.strftime('%Y-%m-%d %H:%M:%S')
facility_status_id = 0
module_status_id = 0
existed = db.exists("select * from t_observation where obs_id=?", (obs_id,))
if not existed:
db.execute("insert into t_observation \
(obs_id,obs_time,exp_time,module_id,obs_type,facility_status_id, module_status_id, qc0_status, prc_status,create_time) \
values (?,?,?,?,?,?,?,?,?,?)",
(obs_id,exp_start_time,exp_time,module_id,obs_type,facility_status_id,module_status_id,qc0_status, prc_status,create_time))
db.end()
#level0
detector = header["DETECTOR"]
filename = header["FILENAME"]
existed = db.exists(
"select * from t_level0_data where filename=?",
(filename,)
)
if existed:
log.warning('%s has already been imported' %(file_path, ))
db.end()
return
detector_status_id = 0
file_full_path = file_path
if copyfiles:
file_dir = f"{dest_root_dir}/{module_id}/{obs_type.upper()}/{header['EXPSTART']}/{obs_id}/MS"
if not os.path.exists(file_dir):
os.makedirs(file_dir)
file_full_path = f"{file_dir}/{filename}.fits"
level0_id = f"{obs_id}{detector}"
c = db.execute("insert into msc_level0_data \
(level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \
values (?,?,?,?,?,?,?,?,?,?,?,?)",
(level0_id, obs_id, detector, obs_type, exp_start_time, exp_time, detector_status_id, filename, file_full_path, qc0_status, prc_status,create_time))
db.end()
level0_id_id = db.last_row_id()
#level0-header
ra_obj = header["RA_OBJ"]
dec_obj = header["DEC_OBJ"]
db.execute("delete from msc_level0_header where id=?",(level0_id_id,))
db.execute("insert into msc_level0_header \
(id, obs_time, exp_time, ra, `dec`, create_time) \
values (?,?,?,?,?,?)",
(level0_id_id, exp_start_time, exp_time, ra_obj, dec_obj, create_time))
if copyfiles:
#copy files
shutil.copyfile(file_path, file_full_path)
db.end()
print(f"{file_path} imported")
if __name__ == "__main__":
ingest()
\ No newline at end of file
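For reference, a hedged sketch of driving the ingest entry point above; the module path csst_dfs_api_local.ingest and the data directory are assumptions for illustration.

import subprocess

# Ingest every .fits file under a hypothetical raw-data directory,
# copying files into CSST_LOCAL_FILE_ROOT afterwards (-m).
subprocess.run(
    ["python", "-m", "csst_dfs_api_local.ingest", "-i", "/data/L0/raw", "-m"],
    check=True,
)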
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.msc import Level0Record
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level0DataApi(object):
def __init__(self, sub_system = "msc"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
def find(self, **kwargs):
''' retrieve level0 records from database
parameter kwargs:
obs_id: [str]
detector_no: [str]
obs_type: [str]
obs_time : (start, end),
qc0_status : [int],
prc_status : [int],
file_name: [str]
            limit: limit the number of returned records; default 0 means no limit
        return: csst_dfs_commons.models.Result
'''
try:
obs_id = get_parameter(kwargs, "obs_id")
detector_no = get_parameter(kwargs, "detector_no")
obs_type = get_parameter(kwargs, "obs_type")
exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0]
exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1]
qc0_status = get_parameter(kwargs, "qc0_status")
prc_status = get_parameter(kwargs, "prc_status")
file_name = get_parameter(kwargs, "file_name")
limit = get_parameter(kwargs, "limit", 0)
sql_count = "select count(*) as c from msc_level0_data where 1=1"
sql_data = f"select * from msc_level0_data where 1=1"
sql_condition = ""
if obs_id:
sql_condition = f"{sql_condition} and obs_id='{obs_id}'"
if detector_no:
sql_condition = f"{sql_condition} and detector_no='{detector_no}'"
if obs_type:
sql_condition = f"{sql_condition} and obs_type='{obs_type}'"
if exp_time_start:
sql_condition = f"{sql_condition} and obs_time >='{exp_time_start}'"
if exp_time_end:
sql_condition = f"{sql_condition} and obs_time <='{exp_time_end}'"
if qc0_status:
sql_condition = f"{sql_condition} and qc0_status={qc0_status}"
if prc_status:
sql_condition = f"{sql_condition} and prc_status={prc_status}"
if file_name:
sql_condition = f" and filename='{file_name}'"
sql_count = f"{sql_count} {sql_condition}"
sql_data = f"{sql_data} {sql_condition}"
if limit > 0:
sql_data = f"{sql_data} limit {limit}"
totalCount = self.db.select_one(sql_count)
_, records = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(Level0Record, records)).append("totalCount", totalCount['c'])
except Exception as e:
return Result.error(message=str(e))
def get(self, **kwargs):
''' fetch a record from database
parameter kwargs:
id : [int],
level0_id : [str]
        return csst_dfs_commons.models.Result
'''
id = get_parameter(kwargs, "id", 0)
level0_id = get_parameter(kwargs, "level0_id", "")
if id == 0 and level0_id == "":
return Result.error(message="at least define id or level0_id")
if id != 0:
return self.get_by_id(id)
if level0_id != "":
return self.get_by_level0_id(level0_id)
def get_by_id(self, id: int):
try:
r = self.db.select_one(
"select * from msc_level0_data where id=?", (id,))
if r:
return Result.ok_data(data=Level0Record().from_dict(r))
else:
return Result.error(message=f"id:{id} not found")
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def get_by_level0_id(self, level0_id: str):
try:
r = self.db.select_one(
"select * from msc_level0_data where level0_id=?", (level0_id,))
if r:
return Result.ok_data(data=Level0Record().from_dict(r))
else:
return Result.error(message=f"level0_id:{level0_id} not found")
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
level0_id : [str],
status : [int]
        return csst_dfs_commons.models.Result
'''
id = get_parameter(kwargs, "id")
level0_id = get_parameter(kwargs, "level0_id")
result = self.get(id = id, level0_id = level0_id)
if not result.success:
return Result.error(message="not found")
id = result.data.id
status = get_parameter(kwargs, "status")
try:
self.db.execute(
'update msc_level0_data set prc_status=?, prc_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_qc0_status(self, **kwargs):
''' update the status of QC0
parameter kwargs:
id : [int],
level0_id : [str],
status : [int]
'''
id = get_parameter(kwargs, "id")
level0_id = get_parameter(kwargs, "level0_id")
result = self.get(id = id, level0_id = level0_id)
if not result.success:
return Result.error(message="not found")
id = result.data.id
status = get_parameter(kwargs, "status")
try:
self.db.execute(
'update msc_level0_data set qc0_status=?, qc0_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def write(self, **kwargs):
''' insert a level0 data record into database
parameter kwargs:
obs_id = [str]
detector_no = [str]
obs_type = [str]
obs_time = [str]
exp_time = [int]
detector_status_id = [int]
filename = [str]
file_path = [str]
        return: csst_dfs_commons.models.Result
'''
rec = Level0Record(
obs_id = get_parameter(kwargs, "obs_id"),
detector_no = get_parameter(kwargs, "detector_no"),
obs_type = get_parameter(kwargs, "obs_type"),
obs_time = get_parameter(kwargs, "obs_time"),
exp_time = get_parameter(kwargs, "exp_time"),
detector_status_id = get_parameter(kwargs, "detector_status_id"),
filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path")
)
rec.level0_id = f"{rec.obs_id}{rec.detector_no}"
try:
existed = self.db.exists(
"select * from msc_level0_data where filename=?",
(rec.filename,)
)
if existed:
log.warning('%s existed' %(rec.filename, ))
return Result.error(message ='%s existed' %(rec.filename, ))
self.db.execute(
'INSERT INTO msc_level0_data (level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \
VALUES(?,?,?,?,?,?,?,?,?,?,?,?)',
(rec.level0_id, rec.obs_id, rec.detector_no, rec.obs_type, rec.obs_time, rec.exp_time, rec.detector_status_id, rec.filename, rec.file_path,-1,-1,format_time_ms(time.time()))
)
self.db.end()
rec.id = self.db.last_row_id()
return Result.ok_data(data=rec)
except Exception as e:
log.error(e)
return Result.error(message=str(e))
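A minimal usage sketch of the Level0DataApi defined above; the import path and all field values are hypothetical, and CSST_LOCAL_FILE_ROOT is assumed to point at an initialized local database.

from csst_dfs_api_local.msc.level0 import Level0DataApi  # assumed module path

api = Level0DataApi()
# level0_id is derived inside write() as obs_id + detector_no
res = api.write(
    obs_id="0000223", detector_no="01", obs_type="sci",
    obs_time="2021-06-06 11:12:13", exp_time=150, detector_status_id=0,
    filename="CSST_MSC_0000223_01_raw.fits",
    file_path="/opt/temp/csst/CSST_MSC_0000223_01_raw.fits")
if res.success:
    # query the record back, filtered by observation and a time window
    found = api.find(obs_id="0000223",
                     obs_time=("2021-06-06 00:00:00", "2021-06-07 00:00:00"),
                     limit=10)
    print(found.data)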
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.msc import Level0PrcRecord
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level0PrcApi(object):
def __init__(self, sub_system = "msc"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
def find(self, **kwargs):
''' retrieve level0 procedure records from database
parameter kwargs:
level0_id: [str]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
        return: csst_dfs_commons.models.Result
'''
try:
level0_id = get_parameter(kwargs, "level0_id")
pipeline_id = get_parameter(kwargs, "pipeline_id")
prc_module = get_parameter(kwargs, "prc_module")
prc_status = get_parameter(kwargs, "prc_status")
sql_data = f"select * from msc_level0_prc"
sql_condition = f"where level0_id='{level0_id}'"
if pipeline_id:
sql_condition = sql_condition + " and pipeline_id='" + pipeline_id + "'"
if prc_module:
sql_condition = sql_condition + " and prc_module ='" + prc_module + "'"
if prc_status:
sql_condition = f"{sql_condition} and prc_status={prc_status}"
sql_data = f"{sql_data} {sql_condition}"
_, records = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(Level0PrcRecord, records)).append("totalCount", len(records))
except Exception as e:
return Result.error(message=str(e))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
        return csst_dfs_commons.models.Result
'''
id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
existed = self.db.exists(
"select * from msc_level0_prc where id=?",
(id,)
)
if not existed:
log.warning('%s not found' %(id, ))
return Result.error(message ='%s not found' %(id, ))
self.db.execute(
'update msc_level0_prc set prc_status=?, prc_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def write(self, **kwargs):
''' insert a level0 procedure record into database
parameter kwargs:
level0_id : [str]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
prc_status : [int]
prc_time : [str]
result_file_path : [str]
        return csst_dfs_commons.models.Result
'''
rec = Level0PrcRecord(
id = 0,
level0_id = get_parameter(kwargs, "level0_id"),
pipeline_id = get_parameter(kwargs, "pipeline_id"),
prc_module = get_parameter(kwargs, "prc_module"),
params_file_path = get_parameter(kwargs, "params_file_path"),
prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time"),
result_file_path = get_parameter(kwargs, "result_file_path")
)
try:
self.db.execute(
'INSERT INTO msc_level0_prc (level0_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \
VALUES(?,?,?,?,?,?,?)',
(rec.level0_id, rec.pipeline_id, rec.prc_module, rec.params_file_path, rec.prc_status, rec.prc_time, rec.result_file_path)
)
self.db.end()
rec.id = self.db.last_row_id()
return Result.ok_data(data=rec)
except Exception as e:
log.error(e)
return Result.error(message=str(e))
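A matching sketch for Level0PrcApi: register one pipeline step for a level0 frame, then flip its status once the step finishes. The module path, pipeline identifiers, and file paths are hypothetical.

from csst_dfs_api_local.msc.level0prc import Level0PrcApi  # assumed module path

prc_api = Level0PrcApi()
res = prc_api.write(
    level0_id="000022301", pipeline_id="P1", prc_module="instrument-correction",
    params_file_path="/pipeline/params/p1.cfg", prc_status=-1,
    prc_time="2021-06-06 12:00:00",
    result_file_path="/pipeline/results/000022301.fits")
if res.success:
    prc_api.update_proc_status(id=res.data.id, status=0)  # mark the step done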
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.msc import Level1PrcRecord
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level1PrcApi(object):
def __init__(self, sub_system = "msc"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
def find(self, **kwargs):
''' retrieve level1 procedure records from database
parameter kwargs:
level1_id: [int]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
        return: csst_dfs_commons.models.Result
'''
try:
level1_id = get_parameter(kwargs, "level1_id", 0)
pipeline_id = get_parameter(kwargs, "pipeline_id")
prc_module = get_parameter(kwargs, "prc_module")
prc_status = get_parameter(kwargs, "prc_status")
sql_data = f"select * from msc_level1_prc"
sql_condition = f"where level1_id={level1_id}"
if pipeline_id:
sql_condition = sql_condition + " and pipeline_id='" + pipeline_id + "'"
if prc_module:
sql_condition = sql_condition + " and prc_module ='" + prc_module + "'"
if prc_status:
sql_condition = f"{sql_condition} and prc_status={prc_status}"
sql_data = f"{sql_data} {sql_condition}"
_, records = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(Level1PrcRecord, records)).append("totalCount", len(records))
except Exception as e:
return Result.error(message=str(e))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
        return csst_dfs_commons.models.Result
'''
id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
existed = self.db.exists(
"select * from msc_level1_prc where id=?",
(id,)
)
if not existed:
log.warning('%s not found' %(id, ))
return Result.error(message ='%s not found' %(id, ))
self.db.execute(
'update msc_level1_prc set prc_status=?, prc_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def write(self, **kwargs):
''' insert a level1 procedure record into database
parameter kwargs:
level1_id : [int]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
prc_status : [int]
prc_time : [str]
result_file_path : [str]
        return csst_dfs_commons.models.Result
'''
rec = Level1PrcRecord(
id = 0,
level1_id = get_parameter(kwargs, "level1_id", 0),
pipeline_id = get_parameter(kwargs, "pipeline_id"),
prc_module = get_parameter(kwargs, "prc_module"),
params_file_path = get_parameter(kwargs, "params_file_path"),
prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time"),
result_file_path = get_parameter(kwargs, "result_file_path")
)
try:
self.db.execute(
'INSERT INTO msc_level1_prc (level1_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \
VALUES(?,?,?,?,?,?,?)',
(rec.level1_id, rec.pipeline_id, rec.prc_module, rec.params_file_path, rec.prc_status, rec.prc_time, rec.result_file_path)
)
self.db.end()
rec.id = self.db.last_row_id()
return Result.ok_data(data=rec)
except Exception as e:
log.error(e)
return Result.error(message=str(e))
import os
import logging
import time, datetime
import shutil
from traceback import print_stack
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.common import from_dict_list
from csst_dfs_commons.models.msc import MSCLevel2CatalogRecord
log = logging.getLogger('csst')
class Level2CatalogApi(object):
def __init__(self, sub_system = "msc"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
def find(self, **kwargs):
        ''' retrieve level2 catalog records from database
parameter kwargs:
obs_id: [str]
detector_no: [str]
obs_time : (start, end),
filename: [str]
            limit: limit the number of returned records; default 0 means no limit
        return: csst_dfs_commons.models.Result
'''
try:
obs_id = get_parameter(kwargs, "obs_id")
detector_no = get_parameter(kwargs, "detector_no")
obs_time_start = get_parameter(kwargs, "obs_time", [None, None])[0]
obs_time_end = get_parameter(kwargs, "obs_time", [None, None])[1]
limit = get_parameter(kwargs, "limit", 0)
sql_count = "select count(*) as c from msc_level2_catalog where 1=1"
sql_data = f"select * from msc_level2_catalog where 1=1"
sql_condition = ""
if obs_id:
sql_condition = f"{sql_condition} and obs_id='{obs_id}'"
if detector_no:
sql_condition = f"{sql_condition} and detector_no='{detector_no}'"
if obs_time_start:
sql_condition = f"{sql_condition} and obs_time >='{obs_time_start}'"
if obs_time_end:
sql_condition = f"{sql_condition} and obs_time <='{obs_time_end}'"
sql_count = f"{sql_count} {sql_condition}"
sql_data = f"{sql_data} {sql_condition}"
if limit > 0:
sql_data = f"{sql_data} limit {limit}"
totalCount = self.db.select_one(sql_count)
_, recs = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(MSCLevel2CatalogRecord, recs)).append("totalCount", totalCount['c'])
except Exception as e:
return Result.error(message=str(e))
def write(self, records):
try:
recordStrs = []
for recordStr in records:
recordStrs.append(f"({recordStr})")
if recordStrs:
self.db.execute(
f"insert into msc_level2_catalog values {','.join(recordStrs)}"
)
self.db.end()
return Result.ok_data()
except Exception as e:
print_stack()
log.error(e)
return Result.error(message=str(e))
\ No newline at end of file
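Note that Level2CatalogApi.write takes pre-rendered rows rather than model objects: each element of records must be a comma-separated SQL value list matching the msc_level2_catalog column order. A hedged sketch with invented column values:

from csst_dfs_api_local.msc.level2catalog import Level2CatalogApi  # assumed module path

cat_api = Level2CatalogApi()
rows = [
    "'0000223','01','2021-06-06 11:12:13',201.15,29.92",  # hypothetical columns
    "'0000223','02','2021-06-06 11:12:13',201.18,29.95",
]
# rendered as: insert into msc_level2_catalog values (...),(...)
cat_api.write(rows)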
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.sls import CalMergeRecord
from csst_dfs_commons.models.common import from_dict_list
from .level0 import Level0DataApi
log = logging.getLogger('csst')
class CalMergeApi(object):
def __init__(self, sub_system = "sls"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
self.level0Api = Level0DataApi()
def get_latest_by_l0(self, **kwargs):
''' retrieve calibration merge records from database by level0 data
:param kwargs: Parameter dictionary, key items support:
level0_id: [str],
ref_type: [str]
        :returns: csst_dfs_commons.models.Result
'''
try:
level0_id = get_parameter(kwargs, "level0_id")
ref_type = get_parameter(kwargs, "ref_type")
level0_data = self.level0Api.get_by_level0_id(level0_id)
            if not level0_data.success:
                return Result.error(message = "level0 data [%s] not found"%(level0_id))
sql_data = f"select * from sls_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time >= '{level0_data.data.obs_time}' order by obs_time ASC limit 1"
r = self.db.select_one(sql_data)
if r:
rec = CalMergeRecord().from_dict(r)
return Result.ok_data(data=rec)
sql_data = f"select * from sls_cal_merge where detector_no='{level0_data.data.detector_no}' and ref_type='{ref_type}' and obs_time <= '{level0_data.data.obs_time}' order by obs_time DESC limit 1"
r = self.db.select_one(sql_data)
if r:
rec = CalMergeRecord().from_dict(r)
return Result.ok_data(data=rec)
return Result.error(message = "not found")
except Exception as e:
return Result.error(message=str(e))
def find(self, **kwargs):
''' retrieve calibration merge records from database
parameter kwargs:
detector_no: [str]
ref_type: [str]
obs_time: (start,end)
qc1_status : [int]
prc_status : [int]
file_name: [str]
            limit: limit the number of returned records; default 0 means no limit
        return: csst_dfs_commons.models.Result
'''
try:
detector_no = get_parameter(kwargs, "detector_no")
ref_type = get_parameter(kwargs, "ref_type")
exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0]
exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1]
qc1_status = get_parameter(kwargs, "qc1_status")
prc_status = get_parameter(kwargs, "prc_status")
file_name = get_parameter(kwargs, "file_name")
limit = get_parameter(kwargs, "limit", 0)
sql_count = "select count(*) as c from sls_cal_merge where 1=1"
sql_data = f"select * from sls_cal_merge where 1=1"
sql_condition = ""
if detector_no:
sql_condition = f"{sql_condition} and detector_no='{detector_no}'"
if ref_type:
sql_condition = f"{sql_condition} and ref_type='{ref_type}'"
if exp_time_start:
sql_condition = f"{sql_condition} and obs_time >='{exp_time_start}'"
if exp_time_end:
sql_condition = f"{sql_condition} and obs_time <='{exp_time_end}'"
if qc1_status:
sql_condition = f"{sql_condition} and qc1_status={qc1_status}"
if prc_status:
sql_condition = f"{sql_condition} and prc_status={prc_status}"
if file_name:
sql_condition = f" and filename={file_name}"
sql_count = f"{sql_count} {sql_condition}"
sql_data = f"{sql_data} {sql_condition}"
log.info(sql_count)
log.info(sql_data)
if limit > 0:
sql_data = f"{sql_data} limit {limit}"
totalCount = self.db.select_one(sql_count)
_, records = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(CalMergeRecord, records)).append("totalCount", totalCount['c'])
except Exception as e:
return Result.error(message=str(e))
def get(self, **kwargs):
''' fetch a record from database
parameter kwargs:
id : [int],
cal_id : [str]
        return csst_dfs_commons.models.Result
'''
id = get_parameter(kwargs, "id", 0)
cal_id = get_parameter(kwargs, "cal_id", "")
if id == 0 and cal_id == "":
return Result.error(message="at least define id or cal_id")
if id != 0:
return self.get_by_id(id)
if cal_id != "":
return self.get_by_cal_id(cal_id)
def get_by_id(self, id: str):
try:
r = self.db.select_one(
"select * from sls_cal_merge where id=?", (id,))
if r:
sql_get_level0_id = f"select level0_id from sls_cal2level0 where merge_id={r['id']}"
_, records = self.db.select_many(sql_get_level0_id)
level0_ids = [r["level0_id"] for r in records]
rec = CalMergeRecord().from_dict(r)
rec.level0_ids = level0_ids
return Result.ok_data(data=rec)
else:
return Result.error(message=f"id:{id} not found")
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def get_by_cal_id(self, cal_id: str):
try:
r = self.db.select_one(
"select * from sls_cal_merge where cal_id=?", (cal_id,))
if r:
sql_get_level0_id = f"select level0_id from sls_cal2level0 where merge_id={r['id']}"
_, records = self.db.select_many(sql_get_level0_id)
level0_ids = [r["level0_id"] for r in records]
rec = CalMergeRecord().from_dict(r)
rec.level0_ids = level0_ids
return Result.ok_data(data=rec)
else:
return Result.error(message=f"id:{id} not found")
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_qc1_status(self, **kwargs):
        ''' update the status of QC1
parameter kwargs:
id : [int],
cal_id : [str],
status : [int]
        return csst_dfs_commons.models.Result
'''
id = get_parameter(kwargs, "id", 0)
cal_id = get_parameter(kwargs, "cal_id", "")
result = self.get(id = id, cal_id = cal_id)
if not result.success:
return Result.error(message="not found")
id = result.data.id
status = get_parameter(kwargs, "status")
try:
self.db.execute(
'update sls_cal_merge set qc1_status=?, qc1_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
cal_id : [str],
status : [int]
        return csst_dfs_commons.models.Result
'''
id = get_parameter(kwargs, "id", 0)
cal_id = get_parameter(kwargs, "cal_id", "")
result = self.get(id = id, cal_id = cal_id)
if not result.success:
return Result.error(message="not found")
id = result.data.id
status = get_parameter(kwargs, "status")
try:
existed = self.db.exists(
"select * from sls_cal_merge where id=?",
(id,)
)
if not existed:
log.warning('%s not found' %(id, ))
return Result.error(message ='%s not found' %(id, ))
self.db.execute(
'update sls_cal_merge set prc_status=?, prc_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def write(self, **kwargs):
''' insert a calibration merge record into database
parameter kwargs:
cal_id : [str]
detector_no : [str]
ref_type : [str]
obs_time : [str]
exp_time : [float]
prc_status : [int]
prc_time : [str]
filename : [str]
file_path : [str]
level0_ids : [list]
        return csst_dfs_commons.models.Result
'''
rec = CalMergeRecord(
id = 0,
cal_id = get_parameter(kwargs, "cal_id"),
detector_no = get_parameter(kwargs, "detector_no"),
ref_type = get_parameter(kwargs, "ref_type"),
obs_time = get_parameter(kwargs, "obs_time"),
exp_time = get_parameter(kwargs, "exp_time"),
filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path"),
prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time"),
level0_ids = get_parameter(kwargs, "level0_ids", [])
)
try:
self.db.execute(
'INSERT INTO sls_cal_merge (cal_id,detector_no,ref_type,obs_time,exp_time,filename,file_path,prc_status,prc_time,create_time) \
VALUES(?,?,?,?,?,?,?,?,?,?)',
(rec.cal_id, rec.detector_no, rec.ref_type, rec.obs_time, rec.exp_time, rec.filename, rec.file_path,rec.prc_status,rec.prc_time,format_time_ms(time.time()))
)
self.db.end()
rec.id = self.db.last_row_id()
sql_level0_ids = "insert into sls_cal2level0 (merge_id,level0_id) values "
values = ["(%s,%s)"%(rec.id,i) for i in rec.level0_ids]
_ = self.db.execute(sql_level0_ids + ",".join(values))
self.db.end()
return Result.ok_data(data=rec)
except Exception as e:
log.error(e)
return Result.error(message=str(e))
\ No newline at end of file
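get_latest_by_l0 above resolves the reference frame nearest in time to a given level0 exposure: it first looks for the earliest merge taken at or after the exposure, then falls back to the latest one before it. A hedged usage sketch (import path and IDs are hypothetical):

from csst_dfs_api_local.sls.calmerge import CalMergeApi  # assumed module path

cal_api = CalMergeApi()
res = cal_api.get_latest_by_l0(level0_id="000022301", ref_type="bias")
if res.success:
    print(res.data.cal_id, res.data.file_path)
else:
    print(res.message)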
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.sls import Level0Record
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level0DataApi(object):
def __init__(self, sub_system = "sls"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
def find(self, **kwargs):
''' retrieve level0 records from database
parameter kwargs:
obs_id: [str]
detector_no: [str]
obs_type: [str]
obs_time : (start, end),
qc0_status : [int],
prc_status : [int],
file_name: [str]
            limit: limit the number of returned records; default 0 means no limit
        return: csst_dfs_commons.models.Result
'''
try:
obs_id = get_parameter(kwargs, "obs_id")
detector_no = get_parameter(kwargs, "detector_no")
obs_type = get_parameter(kwargs, "obs_type")
exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0]
exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1]
qc0_status = get_parameter(kwargs, "qc0_status")
prc_status = get_parameter(kwargs, "prc_status")
file_name = get_parameter(kwargs, "file_name")
limit = get_parameter(kwargs, "limit", 0)
sql_count = "select count(*) as c from sls_level0_data where 1=1"
sql_data = f"select * from sls_level0_data where 1=1"
sql_condition = ""
if obs_id:
sql_condition = f"{sql_condition} and obs_id='{obs_id}'"
if detector_no:
sql_condition = f"{sql_condition} and detector_no='{detector_no}'"
if obs_type:
sql_condition = f"{sql_condition} and obs_type='{obs_type}'"
if exp_time_start:
sql_condition = f"{sql_condition} and obs_time >='{exp_time_start}'"
if exp_time_end:
sql_condition = f"{sql_condition} and obs_time <='{exp_time_end}'"
if qc0_status:
sql_condition = f"{sql_condition} and qc0_status={qc0_status}"
if prc_status:
sql_condition = f"{sql_condition} and prc_status={prc_status}"
if file_name:
sql_condition = f" and filename='{file_name}'"
sql_count = f"{sql_count} {sql_condition}"
sql_data = f"{sql_data} {sql_condition}"
if limit > 0:
sql_data = f"{sql_data} limit {limit}"
totalCount = self.db.select_one(sql_count)
_, records = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(Level0Record, records)).append("totalCount", totalCount['c'])
except Exception as e:
return Result.error(message=str(e))
def get(self, **kwargs):
''' fetch a record from database
parameter kwargs:
id : [int],
level0_id : [str]
        return csst_dfs_commons.models.Result
'''
id = get_parameter(kwargs, "id", 0)
level0_id = get_parameter(kwargs, "level0_id", "")
if id == 0 and level0_id == "":
return Result.error(message="at least define id or level0_id")
if id != 0:
return self.get_by_id(id)
if level0_id != "":
return self.get_by_level0_id(level0_id)
def get_by_id(self, id: int):
try:
r = self.db.select_one(
"select * from sls_level0_data where id=?", (id,))
if r:
return Result.ok_data(data=Level0Record().from_dict(r))
else:
return Result.error(message=f"id:{id} not found")
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def get_by_level0_id(self, level0_id: str):
try:
r = self.db.select_one(
"select * from sls_level0_data where level0_id=?", (level0_id,))
if r:
return Result.ok_data(data=Level0Record().from_dict(r))
else:
return Result.error(message=f"level0_id:{level0_id} not found")
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
level0_id : [str],
status : [int]
        return csst_dfs_commons.models.Result
'''
id = get_parameter(kwargs, "id")
level0_id = get_parameter(kwargs, "level0_id")
result = self.get(id = id, level0_id = level0_id)
if not result.success:
return Result.error(message="not found")
id = result.data.id
status = get_parameter(kwargs, "status")
try:
self.db.execute(
'update sls_level0_data set prc_status=?, prc_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_qc0_status(self, **kwargs):
''' update the status of QC0
parameter kwargs:
id : [int],
level0_id : [str],
status : [int]
'''
id = get_parameter(kwargs, "id")
level0_id = get_parameter(kwargs, "level0_id")
result = self.get(id = id, level0_id = level0_id)
if not result.success:
return Result.error(message="not found")
id = result.data.id
status = get_parameter(kwargs, "status")
try:
self.db.execute(
'update sls_level0_data set qc0_status=?, qc0_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def write(self, **kwargs):
''' insert a level0 data record into database
parameter kwargs:
obs_id = [str]
detector_no = [str]
obs_type = [str]
obs_time = [str]
exp_time = [int]
detector_status_id = [int]
filename = [str]
file_path = [str]
        return: csst_dfs_commons.models.Result
'''
rec = Level0Record(
obs_id = get_parameter(kwargs, "obs_id"),
detector_no = get_parameter(kwargs, "detector_no"),
obs_type = get_parameter(kwargs, "obs_type"),
obs_time = get_parameter(kwargs, "obs_time"),
exp_time = get_parameter(kwargs, "exp_time"),
detector_status_id = get_parameter(kwargs, "detector_status_id"),
filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path")
)
rec.level0_id = f"{rec.obs_id}{rec.detector_no}"
try:
existed = self.db.exists(
"select * from sls_level0_data where filename=?",
(rec.filename,)
)
if existed:
log.warning('%s existed' %(rec.filename, ))
return Result.error(message ='%s existed' %(rec.filename, ))
self.db.execute(
'INSERT INTO sls_level0_data (level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \
VALUES(?,?,?,?,?,?,?,?,?,?,?,?)',
(rec.level0_id, rec.obs_id, rec.detector_no, rec.obs_type, rec.obs_time, rec.exp_time, rec.detector_status_id, rec.filename, rec.file_path,-1,-1,format_time_ms(time.time()))
)
self.db.end()
rec.id = self.db.last_row_id()
return Result.ok_data(data=rec)
except Exception as e:
log.error(e)
return Result.error(message=str(e))
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.sls import Level0PrcRecord
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level0PrcApi(object):
def __init__(self, sub_system = "sls"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
def find(self, **kwargs):
''' retrieve level0 procedure records from database
parameter kwargs:
level0_id: [str]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
        return: csst_dfs_commons.models.Result
'''
try:
level0_id = get_parameter(kwargs, "level0_id")
pipeline_id = get_parameter(kwargs, "pipeline_id")
prc_module = get_parameter(kwargs, "prc_module")
prc_status = get_parameter(kwargs, "prc_status")
sql_data = f"select * from sls_level0_prc"
sql_condition = f"where level0_id='{level0_id}'"
if pipeline_id:
sql_condition = sql_condition + " and pipeline_id='" + pipeline_id + "'"
if prc_module:
sql_condition = sql_condition + " and prc_module ='" + prc_module + "'"
if prc_status:
sql_condition = f"{sql_condition} and prc_status={prc_status}"
sql_data = f"{sql_data} {sql_condition}"
_, records = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(Level0PrcRecord, records)).append("totalCount", len(records))
except Exception as e:
return Result.error(message=str(e))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
        return csst_dfs_commons.models.Result
'''
id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
existed = self.db.exists(
"select * from sls_level0_prc where id=?",
(id,)
)
if not existed:
log.warning('%s not found' %(id, ))
return Result.error(message ='%s not found' %(id, ))
self.db.execute(
'update sls_level0_prc set prc_status=?, prc_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def write(self, **kwargs):
''' insert a level0 procedure record into database
parameter kwargs:
level0_id : [str]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
prc_status : [int]
prc_time : [str]
result_file_path : [str]
        return csst_dfs_commons.models.Result
'''
rec = Level0PrcRecord(
id = 0,
level0_id = get_parameter(kwargs, "level0_id"),
pipeline_id = get_parameter(kwargs, "pipeline_id"),
prc_module = get_parameter(kwargs, "prc_module"),
params_file_path = get_parameter(kwargs, "params_file_path"),
prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time"),
result_file_path = get_parameter(kwargs, "result_file_path")
)
try:
self.db.execute(
'INSERT INTO sls_level0_prc (level0_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \
VALUES(?,?,?,?,?,?,?)',
(rec.level0_id, rec.pipeline_id, rec.prc_module, rec.params_file_path, rec.prc_status, rec.prc_time, rec.result_file_path)
)
self.db.end()
rec.id = self.db.last_row_id()
return Result.ok_data(data=rec)
except Exception as e:
log.error(e)
return Result.error(message=str(e))