Commit 7f8aff32 authored by Wei Shoulin's avatar Wei Shoulin
Browse files

level1 and sls

parent 46daedc4
...@@ -29,7 +29,64 @@ drop table if exists t_level0_prc; ...@@ -29,7 +29,64 @@ drop table if exists t_level0_prc;
drop table if exists t_module_status; drop table if exists t_module_status;
drop table if exists t_observation; drop table if exists t_observation;
/*==============================================================*/
/* Table: sls_level1_data */
/*==============================================================*/
-- SLS level-1 data products: one reduced product per level-0 exposure.
-- prc_* columns track pipeline processing; qc1_* track quality control.
create table sls_level1_data
(
id integer PRIMARY KEY autoincrement,
level0_id varchar(20) not null, -- originating level-0 record id
data_type varchar(64) not null, -- product type, e.g. 'sci'
prc_params varchar(1024), -- serialized processing parameters
filename varchar(128),
file_path varchar(256),
prc_status tinyint(1), -- processing status code (-1 inserted as initial qc1 value by the API)
prc_time datetime,
qc1_status tinyint(1), -- QC1 status code
qc1_time datetime,
create_time datetime,
pipeline_id varchar(60) -- pipeline that produced this product
);
/*==============================================================*/
/* Table: sls_level1_ref */
/*==============================================================*/
-- Calibration references used to produce an SLS level-1 product:
-- one row per (level-1 product, reference type).
create table sls_level1_ref (
level1_id int(20) not null, -- sls_level1_data.id
ref_type varchar(64) not null, -- reference kind, e.g. 'dark', 'bias', 'flat'
cal_id int(20) not null, -- id of the calibration record used
primary key (level1_id, ref_type)
);
/*==============================================================*/
/* Table: sls_level1_header */
/*==============================================================*/
-- Observation header values for an SLS level-1 product.
-- NOTE(review): id presumably matches sls_level1_data.id (1:1) — confirm.
create table sls_level1_header
(
id int(20) not null,
obs_time datetime, -- observation time
exp_time float, -- exposure time
ra float,
"dec" float, -- quoted: DEC is a reserved word in some SQL dialects
create_time datetime,
primary key (id)
);
/*==============================================================*/
/* Table: sls_level2_spectra */
/*==============================================================*/
-- SLS level-2 extracted spectra: one row per spectrum derived from a
-- level-1 product; region records the extraction window.
create table sls_level2_spectra
(
id integer PRIMARY KEY autoincrement,
spectra_id varchar(40), -- external spectrum identifier
level1_id int(20) not null, -- parent sls_level1_data.id
region varchar(128), -- extraction region, e.g. '[12,13,24,24]'
filename varchar(128),
file_path varchar(256),
prc_status tinyint(1), -- processing status code
prc_time datetime,
qc1_status tinyint(1), -- QC1 status code
qc1_time datetime,
create_time datetime,
pipeline_id varchar(60)
);
/*==============================================================*/ /*==============================================================*/
/* Table: ifs_level1_data */ /* Table: ifs_level1_data */
/*==============================================================*/ /*==============================================================*/
...@@ -40,12 +97,6 @@ create table ifs_level1_data ...@@ -40,12 +97,6 @@ create table ifs_level1_data
data_type varchar(64) not null, data_type varchar(64) not null,
cor_sci_id int(20), cor_sci_id int(20),
prc_params varchar(1024), prc_params varchar(1024),
flat_id int(20),
dark_id int(20),
bias_id int(20),
lamp_id int(20),
arc_id int(20),
sky_id int(20),
filename varchar(128), filename varchar(128),
file_path varchar(256), file_path varchar(256),
prc_status tinyint(1), prc_status tinyint(1),
...@@ -55,7 +106,15 @@ create table ifs_level1_data ...@@ -55,7 +106,15 @@ create table ifs_level1_data
create_time datetime, create_time datetime,
pipeline_id varchar(60) pipeline_id varchar(60)
); );
/*==============================================================*/
/* Table: ifs_level1_ref */
/*==============================================================*/
-- Calibration references used to produce an IFS level-1 product
-- (replaces the former per-column flat_id/dark_id/... fields).
create table ifs_level1_ref (
level1_id int(20) not null, -- ifs_level1_data.id
ref_type varchar(64) not null, -- reference kind, e.g. 'dark', 'bias', 'flat'
cal_id int(20) not null, -- id of the calibration record used
primary key (level1_id, ref_type)
);
/*==============================================================*/ /*==============================================================*/
/* Table: ifs_level1_header */ /* Table: ifs_level1_header */
/*==============================================================*/ /*==============================================================*/
...@@ -80,9 +139,6 @@ create table msc_level1_data ...@@ -80,9 +139,6 @@ create table msc_level1_data
data_type varchar(64) not null, data_type varchar(64) not null,
cor_sci_id int(20), cor_sci_id int(20),
prc_params varchar(1024), prc_params varchar(1024),
flat_id int(20),
dark_id int(20),
bias_id int(20),
filename varchar(128), filename varchar(128),
file_path varchar(256), file_path varchar(256),
prc_status tinyint(1), prc_status tinyint(1),
...@@ -92,7 +148,15 @@ create table msc_level1_data ...@@ -92,7 +148,15 @@ create table msc_level1_data
create_time datetime, create_time datetime,
pipeline_id varchar(60) pipeline_id varchar(60)
); );
/*==============================================================*/
/* Table: msc_level1_ref */
/*==============================================================*/
-- Calibration references used to produce an MSC level-1 product
-- (replaces the former per-column flat_id/dark_id/bias_id fields).
create table msc_level1_ref (
level1_id int(20) not null, -- msc_level1_data.id
ref_type varchar(64) not null, -- reference kind, e.g. 'dark', 'bias', 'flat'
cal_id int(20) not null, -- id of the calibration record used
primary key (level1_id, ref_type)
);
/*==============================================================*/ /*==============================================================*/
/* Table: msc_level1_header */ /* Table: msc_level1_header */
/*==============================================================*/ /*==============================================================*/
...@@ -250,7 +314,20 @@ create table t_level0_prc ...@@ -250,7 +314,20 @@ create table t_level0_prc
prc_time datetime, prc_time datetime,
result_file_path varchar(256) result_file_path varchar(256)
); );
/*==============================================================*/
/* Table: t_level1_prc */
/*==============================================================*/
-- Processing log for level-1 products: one row per pipeline-module run
-- against a level-1 record (mirrors t_level0_prc for level-0).
create table t_level1_prc
(
id integer PRIMARY KEY autoincrement,
level1_id int(20) not null, -- the level-1 product being processed
pipeline_id varchar(64) not null,
prc_module varchar(32) not null, -- pipeline module name, e.g. 'QC0'
params_file_path varchar(256),
prc_status int(2), -- processing status code
prc_time datetime,
result_file_path varchar(256)
);
/*==============================================================*/ /*==============================================================*/
/* Table: t_module_status */ /* Table: t_module_status */
/*==============================================================*/ /*==============================================================*/
......
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.facility import Level1PrcRecord
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level1PrcApi(object):
    """Local-database API for level-1 processing (``t_level1_prc``) records."""

    def __init__(self):
        # Root directory for locally stored CSST files; overridable via env.
        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
        self.db = DBClient()

    @staticmethod
    def _sql_quote(value):
        # Escape embedded single quotes so interpolated string values cannot
        # break out of the SQL literal they are placed in.
        return str(value).replace("'", "''")

    def find(self, **kwargs):
        ''' retrieve level1 procedure records from database

        parameter kwargs:
            level1_id: [int]
            pipeline_id: [str]
            prc_module: [str]
            prc_status : [int]

        return: csst_dfs_common.models.Result
        '''
        try:
            level1_id = get_parameter(kwargs, "level1_id", 0)
            pipeline_id = get_parameter(kwargs, "pipeline_id")
            prc_module = get_parameter(kwargs, "prc_module")
            prc_status = get_parameter(kwargs, "prc_status")

            sql_data = "select * from t_level1_prc"
            # int()/_sql_quote guard the interpolated values; the DB client's
            # select_many takes a single SQL string, so values are embedded.
            sql_condition = f"where level1_id={int(level1_id)}"
            if pipeline_id:
                sql_condition = f"{sql_condition} and pipeline_id='{self._sql_quote(pipeline_id)}'"
            if prc_module:
                sql_condition = f"{sql_condition} and prc_module ='{self._sql_quote(prc_module)}'"
            if prc_status:
                sql_condition = f"{sql_condition} and prc_status={int(prc_status)}"

            sql_data = f"{sql_data} {sql_condition}"

            _, records = self.db.select_many(sql_data)
            return Result.ok_data(data=from_dict_list(Level1PrcRecord, records)).append("totalCount", len(records))
        except Exception as e:
            return Result.error(message=str(e))

    def update_proc_status(self, **kwargs):
        ''' update the status of reduction

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            existed = self.db.exists(
                "select * from t_level1_prc where id=?",
                (id,)
            )
            if not existed:
                log.warning('%s not found' % (id, ))
                return Result.error(message='%s not found' % (id, ))
            # Record both the new status and the update time.
            self.db.execute(
                'update t_level1_prc set prc_status=?, prc_time=? where id=?',
                (status, format_time_ms(time.time()), id)
            )
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def write(self, **kwargs):
        ''' insert a level1 procedure record into database

        parameter kwargs:
            level1_id : [int]
            pipeline_id : [str]
            prc_module : [str]
            params_file_path : [str]
            prc_status : [int]
            prc_time : [str]
            result_file_path : [str]
        return csst_dfs_common.models.Result
        '''
        rec = Level1PrcRecord(
            id = 0,
            level1_id = get_parameter(kwargs, "level1_id", 0),
            pipeline_id = get_parameter(kwargs, "pipeline_id"),
            prc_module = get_parameter(kwargs, "prc_module"),
            params_file_path = get_parameter(kwargs, "params_file_path"),
            prc_status = get_parameter(kwargs, "prc_status", -1),
            prc_time = get_parameter(kwargs, "prc_time"),
            result_file_path = get_parameter(kwargs, "result_file_path")
        )
        try:
            # Parameterized insert; the database assigns the row id.
            self.db.execute(
                'INSERT INTO t_level1_prc (level1_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \
                VALUES(?,?,?,?,?,?,?)',
                (rec.level1_id, rec.pipeline_id, rec.prc_module, rec.params_file_path, rec.prc_status, rec.prc_time, rec.result_file_path)
            )
            self.db.end()
            rec.id = self.db.last_row_id()
            return Result.ok_data(data=rec)
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))
...@@ -23,7 +23,6 @@ class Level1DataApi(object): ...@@ -23,7 +23,6 @@ class Level1DataApi(object):
parameter kwargs: parameter kwargs:
level0_id: [str] level0_id: [str]
data_type: [str] data_type: [str]
obs_type: [str]
create_time : (start, end), create_time : (start, end),
qc1_status : [int], qc1_status : [int],
prc_status : [int], prc_status : [int],
...@@ -161,17 +160,12 @@ class Level1DataApi(object): ...@@ -161,17 +160,12 @@ class Level1DataApi(object):
data_type : [str] data_type : [str]
cor_sci_id : [int] cor_sci_id : [int]
prc_params : [str] prc_params : [str]
flat_id : [int]
dark_id : [int]
bias_id : [int]
lamp_id : [int]
arc_id : [int]
sky_id : [int]
filename : [str] filename : [str]
file_path : [str] file_path : [str]
prc_status : [int] prc_status : [int]
prc_time : [str] prc_time : [str]
pipeline_id : [str] pipeline_id : [str]
refs : [dict]
return csst_dfs_common.models.Result return csst_dfs_common.models.Result
''' '''
...@@ -182,17 +176,12 @@ class Level1DataApi(object): ...@@ -182,17 +176,12 @@ class Level1DataApi(object):
data_type = get_parameter(kwargs, "data_type"), data_type = get_parameter(kwargs, "data_type"),
cor_sci_id = get_parameter(kwargs, "cor_sci_id"), cor_sci_id = get_parameter(kwargs, "cor_sci_id"),
prc_params = get_parameter(kwargs, "prc_params"), prc_params = get_parameter(kwargs, "prc_params"),
flat_id = get_parameter(kwargs, "flat_id"),
dark_id = get_parameter(kwargs, "dark_id"),
bias_id = get_parameter(kwargs, "bias_id"),
lamp_id = get_parameter(kwargs, "lamp_id"),
arc_id = get_parameter(kwargs, "arc_id"),
sky_id = get_parameter(kwargs, "sky_id"),
filename = get_parameter(kwargs, "filename"), filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path"), file_path = get_parameter(kwargs, "file_path"),
prc_status = get_parameter(kwargs, "prc_status", -1), prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())),
pipeline_id = get_parameter(kwargs, "pipeline_id") pipeline_id = get_parameter(kwargs, "pipeline_id"),
refs = get_parameter(kwargs, "refs", {})
) )
existed = self.db.exists( existed = self.db.exists(
"select * from ifs_level1_data where filename=?", "select * from ifs_level1_data where filename=?",
...@@ -202,14 +191,22 @@ class Level1DataApi(object): ...@@ -202,14 +191,22 @@ class Level1DataApi(object):
log.error(f'{rec.filename} has already been existed') log.error(f'{rec.filename} has already been existed')
return Result.error(message=f'{rec.filename} has already been existed') return Result.error(message=f'{rec.filename} has already been existed')
now_str = format_time_ms(time.time())
self.db.execute( self.db.execute(
'INSERT INTO ifs_level1_data (level0_id,data_type,cor_sci_id,prc_params,flat_id,dark_id,bias_id,lamp_id,arc_id,sky_id,filename,file_path,qc1_status,prc_status,prc_time, create_time,pipeline_id) \ 'INSERT INTO ifs_level1_data (level0_id,data_type,cor_sci_id,prc_params,filename,file_path,qc1_status,prc_status,prc_time, create_time,pipeline_id) \
VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', VALUES(?,?,?,?,?,?,?,?,?,?,?)',
(rec.level0_id, rec.data_type, rec.cor_sci_id, rec.prc_params, rec.flat_id, rec.dark_id, rec.bias_id, rec.lamp_id, rec.arc_id, rec.sky_id, rec.filename, rec.file_path, -1, rec.prc_status,rec.prc_time, format_time_ms(time.time()),rec.pipeline_id,) (rec.level0_id, rec.data_type, rec.cor_sci_id, rec.prc_params, rec.filename, rec.file_path, -1, rec.prc_status,rec.prc_time, now_str, rec.pipeline_id,)
) )
self.db.end() self.db.end()
rec.id = self.db.last_row_id() rec.id = self.db.last_row_id()
if rec.refs.items():
sql_refs = "insert into ifs_level1_ref (level1_id,ref_type,cal_id) values "
values = ["(%s,'%s',%s)"%(rec.id,k,v) for k,v in rec.refs.items()]
_ = self.db.execute(sql_refs + ",".join(values))
self.db.end()
rec.create_time = now_str
return Result.ok_data(data=rec) return Result.ok_data(data=rec)
except Exception as e: except Exception as e:
log.error(e) log.error(e)
......
...@@ -23,7 +23,6 @@ class Level1DataApi(object): ...@@ -23,7 +23,6 @@ class Level1DataApi(object):
parameter kwargs: parameter kwargs:
level0_id: [str] level0_id: [str]
data_type: [str] data_type: [str]
obs_type: [str]
create_time : (start, end), create_time : (start, end),
qc1_status : [int], qc1_status : [int],
prc_status : [int], prc_status : [int],
...@@ -74,28 +73,6 @@ class Level1DataApi(object): ...@@ -74,28 +73,6 @@ class Level1DataApi(object):
except Exception as e: except Exception as e:
return Result.error(message=str(e)) return Result.error(message=str(e))
def get(self, **kwargs):
'''
parameter kwargs:
id = [int],
level0_id = [str]
return dict or None
'''
try:
fits_id = get_parameter(kwargs, "id", -1)
r = self.db.select_one(
"select * from msc_level1_data where id=?", (fits_id,))
if r:
return Result.ok_data(data=Level1Record().from_dict(r))
else:
return Result.error(message=f"id:{fits_id} not found")
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def get(self, **kwargs): def get(self, **kwargs):
''' '''
parameter kwargs: parameter kwargs:
...@@ -180,17 +157,12 @@ class Level1DataApi(object): ...@@ -180,17 +157,12 @@ class Level1DataApi(object):
data_type : [str] data_type : [str]
cor_sci_id : [int] cor_sci_id : [int]
prc_params : [str] prc_params : [str]
flat_id : [int]
dark_id : [int]
bias_id : [int]
lamp_id : [int]
arc_id : [int]
sky_id : [int]
filename : [str] filename : [str]
file_path : [str] file_path : [str]
prc_status : [int] prc_status : [int]
prc_time : [str] prc_time : [str]
pipeline_id : [str] pipeline_id : [str]
refs: [dict]
return csst_dfs_common.models.Result return csst_dfs_common.models.Result
''' '''
...@@ -201,14 +173,12 @@ class Level1DataApi(object): ...@@ -201,14 +173,12 @@ class Level1DataApi(object):
data_type = get_parameter(kwargs, "data_type"), data_type = get_parameter(kwargs, "data_type"),
cor_sci_id = get_parameter(kwargs, "cor_sci_id"), cor_sci_id = get_parameter(kwargs, "cor_sci_id"),
prc_params = get_parameter(kwargs, "prc_params"), prc_params = get_parameter(kwargs, "prc_params"),
flat_id = get_parameter(kwargs, "flat_id"),
dark_id = get_parameter(kwargs, "dark_id"),
bias_id = get_parameter(kwargs, "bias_id"),
filename = get_parameter(kwargs, "filename"), filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path"), file_path = get_parameter(kwargs, "file_path"),
prc_status = get_parameter(kwargs, "prc_status", -1), prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())), prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())),
pipeline_id = get_parameter(kwargs, "pipeline_id") pipeline_id = get_parameter(kwargs, "pipeline_id"),
refs = get_parameter(kwargs, "refs", {})
) )
existed = self.db.exists( existed = self.db.exists(
"select * from msc_level1_data where filename=?", "select * from msc_level1_data where filename=?",
...@@ -219,13 +189,19 @@ class Level1DataApi(object): ...@@ -219,13 +189,19 @@ class Level1DataApi(object):
return Result.error(message=f'{rec.filename} has already been existed') return Result.error(message=f'{rec.filename} has already been existed')
self.db.execute( self.db.execute(
'INSERT INTO msc_level1_data (level0_id,data_type,cor_sci_id,prc_params,flat_id,dark_id,bias_id,filename,file_path,qc1_status,prc_status,prc_time,create_time,pipeline_id) \ 'INSERT INTO msc_level1_data (level0_id,data_type,cor_sci_id,prc_params,filename,file_path,qc1_status,prc_status,prc_time,create_time,pipeline_id) \
VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)', VALUES(?,?,?,?,?,?,?,?,?,?,?)',
(rec.level0_id, rec.data_type, rec.cor_sci_id, rec.prc_params, rec.flat_id, rec.dark_id, rec.bias_id, rec.filename, rec.file_path, -1, rec.prc_status, rec.prc_time, format_time_ms(time.time()),rec.pipeline_id,) (rec.level0_id, rec.data_type, rec.cor_sci_id, rec.prc_params, rec.filename, rec.file_path, -1, rec.prc_status, rec.prc_time, format_time_ms(time.time()),rec.pipeline_id,)
) )
self.db.end() self.db.end()
rec.id = self.db.last_row_id() rec.id = self.db.last_row_id()
if rec.refs.items():
sql_refs = "insert into msc_level1_ref (level1_id,ref_type,cal_id) values "
values = ["(%s,'%s',%s)"%(rec.id,k,v) for k,v in rec.refs.items()]
_ = self.db.execute(sql_refs + ",".join(values))
self.db.end()
return Result.ok_data(data=rec) return Result.ok_data(data=rec)
except Exception as e: except Exception as e:
log.error(e) log.error(e)
......
from .level1 import Level1DataApi
from .level2spectra import Level2SpectraApi
\ No newline at end of file
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.sls import Level1Record
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level1DataApi(object):
    """Local-database API for SLS level-1 data (``sls_level1_data``) records."""

    def __init__(self, sub_system = "sls"):
        self.sub_system = sub_system
        # Root directory for locally stored CSST files; overridable via env.
        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
        self.db = DBClient()

    @staticmethod
    def _sql_quote(value):
        # Escape embedded single quotes so interpolated string values cannot
        # break out of the SQL literal they are placed in.
        return str(value).replace("'", "''")

    def find(self, **kwargs):
        ''' retrieve level1 records from database

        parameter kwargs:
            level0_id: [str]
            data_type: [str]
            create_time : (start, end),
            qc1_status : [int],
            prc_status : [int],
            filename: [str]
            limit: limits returns the number of records,default 0:no-limit

        return: csst_dfs_common.models.Result
        '''
        try:
            level0_id = get_parameter(kwargs, "level0_id")
            data_type = get_parameter(kwargs, "data_type")
            create_time_start = get_parameter(kwargs, "create_time", [None, None])[0]
            create_time_end = get_parameter(kwargs, "create_time", [None, None])[1]
            qc1_status = get_parameter(kwargs, "qc1_status")
            prc_status = get_parameter(kwargs, "prc_status")
            filename = get_parameter(kwargs, "filename")
            limit = get_parameter(kwargs, "limit", 0)

            sql_count = "select count(*) as c from sls_level1_data where 1=1"
            sql_data = "select * from sls_level1_data where 1=1"

            sql_condition = ""
            if level0_id:
                sql_condition = f"{sql_condition} and level0_id='{self._sql_quote(level0_id)}'"
            if data_type:
                sql_condition = f"{sql_condition} and data_type='{self._sql_quote(data_type)}'"
            if create_time_start:
                sql_condition = f"{sql_condition} and create_time >='{self._sql_quote(create_time_start)}'"
            if create_time_end:
                sql_condition = f"{sql_condition} and create_time <='{self._sql_quote(create_time_end)}'"
            if qc1_status:
                sql_condition = f"{sql_condition} and qc1_status={int(qc1_status)}"
            if prc_status:
                sql_condition = f"{sql_condition} and prc_status={int(prc_status)}"
            if filename:
                # Bug fix: this clause previously REPLACED all accumulated
                # conditions and compared against a hard-coded literal.
                sql_condition = f"{sql_condition} and filename='{self._sql_quote(filename)}'"

            sql_count = f"{sql_count} {sql_condition}"
            sql_data = f"{sql_data} {sql_condition}"
            if limit > 0:
                sql_data = f"{sql_data} limit {int(limit)}"

            totalCount = self.db.select_one(sql_count)
            _, recs = self.db.select_many(sql_data)

            return Result.ok_data(data=from_dict_list(Level1Record, recs)).append("totalCount", totalCount['c'])
        except Exception as e:
            return Result.error(message=str(e))

    def get(self, **kwargs):
        ''' fetch a single level-1 record by primary key

        parameter kwargs:
            id = [int]

        return csst_dfs_common.models.Result
        '''
        try:
            id = get_parameter(kwargs, "id", -1)
            r = self.db.select_one(
                "select * from sls_level1_data where id=?", (id,))
            if r:
                return Result.ok_data(data=Level1Record().from_dict(r))
            else:
                return Result.error(message=f"id:{id} not found")
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def update_proc_status(self, **kwargs):
        ''' update the status of reduction

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        fits_id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            existed = self.db.exists(
                "select * from sls_level1_data where id=?",
                (fits_id,)
            )
            if not existed:
                log.warning('%s not found' % (fits_id, ))
                return Result.error(message='%s not found' % (fits_id, ))
            self.db.execute(
                'update sls_level1_data set prc_status=?, prc_time=? where id=?',
                (status, format_time_ms(time.time()), fits_id)
            )
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def update_qc1_status(self, **kwargs):
        ''' update the status of QC1

        parameter kwargs:
            id : [int],
            status : [int]
        '''
        fits_id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            existed = self.db.exists(
                "select * from sls_level1_data where id=?",
                (fits_id,)
            )
            if not existed:
                log.warning('%s not found' % (fits_id, ))
                return Result.error(message='%s not found' % (fits_id, ))
            self.db.execute(
                'update sls_level1_data set qc1_status=?, qc1_time=? where id=?',
                (status, format_time_ms(time.time()), fits_id)
            )
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def write(self, **kwargs):
        ''' insert a level1 record into database

        parameter kwargs:
            level0_id : [str]
            data_type : [str]
            prc_params : [str]
            filename : [str]
            file_path : [str]
            prc_status : [int]
            prc_time : [str]
            pipeline_id : [str]
            refs: [dict]  -- e.g. {'dark': 1, 'bias': 2, 'flat': 3}

        return csst_dfs_common.models.Result
        '''
        try:
            rec = Level1Record(
                id = 0,
                level0_id = get_parameter(kwargs, "level0_id"),
                data_type = get_parameter(kwargs, "data_type"),
                prc_params = get_parameter(kwargs, "prc_params"),
                filename = get_parameter(kwargs, "filename"),
                file_path = get_parameter(kwargs, "file_path"),
                prc_status = get_parameter(kwargs, "prc_status", -1),
                prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())),
                pipeline_id = get_parameter(kwargs, "pipeline_id"),
                refs = get_parameter(kwargs, "refs", {})
            )
            # Filenames are unique; refuse duplicates up front.
            existed = self.db.exists(
                "select * from sls_level1_data where filename=?",
                (rec.filename,)
            )
            if existed:
                log.error(f'{rec.filename} has already been existed')
                return Result.error(message=f'{rec.filename} has already been existed')

            self.db.execute(
                'INSERT INTO sls_level1_data (level0_id,data_type,prc_params,filename,file_path,qc1_status,prc_status,prc_time,create_time,pipeline_id) \
                VALUES(?,?,?,?,?,?,?,?,?,?)',
                (rec.level0_id, rec.data_type, rec.prc_params, rec.filename, rec.file_path, -1, rec.prc_status, rec.prc_time, format_time_ms(time.time()), rec.pipeline_id,)
            )
            self.db.end()
            rec.id = self.db.last_row_id()

            if rec.refs:
                # One (level1_id, ref_type, cal_id) row per calibration ref.
                sql_refs = "insert into sls_level1_ref (level1_id,ref_type,cal_id) values "
                values = ["(%s,'%s',%s)" % (rec.id, self._sql_quote(k), int(v)) for k, v in rec.refs.items()]
                _ = self.db.execute(sql_refs + ",".join(values))
                self.db.end()

            return Result.ok_data(data=rec)
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))
\ No newline at end of file
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.sls import Level2Spectra
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level2SpectraApi(object):
    """Local-database API for SLS level-2 spectra (``sls_level2_spectra``) records."""

    def __init__(self, sub_system = "msc"):
        self.sub_system = sub_system
        # Root directory for locally stored CSST files; overridable via env.
        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
        self.db = DBClient()

    @staticmethod
    def _sql_quote(value):
        # Escape embedded single quotes so interpolated string values cannot
        # break out of the SQL literal they are placed in.
        return str(value).replace("'", "''")

    def find(self, **kwargs):
        ''' retrieve records from database

        parameter kwargs:
            level1_id: [int]
            spectra_id: [str]
            create_time : (start, end),
            qc1_status : [int],
            prc_status : [int],
            filename: [str]
            limit: limits returns the number of records,default 0:no-limit

        return: csst_dfs_common.models.Result
        '''
        try:
            level1_id = get_parameter(kwargs, "level1_id", 0)
            spectra_id = get_parameter(kwargs, "spectra_id")
            create_time_start = get_parameter(kwargs, "create_time", [None, None])[0]
            create_time_end = get_parameter(kwargs, "create_time", [None, None])[1]
            qc1_status = get_parameter(kwargs, "qc1_status")
            prc_status = get_parameter(kwargs, "prc_status")
            filename = get_parameter(kwargs, "filename")
            limit = get_parameter(kwargs, "limit", 0)

            sql_count = "select count(*) as c from sls_level2_spectra where 1=1"
            sql_data = "select * from sls_level2_spectra where 1=1"

            sql_condition = ""
            if level1_id > 0:
                sql_condition = f"{sql_condition} and level1_id={int(level1_id)}"
            if spectra_id:
                sql_condition = f"{sql_condition} and spectra_id='{self._sql_quote(spectra_id)}'"
            if create_time_start:
                sql_condition = f"{sql_condition} and create_time >='{self._sql_quote(create_time_start)}'"
            if create_time_end:
                sql_condition = f"{sql_condition} and create_time <='{self._sql_quote(create_time_end)}'"
            if qc1_status:
                sql_condition = f"{sql_condition} and qc1_status={int(qc1_status)}"
            if prc_status:
                sql_condition = f"{sql_condition} and prc_status={int(prc_status)}"
            if filename:
                # Bug fix: this clause previously REPLACED all accumulated
                # conditions and compared against a hard-coded literal.
                sql_condition = f"{sql_condition} and filename='{self._sql_quote(filename)}'"

            sql_count = f"{sql_count} {sql_condition}"
            sql_data = f"{sql_data} {sql_condition}"
            if limit > 0:
                sql_data = f"{sql_data} limit {int(limit)}"

            totalCount = self.db.select_one(sql_count)
            _, recs = self.db.select_many(sql_data)

            return Result.ok_data(data=from_dict_list(Level2Spectra, recs)).append("totalCount", totalCount['c'])
        except Exception as e:
            return Result.error(message=str(e))

    def get(self, **kwargs):
        ''' fetch a single level-2 spectra record by primary key

        parameter kwargs:
            id = [int]

        return csst_dfs_common.models.Result
        '''
        try:
            id = get_parameter(kwargs, "id", -1)
            r = self.db.select_one(
                "select * from sls_level2_spectra where id=?", (id,))
            if r:
                return Result.ok_data(data=Level2Spectra().from_dict(r))
            else:
                return Result.error(message=f"id:{id} not found")
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def update_proc_status(self, **kwargs):
        ''' update the status of reduction

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        fits_id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            existed = self.db.exists(
                "select * from sls_level2_spectra where id=?",
                (fits_id,)
            )
            if not existed:
                log.warning('%s not found' % (fits_id, ))
                return Result.error(message='%s not found' % (fits_id, ))
            self.db.execute(
                'update sls_level2_spectra set prc_status=?, prc_time=? where id=?',
                (status, format_time_ms(time.time()), fits_id)
            )
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def update_qc1_status(self, **kwargs):
        ''' update the status of QC1

        parameter kwargs:
            id : [int],
            status : [int]
        '''
        fits_id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            existed = self.db.exists(
                "select * from sls_level2_spectra where id=?",
                (fits_id,)
            )
            if not existed:
                log.warning('%s not found' % (fits_id, ))
                return Result.error(message='%s not found' % (fits_id, ))
            self.db.execute(
                'update sls_level2_spectra set qc1_status=?, qc1_time=? where id=?',
                (status, format_time_ms(time.time()), fits_id)
            )
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def write(self, **kwargs):
        ''' insert a level1 record into database

        parameter kwargs:
            level1_id: [int]
            spectra_id : [str]
            region : [str]
            filename : [str]
            file_path : [str]
            prc_status : [int]
            prc_time : [str]
            pipeline_id : [str]

        return csst_dfs_common.models.Result
        '''
        try:
            rec = Level2Spectra(
                id = 0,
                level1_id = get_parameter(kwargs, "level1_id"),
                spectra_id = get_parameter(kwargs, "spectra_id"),
                region = get_parameter(kwargs, "region"),
                filename = get_parameter(kwargs, "filename"),
                file_path = get_parameter(kwargs, "file_path"),
                prc_status = get_parameter(kwargs, "prc_status", -1),
                prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())),
                pipeline_id = get_parameter(kwargs, "pipeline_id")
            )
            # Filenames are unique; refuse duplicates up front.
            existed = self.db.exists(
                "select * from sls_level2_spectra where filename=?",
                (rec.filename,)
            )
            if existed:
                log.error(f'{rec.filename} has already been existed')
                return Result.error(message=f'{rec.filename} has already been existed')

            self.db.execute(
                'INSERT INTO sls_level2_spectra (level1_id,spectra_id,region,filename,file_path,qc1_status,prc_status,prc_time,create_time,pipeline_id) \
                VALUES(?,?,?,?,?,?,?,?,?,?)',
                (rec.level1_id, rec.spectra_id, rec.region, rec.filename, rec.file_path, -1, rec.prc_status, rec.prc_time, format_time_ms(time.time()), rec.pipeline_id,)
            )
            self.db.end()
            rec.id = self.db.last_row_id()

            return Result.ok_data(data=rec)
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))
\ No newline at end of file
import os
import unittest
from astropy.io import fits
from csst_dfs_api_local.facility.level1prc import Level1PrcApi
class Level1PrcTestCase(unittest.TestCase):
    """Smoke tests for Level1PrcApi against the local database."""

    def setUp(self):
        self.api = Level1PrcApi()

    def test_find(self):
        result = self.api.find(level1_id=1)
        print('find:', result)

    def test_update_proc_status(self):
        result = self.api.update_proc_status(id=1, status=4)
        print('update_proc_status:', result)

    def test_write(self):
        fields = dict(
            level1_id=1,
            pipeline_id="P1",
            prc_module="QC0",
            params_file_path="/opt/dddasd.params",
            prc_status=3,
            prc_time='2021-06-04 11:12:13',
            result_file_path="/opt/dddasd.header",
        )
        result = self.api.write(**fields)
        print('write:', result)
\ No newline at end of file
...@@ -32,15 +32,10 @@ class IFSResult1TestCase(unittest.TestCase): ...@@ -32,15 +32,10 @@ class IFSResult1TestCase(unittest.TestCase):
data_type = "sci", data_type = "sci",
cor_sci_id = 2, cor_sci_id = 2,
prc_params = "/opt/dddasd.params", prc_params = "/opt/dddasd.params",
flat_id = 1,
dark_id = 2,
bias_id = 3,
lamp_id = 4,
arc_id = 5,
sky_id = 6,
prc_status = 3, prc_status = 3,
prc_time = '2021-06-05 11:12:13', prc_time = '2021-06-05 11:12:13',
filename = "dddasd23", filename = "dddasd223234.fits",
file_path = "/opt/dddasd23.fits", file_path = "/opt/dddasd23.fits",
pipeline_id = "P2") pipeline_id = "P2",
refs = {'dark': 1, 'bias': 2, 'flat': 3 })
print('write:', rec) print('write:', rec)
\ No newline at end of file
...@@ -31,12 +31,10 @@ class IFSResult1TestCase(unittest.TestCase): ...@@ -31,12 +31,10 @@ class IFSResult1TestCase(unittest.TestCase):
data_type = "sci", data_type = "sci",
cor_sci_id = 2, cor_sci_id = 2,
prc_params = "/opt/dddasd.params", prc_params = "/opt/dddasd.params",
flat_id = 1,
dark_id = 2,
bias_id = 3,
prc_status = 3, prc_status = 3,
prc_time = '2021-06-05 11:12:13', prc_time = '2021-06-05 11:12:13',
filename = "dddasd", filename = "dddasd",
file_path = "/opt/dddasd.fits", file_path = "/opt/dddasd.fits",
pipeline_id = "P2") pipeline_id = "P2",
refs = {'dark': 1, 'bias': 2, 'flat': 3 })
print('write:', rec) print('write:', rec)
\ No newline at end of file
import unittest
from csst_dfs_api_local.sls import Level1DataApi
class SLSResult1TestCase(unittest.TestCase):
    """Smoke tests for the SLS Level1DataApi against the local database."""

    def setUp(self):
        self.api = Level1DataApi()

    def test_find(self):
        window = ("2021-06-01 11:12:13", "2021-06-08 11:12:13")
        result = self.api.find(level0_id='0000223', create_time=window)
        print('find:', result)

    def test_get(self):
        result = self.api.get(id=1)
        print('get:', result)

    def test_update_proc_status(self):
        result = self.api.update_proc_status(id=1, status=4)
        print('update_proc_status:', result)

    def test_update_qc1_status(self):
        result = self.api.update_qc1_status(id=1, status=7)
        print('update_qc1_status:', result)

    def test_write(self):
        fields = dict(
            level0_id='0000223',
            data_type="sci",
            prc_params="/opt/dddasd.params",
            prc_status=3,
            prc_time='2021-06-05 11:12:13',
            filename="dddasd223234.fits",
            file_path="/opt/dddasd23.fits",
            pipeline_id="P2",
            refs={'dark': 1, 'bias': 2, 'flat': 3},
        )
        result = self.api.write(**fields)
        print('write:', result)
\ No newline at end of file
import unittest
from csst_dfs_api_local.sls import Level2SpectraApi
class SLSLevel2SpectraTestCase(unittest.TestCase):
    """Smoke tests for the SLS Level2SpectraApi against the local database."""

    def setUp(self):
        self.api = Level2SpectraApi()

    def test_find(self):
        window = ("2021-06-01 11:12:13", "2021-06-08 11:12:13")
        result = self.api.find(level1_id=1, create_time=window)
        print('find:', result)

    def test_get(self):
        result = self.api.get(id=1)
        print('get:', result)

    def test_update_proc_status(self):
        result = self.api.update_proc_status(id=1, status=4)
        print('update_proc_status:', result)

    def test_update_qc1_status(self):
        result = self.api.update_qc1_status(id=1, status=7)
        print('update_qc1_status:', result)

    def test_write(self):
        fields = dict(
            level1_id=2,
            spectra_id="222",
            region="[12,13,24,24]",
            prc_status=3,
            prc_time='2021-06-05 11:12:13',
            filename="dddasd223234.fits",
            file_path="/opt/dddasd23.fits",
            pipeline_id="P2",
        )
        result = self.api.write(**fields)
        print('write:', result)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment