Commit ea741fe5 authored by Wei Shoulin's avatar Wei Shoulin
Browse files

c3

parent 66ee8623
# Module imported for its side effect only: configure project-wide logging
# via the shared csst_dfs_commons helper as soon as the package is loaded.
from csst_dfs_commons.logging import setup_logging
setup_logging()
\ No newline at end of file
...@@ -42,6 +42,9 @@ class DBClient(object): ...@@ -42,6 +42,9 @@ class DBClient(object):
for s in statements: for s in statements:
self.execute(s) self.execute(s)
def last_row_id(self):
return self._cursor.lastrowid
def select_one(self, sql, param=()): def select_one(self, sql, param=()):
"""查询单个结果""" """查询单个结果"""
_ = self.__execute(sql, param) _ = self.__execute(sql, param)
......
-- Reset the DFS schema: drop every table (level-1 products, calibration
-- records, detector/facility bookkeeping, level-0 data and observations)
-- so the CREATE statements below start from a clean database.
drop table if exists ifs_level1_data;
drop table if exists ifs_level1_header;
drop table if exists msc_level1_data;
drop table if exists msc_level1_header;
drop table if exists t_cal2level0;
drop table if exists t_cal_header;
drop table if exists t_cal_merge;
drop table if exists t_detector;
drop table if exists t_detector_status;
drop table if exists t_facility_status;
drop table if exists t_guiding;
drop table if exists t_level0_data;
drop table if exists t_level0_header;
drop table if exists t_level0_prc;
drop table if exists t_module_status;
drop table if exists t_observation;
/*==============================================================*/
/* Table: ifs_level1_data */
/*==============================================================*/
-- IFS level-1 data products: one row per reduced frame, pointing back to
-- the raw frame (raw_id) and the calibration inputs used to produce it.
create table ifs_level1_data
(
id integer PRIMARY KEY autoincrement,
raw_id int(20) not null, -- id of the source level-0 frame
data_type varchar(64) not null,
cor_sci_id int(20),
prc_params varchar(1024), -- pipeline parameters used for this reduction
flat_id int(20),
dark_id int(20),
bias_id int(20),
lamp_id int(20),
arc_id int(20),
sky_id int(20),
filename varchar(128),
file_path varchar(256),
prc_status tinyint(1), -- processing status flag
prc_time datetime,
qc1_status tinyint(1), -- QC1 status flag
qc1_time datetime,
create_time datetime,
pipeline_id varchar(60)
);
/*==============================================================*/
/* Table: ifs_level1_header */
/*==============================================================*/
-- FITS-header summary for an IFS level-1 product. No autoincrement: id is
-- supplied by the writer and presumably matches ifs_level1_data.id — TODO confirm.
create table ifs_level1_header
(
id int(20) not null,
obs_time datetime,
exp_time float,
ra float,
"dec" float, -- quoted: DEC can collide with reserved words in other dialects
create_time datetime,
primary key (id)
);
/*==============================================================*/ /*==============================================================*/
/* Table: ifs_result_0_1 */ /* Table: msc_level1_data */
/*==============================================================*/ /*==============================================================*/
create table ifs_result_0_1 ( create table msc_level1_data
result0_id INT not null, (
result1_id INT not null, id integer PRIMARY KEY autoincrement,
create_time DATETIME null, raw_id int(20) not null,
primary key (result0_id, result1_id) data_type varchar(64) not null,
cor_sci_id int(20),
prc_params varchar(1024),
flat_id int(20),
dark_id int(20),
bias_id int(20),
filename varchar(128),
file_path varchar(256),
prc_status tinyint(1),
prc_time datetime,
qc1_status tinyint(1),
qc1_time datetime,
create_time datetime,
pipeline_id varchar(60)
); );
/*==============================================================*/ /*==============================================================*/
/* Table: ifs_raw_ref */ /* Table: msc_level1_header */
/*==============================================================*/ /*==============================================================*/
create table ifs_raw_ref ( create table msc_level1_header
fit_id INT not null, (
ref_id INT not null, id int(20) not null,
create_time DATETIME null, obs_time datetime,
primary key (fit_id, ref_id) exp_time float,
ra float,
"dec" float,
create_time datetime,
primary key (id)
); );
/*==============================================================*/ /*==============================================================*/
/* Table: ifs_rawfits */ /* Table: t_cal2level0 */
/*==============================================================*/ /*==============================================================*/
create table ifs_rawfits ( create table t_cal2level0
id integer PRIMARY KEY autoincrement, (
filename VARCHAR(100) null, merge_id int(20) not null,
obs_time INT null, level0_id int(20) not null,
ccd_num INT null, primary key (merge_id, level0_id)
exp_time DATETIME null,
file_path VARCHAR(128) null,
qc0_status tinyint(1) null,
qc0_time DATETIME null,
prc_status tinyint(1) null,
prc_time DATETIME null,
create_time DATETIME null
); );
/*==============================================================*/ /*==============================================================*/
/* Table: ifs_ref_fits */ /* Table: t_cal_header */
/*==============================================================*/ /*==============================================================*/
create table ifs_ref_fits ( create table t_cal_header
id integer PRIMARY KEY autoincrement, (
filename VARCHAR(128) null, id int(20) not null,
obs_time INT null, obs_time datetime,
exp_time DATETIME null, exp_time float,
ccd_num INT null, ra float,
file_path VARCHAR(256) null, "dec" float,
ref_type VARCHAR(32) null, create_time datetime,
create_time DATETIME null, primary key (id)
status tinyint(1) null
); );
/*==============================================================*/ /*==============================================================*/
/* Table: ifs_result_0 */ /* Table: t_cal_merge */
/*==============================================================*/ /*==============================================================*/
create table ifs_result_0 ( create table t_cal_merge
id integer PRIMARY KEY autoincrement, (
filename VARCHAR(100) null, id integer PRIMARY KEY autoincrement,
raw_id INT null, detector_no varchar(64) not null,
file_path VARCHAR(128) null, ref_type varchar(16),
create_time DATETIME null, obs_time datetime,
proc_type VARCHAR(32) null exp_time float,
filename varchar(128),
file_path varchar(256),
qc1_status tinyint(1),
qc1_time datetime,
prc_status tinyint(1),
prc_time datetime,
create_time datetime
); );
/*==============================================================*/ /*==============================================================*/
/* Table: ifs_result_1 */ /* Table: t_detector */
/*==============================================================*/ /*==============================================================*/
create table ifs_result_1 ( create table t_detector
id integer PRIMARY KEY autoincrement, (
filename VARCHAR(100) null, no varchar(64) not null,
file_path VARCHAR(128) null, detector_name varchar(256) not null,
create_time DATETIME null, module_id varchar(20),
proc_type VARCHAR(32) null filter_id varchar(20),
create_time datetime,
update_time datetime,
primary key (no)
); );
/*==============================================================*/
/* Table: t_detector_status */
/*==============================================================*/
-- Time-stamped status snapshots reported per detector.
create table t_detector_status
(
id integer PRIMARY KEY autoincrement,
detector_no varchar(64) not null,
status varchar(256) not null,
status_time datetime, -- when the status was observed
create_time datetime -- when the row was inserted
);
/*==============================================================*/
/* Table: t_facility_status */
/*==============================================================*/
-- Facility-wide status snapshots (not tied to a single detector/module).
create table t_facility_status
(
id integer PRIMARY KEY autoincrement,
status varchar(256) not null,
status_time datetime,
create_time datetime
);
/*==============================================================*/
/* Table: t_guiding */
/*==============================================================*/
-- Guiding data files produced during observations.
create table t_guiding
(
id integer PRIMARY KEY autoincrement,
filename varbinary(128), -- NOTE(review): varbinary looks like a modeling slip; varchar expected — confirm
guiding_file_path varchar(256) not null,
guiding_no varchar(256),
create_time datetime
);
/*==============================================================*/
/* Table: t_level0_data */
/*==============================================================*/
-- Raw (level-0) frames: one row per detector exposure file.
create table t_level0_data
(
id integer PRIMARY KEY autoincrement,
obs_id int(20) not null, -- owning observation (t_observation.id)
detector_no varchar(64) not null,
obs_type varchar(16),
obs_time datetime,
exp_time float,
detector_status_id int(20),
filename varchar(128),
file_path varchar(256),
qc0_status tinyint(1), -- QC0 status flag
qc0_time datetime,
prc_status tinyint(1), -- processing status flag
prc_time datetime,
create_time datetime
);
/*==============================================================*/
/* Table: t_level0_header */
/*==============================================================*/
-- FITS-header summary for a level-0 frame; id is supplied by the writer
-- (the ingest script inserts it with the t_level0_data row id).
create table t_level0_header
(
id int(20) not null,
obs_time datetime,
exp_time float,
ra float,
"dec" float, -- quoted: DEC can collide with reserved words in other dialects
create_time datetime,
primary key (id)
);
/*==============================================================*/
/* Table: t_level0_prc */
/*==============================================================*/
-- Processing runs applied to level-0 frames (one row per pipeline step).
create table t_level0_prc
(
id integer PRIMARY KEY autoincrement,
level0_id int(20) not null,
pipeline_id varchar(64) not null,
prc_module varchar(32) not null,
params_id varchar(256),
prc_status int(2),
prc_time datetime,
file_path varchar(256)
);
/*==============================================================*/
/* Table: t_module_status */
/*==============================================================*/
-- Time-stamped status snapshots reported per instrument module.
create table t_module_status
(
id integer PRIMARY KEY autoincrement,
module_id varbinary(20), -- NOTE(review): varbinary looks like a modeling slip; varchar expected — confirm
status varchar(256) not null,
status_time datetime,
create_time datetime
);
/*==============================================================*/
/* Table: t_observation */
/*==============================================================*/
-- One row per observation/exposure; level-0 frames reference it via obs_id.
create table t_observation
(
id integer PRIMARY KEY autoincrement,
obs_time datetime,
exp_time float,
module_id varchar(20),
obs_type varchar(16),
facility_status_id int(20),
module_status_id int(20),
qc0_status tinyint(1),
qc0_time datetime,
prc_status tinyint(1),
prc_time datetime,
create_time datetime,
import_status tinyint(1)
);
import os, sys
import argparse
import logging
from astropy.io import fits
import datetime
import shutil
from csst_dfs_api_local.common.db import DBClient
log = logging.getLogger('csst-dfs-api-local')
def ingest():
    """Command-line entry point: import a single FITS file, or every FITS
    file found under a directory tree, into the local DFS database."""
    db = DBClient()
    parser = argparse.ArgumentParser(prog=f"{sys.argv[0]}", description="ingest the local files")
    parser.add_argument('-i','--infile', dest="infile", help="a file or a directory")
    parser.add_argument('-m', '--copyfiles', dest="copyfiles", action='store_true', default=False, help="move files after import")
    args = parser.parse_args(sys.argv[1:])

    root = args.infile
    if os.path.isfile(root):
        log.info(f"prepare import {root}")
        ingesst_one(root, db, args.copyfiles)
    if os.path.isdir(root):
        # walk the tree lazily and keep only names containing ".fits"
        # (find(...) > 0 also skips names that *start* with ".fits")
        candidates = (
            os.path.join(dir_path, name)
            for dir_path, _, names in os.walk(root)
            for name in names
            if name.find(".fits") > 0
        )
        for fits_path in candidates:
            log.info(f"prepare import {fits_path}")
            ingesst_one(fits_path, db, args.copyfiles)
    db.close()
def ingesst_one(file_path, db, copyfiles):
    """Ingest one level-0 FITS file.

    Reads the primary-HDU header, upserts the owning t_observation row,
    inserts a t_level0_data row and its t_level0_header companion, and
    optionally copies the file into the CSST local file tree.

    :param file_path: path of the FITS file to import
    :param db: an open DBClient
    :param copyfiles: when True, copy the file under CSST_LOCAL_FILE_ROOT
    """
    dest_root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")

    # fix: close the FITS handle deterministically (the original leaked it)
    with fits.open(file_path) as hdul:
        header = hdul[0].header
        obs_id = header["OBSID"]
        exp_start_time = f"{header['DATE-OBS']} {header['TIME-OBS']}"
        exp_time = header['EXPTIME']
        module_id = header["INSTRUME"]
        obs_type = header["FILETYPE"]
        detector = header["DETECTOR"]
        filename = header["FILENAME"]
        ra_obj = header["RA_OBJ"]
        dec_obj = header["DEC_OBJ"]
        exp_start = header['EXPSTART']

    qc0_status = 0
    prc_status = 0
    time_now = datetime.datetime.now()
    create_time = time_now.strftime('%Y-%m-%d %H:%M:%S')
    facility_status_id = 0
    module_status_id = 0

    existed = db.exists("select * from t_observation where id=?", (obs_id,))
    if not existed:
        db.execute("insert into t_observation \
            (id,obs_time,exp_time,module_id,obs_type,facility_status_id, module_status_id, qc0_status, prc_status,create_time) \
            values (?,?,?,?,?,?,?,?,?,?)",
            (obs_id,exp_start_time,exp_time,module_id,obs_type,facility_status_id,module_status_id,qc0_status, prc_status,create_time))

    # level0 — refuse to import the same file twice
    existed = db.exists(
        "select * from t_level0_data where filename=?",
        (filename,)
    )
    if existed:
        log.warning('%s has already been imported' %(file_path, ))
        db.end()
        return

    detector_status_id = 0
    file_full_path = file_path
    if copyfiles:
        obs_id_str = "%07d" % (obs_id)
        file_dir = f"{dest_root_dir}/{module_id}/{obs_type.upper()}/{exp_start}/{obs_id_str}/MS"
        if not os.path.exists(file_dir):
            os.makedirs(file_dir)
        # fix: keep the source file's own name; the previous code used a
        # fixed placeholder name, so every copy overwrote the same file
        file_full_path = os.path.join(file_dir, os.path.basename(file_path))

    c = db.execute("insert into t_level0_data \
        (obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \
        values (?,?,?,?,?,?,?,?,?,?,?)",
        (obs_id, detector, obs_type, exp_start_time, exp_time, detector_status_id, filename, file_full_path, qc0_status, prc_status,create_time))
    db.end()
    level0_id = db.last_row_id()

    # level0-header: shares the id of the t_level0_data row just inserted
    c = db.execute("insert into t_level0_header \
        (id, obs_time, exp_time, ra, `dec`, create_time) \
        values (?,?,?,?,?,?)",
        (level0_id, exp_start_time, exp_time, ra_obj, dec_obj, create_time))

    if copyfiles:
        # copy files into the managed tree
        shutil.copyfile(file_path, file_full_path)

    db.end()
    print(f"{file_path} imported")
# Script entry point: run the ingest CLI when executed directly.
if __name__ == "__main__":
    ingest()
\ No newline at end of file
from .calmerge import CalMergeApi
from .level0 import Level0DataApi
from .detector import DetectorApi
from .level0prc import Level0PrcApi
from .observation import ObservationApi
\ No newline at end of file
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.facility import CalMergeRecord
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class CalMergeApi(object):
    """Local (SQLite-backed) API for calibration-merge records (t_cal_merge)
    and their links to level-0 frames (t_cal2level0)."""

    def __init__(self):
        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
        self.db = DBClient()

    def find(self, **kwargs):
        ''' retrieve calibration merge records from database

        parameter kwargs:
            detector_no: [str]
            ref_type: [str]
            obs_time: (start,end)
            qc1_status : [int]
            prc_status : [int]
            file_name: [str]
            limit: limits returns the number of records,default 0:no-limit

        return: csst_dfs_common.models.Result
        '''
        try:
            detector_no = get_parameter(kwargs, "detector_no")
            ref_type = get_parameter(kwargs, "ref_type")
            exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0]
            exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1]
            qc1_status = get_parameter(kwargs, "qc1_status")
            prc_status = get_parameter(kwargs, "prc_status")
            file_name = get_parameter(kwargs, "file_name")
            limit = get_parameter(kwargs, "limit", 0)

            sql_count = "select count(*) as c from t_cal_merge where 1=1"
            sql_data = "select * from t_cal_merge where 1=1"
            # NOTE(review): conditions are assembled by string interpolation and
            # are not escaped; acceptable only because input is trusted/local.
            sql_condition = ""
            if detector_no:
                sql_condition = f"{sql_condition} and detector_no='{detector_no}'"
            if ref_type:
                sql_condition = f"{sql_condition} and ref_type='{ref_type}'"
            if exp_time_start:
                sql_condition = f"{sql_condition} and obs_time >='{exp_time_start}'"
            if exp_time_end:
                sql_condition = f"{sql_condition} and obs_time <='{exp_time_end}'"
            if qc1_status:
                sql_condition = f"{sql_condition} and qc1_status={qc1_status}"
            if prc_status:
                sql_condition = f"{sql_condition} and prc_status={prc_status}"
            if file_name:
                # fix: append to the accumulated conditions (the original
                # overwrote them) and quote the string value
                sql_condition = f"{sql_condition} and filename='{file_name}'"

            sql_count = f"{sql_count} {sql_condition}"
            sql_data = f"{sql_data} {sql_condition}"
            log.info(sql_count)
            log.info(sql_data)

            if limit > 0:
                sql_data = f"{sql_data} limit {limit}"

            totalCount = self.db.select_one(sql_count)
            _, records = self.db.select_many(sql_data)
            return Result.ok_data(data=from_dict_list(CalMergeRecord, records)).append("totalCount", totalCount['c'])
        except Exception as e:
            return Result.error(message=str(e))

    def get(self, **kwargs):
        '''  fetch a record from database

        parameter kwargs:
            id : [int]

        return csst_dfs_common.models.Result
        '''
        try:
            id = get_parameter(kwargs, "id")
            r = self.db.select_one(
                "select * from t_cal_merge where id=?", (id,))
            if r:
                # also collect the level-0 frames merged into this record
                sql_get_level0_id = f"select level0_id from t_cal2level0 where merge_id={r['id']}"
                _, records = self.db.select_many(sql_get_level0_id)
                level0_ids = [rr["level0_id"] for rr in records]
                rec = CalMergeRecord().from_dict(r)
                rec.level0_ids = level0_ids
                return Result.ok_data(data=rec)
            else:
                return Result.error(message=f"id:{id} not found")
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def update_qc1_status(self, **kwargs):
        ''' update the status of QC1

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            existed = self.db.exists(
                "select * from t_cal_merge where id=?",
                (id,)
            )
            if not existed:
                log.warning('%s not found' %(id, ))
                return Result.error(message ='%s not found' %(id, ))
            self.db.execute(
                'update t_cal_merge set qc1_status=?, qc1_time=? where id=?',
                (status, format_time_ms(time.time()), id)
            )
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            # fix: Python 3 exceptions have no .message attribute
            return Result.error(message=str(e))

    def update_proc_status(self, **kwargs):
        ''' update the status of reduction

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            existed = self.db.exists(
                "select * from t_cal_merge where id=?",
                (id,)
            )
            if not existed:
                log.warning('%s not found' %(id, ))
                return Result.error(message ='%s not found' %(id, ))
            self.db.execute(
                'update t_cal_merge set prc_status=?, prc_time=? where id=?',
                (status, format_time_ms(time.time()), id)
            )
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            # fix: Python 3 exceptions have no .message attribute
            return Result.error(message=str(e))

    def write(self, **kwargs):
        ''' insert a calibration merge record into database

        parameter kwargs:
            id : [int]
            detector_no : [str]
            ref_type : [str]
            obs_time : [str]
            exp_time : [float]
            prc_status : [int]
            prc_time : [str]
            filename : [str]
            file_path : [str]
            level0_ids : [list]

        return csst_dfs_common.models.Result
        '''
        rec = CalMergeRecord(
            id = 0,
            detector_no = get_parameter(kwargs, "detector_no"),
            ref_type = get_parameter(kwargs, "ref_type"),
            obs_time = get_parameter(kwargs, "obs_time"),
            exp_time = get_parameter(kwargs, "exp_time"),
            filename = get_parameter(kwargs, "filename"),
            file_path = get_parameter(kwargs, "file_path"),
            prc_status = get_parameter(kwargs, "prc_status"),
            prc_time = get_parameter(kwargs, "prc_time"),
            level0_ids = get_parameter(kwargs, "level0_ids", [])
        )
        try:
            self.db.execute(
                'INSERT INTO t_cal_merge (detector_no,ref_type,obs_time,exp_time,filename,file_path,prc_status,prc_time,create_time) \
                    VALUES(?,?,?,?,?,?,?,?,?)',
                (rec.detector_no, rec.ref_type, rec.obs_time, rec.exp_time, rec.filename, rec.file_path,rec.prc_status,rec.prc_time,format_time_ms(time.time()))
            )
            self.db.end()
            rec.id = self.db.last_row_id()

            # fix: only write link rows when there are level-0 ids; the
            # original built "insert ... values " with an empty VALUES list
            # (a SQL syntax error) when level0_ids was empty
            if rec.level0_ids:
                sql_level0_ids = "insert into t_cal2level0 (merge_id,level0_id) values "
                values = ["(%s,%s)"%(rec.id,i) for i in rec.level0_ids]
                _ = self.db.execute(sql_level0_ids + ",".join(values))
                self.db.end()

            return Result.ok_data(data=rec)
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))
\ No newline at end of file
import os
import logging
import time, datetime
import shutil
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.facility import Detector, DetectorStatus
from csst_dfs_commons.models.common import from_dict_list
from ..common.db import DBClient
from ..common.utils import *
log = logging.getLogger('csst')
class DetectorApi(object):
    """Local (SQLite-backed) API for detector records (t_detector) and
    detector status snapshots (t_detector_status)."""

    def __init__(self):
        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
        self.db = DBClient()

    def find(self, **kwargs):
        ''' retrieve detector records from database

        parameter kwargs:
            module_id: [str]
            key: [str]

        return: csst_dfs_common.models.Result
        '''
        try:
            module_id = get_parameter(kwargs, "module_id")
            key = get_parameter(kwargs, "key","")

            sql_data = "select * from t_detector where 1=1"
            sql_condition = ""
            if module_id:
                sql_condition = f"{sql_condition} and module_id='{module_id}'"
            # key matches either the detector number or its name (substring)
            sql_condition = f"{sql_condition} and (`no` like ? or detector_name like ?)"

            sql_data = f"{sql_data} {sql_condition}"

            _, records = self.db.select_many(sql_data,(f'%{key}%',f'%{key}%'))

            return Result.ok_data(data=from_dict_list(Detector, records)).append("totalCount", len(records))
        except Exception as e:
            return Result.error(message=str(e))

    def get(self, **kwargs):
        '''  fetch a record from database

        parameter kwargs:
            no : [str]

        return csst_dfs_common.models.Result
        '''
        try:
            no = get_parameter(kwargs, "no")
            r = self.db.select_one(
                "select * from t_detector where no=?", (no,))
            if r:
                return Result.ok_data(data=Detector().from_dict(r))
            else:
                return Result.error(message=f"detector no:{no} not found")
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def update(self, **kwargs):
        ''' update a detector by no

        parameter kwargs:
            no : [str],
            detector_name : [str],
            module_id : [str],
            filter_id : [str]

        return csst_dfs_common.models.Result
        '''
        try:
            no = get_parameter(kwargs, "no")
            result_get = self.get(no=no)
            if not result_get.success:
                return result_get
            # fix: the original lines ended with stray commas, which turned
            # these values into 1-tuples and stored their repr() in the DB
            detector_name = get_parameter(kwargs, "detector_name", result_get.data.detector_name)
            module_id = get_parameter(kwargs, "module_id", result_get.data.module_id)
            filter_id = get_parameter(kwargs, "filter_id", result_get.data.filter_id)

            # fix: SQLite has no now() function; bind an explicit timestamp,
            # and use parameters instead of string interpolation
            sql_data = "update t_detector set detector_name=?, module_id=?, filter_id=?, update_time=? where `no`=?"
            _ = self.db.execute(sql_data, (detector_name, module_id, filter_id, format_time_ms(time.time()), no))
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def delete(self, **kwargs):
        ''' delete a detector by no

        parameter kwargs:
            no : [str]

        return csst_dfs_common.models.Result
        '''
        try:
            no = get_parameter(kwargs, "no")
            sql_data = "delete from t_detector where `no`=?"
            _ = self.db.execute(sql_data, (no,))
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def write(self, **kwargs):
        ''' insert a detector record into database

        parameter kwargs:
            no : [str],
            detector_name : [str],
            module_id : [str],
            filter_id : [str]

        return csst_dfs_common.models.Result
        '''
        rec = Detector(
            no = get_parameter(kwargs, "no"),
            detector_name = get_parameter(kwargs, "detector_name"),
            module_id = get_parameter(kwargs, "module_id"),
            filter_id = get_parameter(kwargs, "filter_id"),
            create_time = format_time_ms(time.time())
        )
        try:
            existed = self.db.exists(
                "select * from t_detector where no=?",
                (rec.no,)
            )
            if existed:
                log.warning('%s existed' %(rec.no, ))
                return Result.error(message ='%s existed' %(rec.no, ))

            self.db.execute(
                'INSERT INTO t_detector (`no`,detector_name,module_id,filter_id,create_time) \
                    VALUES(?,?,?,?,?)',
                (rec.no, rec.detector_name, rec.module_id, rec.filter_id,rec.create_time)
            )
            self.db.end()
            return Result.ok_data(data=rec)
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def find_status(self, **kwargs):
        ''' retrieve a detector status's from database

        parameter kwargs:
            detector_no: [str]
            status_occur_time: (begin,end)
            limit: limits returns the number of records,default 0:no-limit

        return: csst_dfs_common.models.Result
        '''
        try:
            detector_no = get_parameter(kwargs, "detector_no")
            status_begin_time = get_parameter(kwargs, "status_occur_time", [None, None])[0]
            status_end_time = get_parameter(kwargs, "status_occur_time", [None, None])[1]
            limit = get_parameter(kwargs, "limit", 0)

            sql_count = f"select count(*) as c from t_detector_status where detector_no='{detector_no}'"
            sql_data = f"select * from t_detector_status where detector_no='{detector_no}'"

            sql_condition = ""
            if status_begin_time:
                sql_condition = f"{sql_condition} and status_time >='{status_begin_time}'"
            if status_end_time:
                sql_condition = f"{sql_condition} and status_time <='{status_end_time}'"

            sql_count = f"{sql_count} {sql_condition}"
            sql_data = f"{sql_data} {sql_condition}"

            if limit > 0:
                sql_data = f"{sql_data} limit {limit}"

            # fix: this class has no self.mysql_client; use the shared DBClient
            totalCount = self.db.select_one(sql_count)
            _, records = self.db.select_many(sql_data)
            return Result.ok_data(data=from_dict_list(DetectorStatus, records)).append("totalCount", totalCount['c'])
        except Exception as e:
            return Result.error(message=str(e))

    def get_status(self, **kwargs):
        '''  fetch a record from database

        parameter kwargs:
            id : [int]

        return csst_dfs_common.models.Result
        '''
        try:
            id = get_parameter(kwargs, "id", -1)
            r = self.db.select_one(
                "select * from t_detector_status where id=?", (id,))
            if r:
                return Result.ok_data(data=DetectorStatus().from_dict(r))
            else:
                return Result.error(message=f"id:{id} not found")
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def write_status(self, **kwargs):
        ''' insert a detector status into database

        parameter kwargs:
            detector_no : [str],
            status : [str],
            status_time : [str]

        return csst_dfs_common.models.Result
        '''
        rec = DetectorStatus(
            id = 0,
            detector_no = get_parameter(kwargs, "detector_no"),
            status = get_parameter(kwargs, "status"),
            status_time = get_parameter(kwargs, "status_time")
        )
        try:
            self.db.execute(
                'INSERT INTO t_detector_status (detector_no,status,status_time,create_time) \
                    VALUES(?,?,?,?)',
                (rec.detector_no, rec.status, rec.status_time,format_time_ms(time.time()))
            )
            self.db.end()
            rec.id = self.db.last_row_id()
            return Result.ok_data(data=rec)
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))
\ No newline at end of file
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.facility import Level0Record
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level0DataApi(object):
    """Local (SQLite-backed) API for level-0 data records (t_level0_data)."""

    def __init__(self):
        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
        self.db = DBClient()

    def find(self, **kwargs):
        ''' retrieve level0 records from database

        parameter kwargs:
            obs_id: [int]
            detector_no: [str]
            obs_type: [str]
            obs_time : (start, end),
            qc0_status : [int],
            prc_status : [int],
            file_name: [str]
            limit: limits returns the number of records,default 0:no-limit

        return: csst_dfs_common.models.Result
        '''
        try:
            obs_id = get_parameter(kwargs, "obs_id")
            detector_no = get_parameter(kwargs, "detector_no")
            obs_type = get_parameter(kwargs, "obs_type")
            exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0]
            exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1]
            qc0_status = get_parameter(kwargs, "qc0_status")
            prc_status = get_parameter(kwargs, "prc_status")
            file_name = get_parameter(kwargs, "file_name")
            limit = get_parameter(kwargs, "limit", 0)

            sql_count = "select count(*) as c from t_level0_data where 1=1"
            sql_data = "select * from t_level0_data where 1=1"
            # NOTE(review): conditions are assembled by string interpolation and
            # are not escaped; acceptable only because input is trusted/local.
            sql_condition = ""
            # fix: guard against obs_id being None before comparing to 0
            # (None > 0 raises TypeError on Python 3)
            if obs_id and obs_id > 0:
                sql_condition = f"{sql_condition} and obs_id={obs_id}"
            if detector_no:
                sql_condition = f"{sql_condition} and detector_no='{detector_no}'"
            if obs_type:
                sql_condition = f"{sql_condition} and obs_type='{obs_type}'"
            if exp_time_start:
                sql_condition = f"{sql_condition} and obs_time >='{exp_time_start}'"
            if exp_time_end:
                sql_condition = f"{sql_condition} and obs_time <='{exp_time_end}'"
            if qc0_status:
                sql_condition = f"{sql_condition} and qc0_status={qc0_status}"
            if prc_status:
                sql_condition = f"{sql_condition} and prc_status={prc_status}"
            if file_name:
                # fix: append to the accumulated conditions; the original
                # overwrote everything gathered above
                sql_condition = f"{sql_condition} and filename='{file_name}'"

            sql_count = f"{sql_count} {sql_condition}"
            sql_data = f"{sql_data} {sql_condition}"

            if limit > 0:
                sql_data = f"{sql_data} limit {limit}"

            totalCount = self.db.select_one(sql_count)
            _, records = self.db.select_many(sql_data)
            return Result.ok_data(data=from_dict_list(Level0Record, records)).append("totalCount", totalCount['c'])
        except Exception as e:
            return Result.error(message=str(e))

    def get(self, **kwargs):
        '''  fetch a record from database

        parameter kwargs:
            fits_id : [int]

        return csst_dfs_common.models.Result
        '''
        try:
            fits_id = get_parameter(kwargs, "fits_id", -1)
            r = self.db.select_one(
                "select * from t_level0_data where id=?", (fits_id,))
            if r:
                return Result.ok_data(data=Level0Record().from_dict(r))
            else:
                return Result.error(message=f"fits_id:{fits_id} not found")
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def update_proc_status(self, **kwargs):
        ''' update the status of reduction

        parameter kwargs:
            fits_id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        fits_id = get_parameter(kwargs, "fits_id")
        status = get_parameter(kwargs, "status")
        try:
            existed = self.db.exists(
                "select * from t_level0_data where id=?",
                (fits_id,)
            )
            if not existed:
                log.warning('%s not found' %(fits_id, ))
                return Result.error(message ='%s not found' %(fits_id, ))
            self.db.execute(
                'update t_level0_data set prc_status=?, prc_time=? where id=?',
                (status, format_time_ms(time.time()), fits_id)
            )
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def update_qc0_status(self, **kwargs):
        ''' update the status of QC0

        parameter kwargs:
            fits_id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        fits_id = get_parameter(kwargs, "fits_id")
        status = get_parameter(kwargs, "status")
        try:
            existed = self.db.exists(
                "select * from t_level0_data where id=?",
                (fits_id,)
            )
            if not existed:
                log.warning('%s not found' %(fits_id, ))
                return Result.error(message ='%s not found' %(fits_id, ))
            self.db.execute(
                'update t_level0_data set qc0_status=?, qc0_time=? where id=?',
                (status, format_time_ms(time.time()), fits_id)
            )
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def write(self, **kwargs):
        ''' insert a level0 data record into database

        parameter kwargs:
            obs_id = [int]
            detector_no = [str]
            obs_type = [str]
            obs_time = [str]
            exp_time = [int]
            detector_status_id = [int]
            filename = [str]
            file_path = [str]

        return: csst_dfs_common.models.Result
        '''
        rec = Level0Record(
            obs_id = get_parameter(kwargs, "obs_id"),
            detector_no = get_parameter(kwargs, "detector_no"),
            obs_type = get_parameter(kwargs, "obs_type"),
            obs_time = get_parameter(kwargs, "obs_time"),
            exp_time = get_parameter(kwargs, "exp_time"),
            detector_status_id = get_parameter(kwargs, "detector_status_id"),
            filename = get_parameter(kwargs, "filename"),
            file_path = get_parameter(kwargs, "file_path")
        )
        try:
            # reject duplicates keyed on filename
            existed = self.db.exists(
                "select * from t_level0_data where filename=?",
                (rec.filename,)
            )
            if existed:
                log.warning('%s existed' %(rec.filename, ))
                return Result.error(message ='%s existed' %(rec.filename, ))

            self.db.execute(
                'INSERT INTO t_level0_data (obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \
                    VALUES(?,?,?,?,?,?,?,?,?,?,?)',
                (rec.obs_id, rec.detector_no, rec.obs_type, rec.obs_time, rec.exp_time, rec.detector_status_id, rec.filename, rec.file_path,0,0,format_time_ms(time.time()))
            )
            self.db.end()
            rec.id = self.db.last_row_id()
            return Result.ok_data(data=rec)
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.facility import Level0PrcRecord
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level0PrcApi(object):
    """Local (SQLite-backed) API for level-0 processing records (t_level0_prc)."""

    def __init__(self):
        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
        self.db = DBClient()

    def find(self, **kwargs):
        ''' retrieve level0 procedure records from database

        parameter kwargs:
            level0_id: [int]
            pipeline_id: [str]
            prc_module: [str]
            prc_status : [int]

        return: csst_dfs_common.models.Result
        '''
        try:
            level0_id = get_parameter(kwargs, "level0_id")
            pipeline_id = get_parameter(kwargs, "pipeline_id")
            prc_module = get_parameter(kwargs, "prc_module")
            prc_status = get_parameter(kwargs, "prc_status")

            sql_data = "select * from t_level0_prc"
            # fix: make level0_id optional — the original always interpolated
            # it, producing invalid SQL ("level0_id=None") when not supplied
            sql_condition = "where 1=1"
            if level0_id:
                sql_condition = f"{sql_condition} and level0_id={level0_id}"
            if pipeline_id:
                sql_condition = f"{sql_condition} and pipeline_id='{pipeline_id}'"
            if prc_module:
                sql_condition = f"{sql_condition} and prc_module ='{prc_module}'"
            if prc_status:
                sql_condition = f"{sql_condition} and prc_status={prc_status}"

            sql_data = f"{sql_data} {sql_condition}"

            _, records = self.db.select_many(sql_data)
            return Result.ok_data(data=from_dict_list(Level0PrcRecord, records)).append("totalCount", len(records))
        except Exception as e:
            return Result.error(message=str(e))

    def update_proc_status(self, **kwargs):
        ''' update the status of reduction

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            existed = self.db.exists(
                "select * from t_level0_prc where id=?",
                (id,)
            )
            if not existed:
                log.warning('%s not found' %(id, ))
                return Result.error(message ='%s not found' %(id, ))
            self.db.execute(
                'update t_level0_prc set prc_status=?, prc_time=? where id=?',
                (status, format_time_ms(time.time()), id)
            )
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def write(self, **kwargs):
        ''' insert a level0 procedure record into database

        parameter kwargs:
            level0_id : [int]
            pipeline_id : [str]
            prc_module : [str]
            params_id : [str]
            prc_status : [int]
            prc_time : [str]
            file_path : [str]

        return csst_dfs_common.models.Result
        '''
        rec = Level0PrcRecord(
            id = 0,
            level0_id = get_parameter(kwargs, "level0_id"),
            pipeline_id = get_parameter(kwargs, "pipeline_id"),
            prc_module = get_parameter(kwargs, "prc_module"),
            params_id = get_parameter(kwargs, "params_id"),
            prc_status = get_parameter(kwargs, "prc_status"),
            prc_time = get_parameter(kwargs, "prc_time"),
            file_path = get_parameter(kwargs, "file_path")
        )
        try:
            self.db.execute(
                'INSERT INTO t_level0_prc (level0_id,pipeline_id,prc_module, params_id,prc_status,prc_time,file_path) \
                    VALUES(?,?,?,?,?,?,?)',
                (rec.level0_id, rec.pipeline_id, rec.prc_module, rec.params_id, rec.prc_status, rec.prc_time, rec.file_path)
            )
            self.db.end()
            rec.id = self.db.last_row_id()
            return Result.ok_data(data=rec)
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.facility import Observation
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class ObservationApi(object):
    """Local-database access API for observation (t_observation) records."""

    def __init__(self):
        # Root directory of the local CSST file store.
        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
        self.db = DBClient()

    def find(self, **kwargs):
        ''' retrieve exposure records from database

        parameter kwargs:
            module_id: [str]
            obs_type: [str]
            obs_time : (start, end),
            qc0_status : [int],
            prc_status : [int],
            limit: limits returns the number of records,default 0:no-limit

        return: csst_dfs_common.models.Result
        '''
        try:
            module_id = get_parameter(kwargs, "module_id")
            obs_type = get_parameter(kwargs, "obs_type")
            obs_time_start = get_parameter(kwargs, "obs_time", [None, None])[0]
            obs_time_end = get_parameter(kwargs, "obs_time", [None, None])[1]
            qc0_status = get_parameter(kwargs, "qc0_status")
            prc_status = get_parameter(kwargs, "prc_status")
            limit = get_parameter(kwargs, "limit", 0)

            # Build the WHERE clause with bound parameters instead of string
            # concatenation so values cannot break or inject into the SQL.
            # module_id is mandatory, as in the original contract.
            conditions = ["module_id=?"]
            params = [module_id]
            if obs_type:
                conditions.append("obs_type=?")
                params.append(obs_type)
            if obs_time_start:
                conditions.append("obs_time >= ?")
                params.append(obs_time_start)
            if obs_time_end:
                conditions.append("obs_time <= ?")
                params.append(obs_time_end)
            if qc0_status:
                conditions.append("qc0_status=?")
                params.append(qc0_status)
            if prc_status:
                conditions.append("prc_status=?")
                params.append(prc_status)

            sql_condition = " and ".join(conditions)
            sql_count = f"select count(*) as c from t_observation where {sql_condition}"
            sql_data = f"select * from t_observation where {sql_condition}"
            if limit > 0:
                sql_data = f"{sql_data} limit {limit}"

            totalCount = self.db.select_one(sql_count, tuple(params))
            _, records = self.db.select_many(sql_data, tuple(params))
            return Result.ok_data(data=from_dict_list(Observation, records)).append("totalCount", totalCount['c'])
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def get(self, **kwargs):
        ''' fetch a single observation record by its id

        parameter kwargs:
            obs_id = [int]

        return csst_dfs_common.models.Result
        '''
        try:
            obs_id = get_parameter(kwargs, "obs_id", -1)
            r = self.db.select_one(
                "select * from t_observation where id=?", (obs_id,))
            if r:
                return Result.ok_data(data=Observation().from_dict(r))
            else:
                return Result.error(message=f"obs_id:{obs_id} not found")
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def update_proc_status(self, **kwargs):
        ''' update the status of reduction

        parameter kwargs:
            obs_id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        obs_id = get_parameter(kwargs, "obs_id")
        status = get_parameter(kwargs, "status")
        try:
            existed = self.db.exists(
                "select * from t_observation where id=?",
                (obs_id,)
            )
            if not existed:
                log.warning('%s not found' % (obs_id, ))
                return Result.error(message='%s not found' % (obs_id, ))

            self.db.execute(
                'update t_observation set prc_status=?, prc_time=? where id=?',
                (status, format_time_ms(time.time()), obs_id)
            )
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def update_qc0_status(self, **kwargs):
        ''' update the status of QC0

        parameter kwargs:
            obs_id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        obs_id = get_parameter(kwargs, "obs_id")
        status = get_parameter(kwargs, "status")
        try:
            existed = self.db.exists(
                "select * from t_observation where id=?",
                (obs_id,)
            )
            if not existed:
                log.warning('%s not found' % (obs_id, ))
                return Result.error(message='%s not found' % (obs_id, ))

            self.db.execute(
                'update t_observation set qc0_status=?, qc0_time=? where id=?',
                (status, format_time_ms(time.time()), obs_id)
            )
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def write(self, **kwargs):
        ''' insert a observational record into database

        parameter kwargs:
            obs_id = [int]
            obs_time = [str]
            exp_time = [int]
            module_id = [str]
            obs_type = [str]
            facility_status_id = [int]
            module_status_id = [int]

        return: csst_dfs_common.models.Result
        '''
        obs_id = get_parameter(kwargs, "obs_id", 0)
        obs_time = get_parameter(kwargs, "obs_time")
        exp_time = get_parameter(kwargs, "exp_time")
        module_id = get_parameter(kwargs, "module_id")
        obs_type = get_parameter(kwargs, "obs_type")
        facility_status_id = get_parameter(kwargs, "facility_status_id")
        module_status_id = get_parameter(kwargs, "module_status_id")
        try:
            if obs_id == 0:
                # NOTE(review): max(id)+1 is not atomic — two concurrent writers
                # could pick the same id. Acceptable for a single local client;
                # confirm before any multi-process use.
                r = self.db.select_one("select max(id) as max_id from t_observation")
                obs_id = r["max_id"] + 1

            existed = self.db.exists(
                "select * from t_observation where id=?",
                (obs_id,)
            )
            if existed:
                log.warning('%s existed' % (obs_id, ))
                return Result.error(message='%s existed' % (obs_id, ))

            self.db.execute(
                'INSERT INTO t_observation (id,obs_time,exp_time,module_id,obs_type,facility_status_id, module_status_id, qc0_status,create_time) \
                    VALUES(?,?,?,?,?,?,?,?,?)',
                (obs_id, obs_time, exp_time, module_id, obs_type, facility_status_id, module_status_id, 0, format_time_ms(time.time()))
            )
            self.db.end()
            return self.get(obs_id = obs_id)
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))
\ No newline at end of file
from .fits import FitsApi from .level1 import Level1DataApi
from .reffits import RefFitsApi \ No newline at end of file
from .result0 import Result0Api
from .result1 import Result1Api
def ingest():
    """Walk the local file root and register every FITS file it contains.

    Files whose lower-cased name contains 'obs' go through FitsApi; calibration
    frames (flat/bias/arc/sky) go through RefFitsApi. Returns the (empty)
    bookkeeping dict.
    """
    import os
    root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
    paths = {}
    fits_api = FitsApi()
    ref_api = RefFitsApi()
    # Substring markers checked in priority order against the lower-cased name;
    # 'hgar' files are classified as arc frames.
    markers = (('obs', 'obs'), ('flat', 'flat'), ('bias', 'bias'),
               ('hgar', 'arc'), ('sky', 'sky'))
    for dir_path, _, names in os.walk(root_dir):
        for name in names:
            # Same test as before: '.fits' must occur past position 0.
            if name.find(".fits") <= 0:
                continue
            full_path = os.path.join(dir_path, name)
            lowered = name.lower()
            file_type = 'None'
            for marker, ftype in markers:
                if marker in lowered:
                    file_type = ftype
                    break
            rel_path = full_path.replace(root_dir, '')
            if rel_path.index("/") == 0:
                rel_path = rel_path[1:]
            if file_type == 'obs':
                fits_api.import2db(file_path=rel_path)
                print("%s [type:%s] imported" % (full_path, file_type))
            if file_type in ('flat', 'bias', 'arc', 'hgar', 'sky'):
                ref_api.import2db(file_path=rel_path)
                print("%s [type:%s] imported" % (full_path, file_type))
    return paths
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.ifs import Level1Record
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level1DataApi(object):
    """Local-database access API for IFS level-1 data (ifs_level1_data) records."""

    def __init__(self, sub_system = "ifs"):
        self.sub_system = sub_system
        # Root directory of the local CSST file store.
        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
        self.db = DBClient()

    def find(self, **kwargs):
        ''' retrieve level1 records from database

        parameter kwargs:
            raw_id: [int]
            data_type: [str]
            create_time : (start, end),
            qc1_status : [int],
            prc_status : [int],
            filename: [str]
            limit: limits returns the number of records,default 0:no-limit

        return: csst_dfs_common.models.Result
        '''
        try:
            raw_id = get_parameter(kwargs, "raw_id")
            data_type = get_parameter(kwargs, "data_type")
            create_time_start = get_parameter(kwargs, "create_time", [None, None])[0]
            create_time_end = get_parameter(kwargs, "create_time", [None, None])[1]
            qc1_status = get_parameter(kwargs, "qc1_status")
            prc_status = get_parameter(kwargs, "prc_status")
            filename = get_parameter(kwargs, "filename")
            limit = get_parameter(kwargs, "limit", 0)

            # Build the WHERE clause with bound parameters instead of string
            # concatenation so values cannot break or inject into the SQL.
            conditions = []
            params = []
            if raw_id:
                conditions.append("raw_id=?")
                params.append(raw_id)
            if data_type:
                conditions.append("data_type=?")
                params.append(data_type)
            if create_time_start:
                conditions.append("create_time >= ?")
                params.append(create_time_start)
            if create_time_end:
                conditions.append("create_time <= ?")
                params.append(create_time_end)
            if qc1_status:
                conditions.append("qc1_status=?")
                params.append(qc1_status)
            if prc_status:
                conditions.append("prc_status=?")
                params.append(prc_status)
            if filename:
                # Fixed: previously this branch dropped all earlier conditions
                # and compared against a wrong literal instead of `filename`.
                conditions.append("filename=?")
                params.append(filename)

            sql_condition = ""
            if conditions:
                sql_condition = " and " + " and ".join(conditions)
            sql_count = "select count(*) as c from ifs_level1_data where 1=1" + sql_condition
            sql_data = "select * from ifs_level1_data where 1=1" + sql_condition
            if limit > 0:
                sql_data = f"{sql_data} limit {limit}"

            totalCount = self.db.select_one(sql_count, tuple(params))
            _, recs = self.db.select_many(sql_data, tuple(params))
            return Result.ok_data(data=from_dict_list(Level1Record, recs)).append("totalCount", totalCount['c'])
        except Exception as e:
            log.error(e)
            # Fixed: Python 3 exceptions have no `.message`; use str(e).
            return Result.error(message=str(e))

    def get(self, **kwargs):
        ''' fetch a single level-1 record by its id

        parameter kwargs:
            id = [int]

        return csst_dfs_common.models.Result
        '''
        try:
            fits_id = get_parameter(kwargs, "id", -1)
            r = self.db.select_one(
                "select * from ifs_level1_data where id=?", (fits_id,))
            if r:
                return Result.ok_data(data=Level1Record().from_dict(r))
            else:
                return Result.error(message=f"id:{fits_id} not found")
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def update_proc_status(self, **kwargs):
        ''' update the status of reduction

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        fits_id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            existed = self.db.exists(
                "select * from ifs_level1_data where id=?",
                (fits_id,)
            )
            if not existed:
                log.warning('%s not found' % (fits_id, ))
                return Result.error(message='%s not found' % (fits_id, ))

            self.db.execute(
                'update ifs_level1_data set prc_status=?, prc_time=? where id=?',
                (status, format_time_ms(time.time()), fits_id)
            )
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def update_qc1_status(self, **kwargs):
        ''' update the status of QC1

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        fits_id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            existed = self.db.exists(
                "select * from ifs_level1_data where id=?",
                (fits_id,)
            )
            if not existed:
                log.warning('%s not found' % (fits_id, ))
                return Result.error(message='%s not found' % (fits_id, ))

            self.db.execute(
                'update ifs_level1_data set qc1_status=?, qc1_time=? where id=?',
                (status, format_time_ms(time.time()), fits_id)
            )
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def write(self, **kwargs):
        ''' insert a level1 record into database

        parameter kwargs:
            raw_id : [int]
            data_type : [str]
            cor_sci_id : [int]
            prc_params : [str]
            flat_id : [int]
            dark_id : [int]
            bias_id : [int]
            lamp_id : [int]
            arc_id : [int]
            sky_id : [int]
            filename : [str]
            file_path : [str]
            prc_status : [int]
            prc_time : [str]
            pipeline_id : [str]

        return csst_dfs_common.models.Result
        '''
        try:
            rec = Level1Record(
                id = 0,
                raw_id = get_parameter(kwargs, "raw_id"),
                data_type = get_parameter(kwargs, "data_type"),
                cor_sci_id = get_parameter(kwargs, "cor_sci_id"),
                prc_params = get_parameter(kwargs, "prc_params"),
                flat_id = get_parameter(kwargs, "flat_id"),
                dark_id = get_parameter(kwargs, "dark_id"),
                bias_id = get_parameter(kwargs, "bias_id"),
                lamp_id = get_parameter(kwargs, "lamp_id"),
                arc_id = get_parameter(kwargs, "arc_id"),
                sky_id = get_parameter(kwargs, "sky_id"),
                filename = get_parameter(kwargs, "filename"),
                file_path = get_parameter(kwargs, "file_path"),
                prc_status = get_parameter(kwargs, "prc_status"),
                prc_time = get_parameter(kwargs, "prc_time"),
                pipeline_id = get_parameter(kwargs, "pipeline_id")
            )
            existed = self.db.exists(
                "select * from ifs_level1_data where filename=?",
                (rec.filename,)
            )
            if existed:
                log.error(f'{rec.filename} has already been existed')
                return Result.error(message=f'{rec.filename} has already been existed')

            self.db.execute(
                'INSERT INTO ifs_level1_data (raw_id,data_type,cor_sci_id,prc_params,flat_id,dark_id,bias_id,lamp_id,arc_id,sky_id,filename,file_path,qc1_status,prc_status,create_time,pipeline_id) \
                    VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)',
                (rec.raw_id, rec.data_type, rec.cor_sci_id, rec.prc_params, rec.flat_id, rec.dark_id, rec.bias_id, rec.lamp_id, rec.arc_id, rec.sky_id, rec.filename, rec.file_path, 0, 0, format_time_ms(time.time()), rec.pipeline_id,)
            )
            self.db.end()
            # Pick up the auto-generated primary key of the inserted row.
            rec.id = self.db.last_row_id()
            return Result.ok_data(data=rec)
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))
\ No newline at end of file
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
log = logging.getLogger('csst')
class Result1Api(object):
    """Local file-store and database API for IFS level-1 result (ifs_result_1) files."""

    def __init__(self, sub_system = "ifs"):
        self.sub_system = sub_system
        # Root directory of the local CSST file store.
        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
        self.check_dir()
        self.db = DBClient()

    def check_dir(self):
        # Ensure the root directory and its results1/ sub-directory exist.
        if not os.path.exists(self.root_dir):
            os.mkdir(self.root_dir)
            log.info("using [%s] as root directory", self.root_dir)
        if not os.path.exists(os.path.join(self.root_dir, "results1")):
            os.mkdir(os.path.join(self.root_dir, "results1"))

    def find(self, **kwargs):
        '''
        parameter kwargs:
            file_name = [str],
            proc_type = [str]

        return list of level 1 records (file_path made absolute)
        '''
        file_name = get_parameter(kwargs, "file_name")
        proc_type = get_parameter(kwargs, "proc_type", "default")
        # Bound parameters instead of string concatenation; a file_name
        # lookup takes precedence over the proc_type filter, as before.
        sql = "select * from ifs_result_1 where proc_type=?"
        params = (proc_type,)
        if file_name:
            sql = "select * from ifs_result_1 where filename=?"
            params = (file_name,)
        _, recs = self.db.select_many(sql, params)
        for r in recs:
            r['file_path'] = os.path.join(self.root_dir, r['file_path'])
        return recs

    def get(self, **kwargs):
        '''
        parameter kwargs:
            fits_id = [int]

        return (record dict or None, list of linked result0 rows)
        '''
        fits_id = get_parameter(kwargs, "fits_id", -1)
        r = self.db.select_one(
            "select * from ifs_result_1 where id=?", (fits_id,))
        _, result0s = self.db.select_many(
            "select result0_id, create_time from ifs_result_0_1 where result1_id=?", (fits_id,))
        if r:
            r['file_path'] = os.path.join(self.root_dir, r['file_path'])
        return r, result0s

    def read(self, **kwargs):
        ''' yield bytes of fits file

        parameter kwargs:
            fits_id = [int],
            file_path = [str],
            chunk_size = [int] default 20480

        yield bytes of fits file
        '''
        fits_id = get_parameter(kwargs, "fits_id")
        file_path = get_parameter(kwargs, "file_path")

        if fits_id is None and file_path is None:
            raise Exception("fits_id or file_path need to be defined")

        if fits_id is not None:
            # Fixed: the parameter must be a tuple, (fits_id) is just an int.
            r = self.db.select_one(
                "select * from ifs_result_1 where id=?", (fits_id,))
            if r is not None:
                file_path = r["file_path"]

        if file_path is not None:
            chunk_size = get_parameter(kwargs, "chunk_size", 20480)
            return yield_file_bytes(os.path.join(self.root_dir, file_path), chunk_size)

    def write(self, **kwargs):
        ''' copy a local level 1 file to file storage, and insert a record into database

        parameter kwargs:
            file_path = [str],
            proc_type = [str],
            result0_ids = [list]

        return the new absolute file path
        '''
        # Local import: the module-level name `datetime` may be shadowed by
        # the star-import from common.utils, so bind the class explicitly.
        from datetime import datetime as _datetime

        file_path = get_parameter(kwargs, "file_path")
        proc_type = get_parameter(kwargs, "proc_type", "default")
        result0_ids = get_parameter(kwargs, "result0_ids", [])

        if file_path is None:
            raise Exception("file_path need to be defined")

        # Take the timestamp once so year/month/day cannot straddle midnight.
        now = _datetime.now()
        new_file_dir = create_dir(os.path.join(self.root_dir, "results1"),
                self.sub_system,
                "/".join([str(now.year), "%02d" % (now.month,), "%02d" % (now.day,)]))

        file_basename = os.path.basename(file_path)
        new_file_path = os.path.join(new_file_dir, file_basename)
        shutil.copyfile(file_path, new_file_path)

        # Store the path relative to the root directory.
        file_path = new_file_path.replace(self.root_dir, '')
        if file_path.index("/") == 0:
            file_path = file_path[1:]

        self.db.execute(
            'INSERT INTO ifs_result_1 (filename, file_path, proc_type, create_time) \
                VALUES(?,?,?,?)',
            (file_basename, file_path, proc_type, format_time_ms(time.time()),)
        )
        self.db.end()
        # Fixed: link result0 rows to the id actually generated by the insert
        # above instead of a hard-coded 1.
        result1_id = self.db.last_row_id()

        for id0 in result0_ids:
            self.db.execute(
                'INSERT INTO ifs_result_0_1 (result0_id, result1_id, create_time) \
                    VALUES(?,?,?)',
                (id0, result1_id, format_time_ms(time.time()))
            )
            self.db.end()

        log.info("result1 fits %s imported.", file_path)
        return new_file_path
\ No newline at end of file
from .level1 import Level1DataApi
\ No newline at end of file
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.msc import Level1Record
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level1DataApi(object):
    """Local-database access API for MSC level-1 data (msc_level1_data) records."""

    def __init__(self, sub_system = "msc"):
        self.sub_system = sub_system
        # Root directory of the local CSST file store.
        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
        self.db = DBClient()

    def find(self, **kwargs):
        ''' retrieve level1 records from database

        parameter kwargs:
            raw_id: [int]
            data_type: [str]
            create_time : (start, end),
            qc1_status : [int],
            prc_status : [int],
            filename: [str]
            limit: limits returns the number of records,default 0:no-limit

        return: csst_dfs_common.models.Result
        '''
        try:
            raw_id = get_parameter(kwargs, "raw_id")
            data_type = get_parameter(kwargs, "data_type")
            create_time_start = get_parameter(kwargs, "create_time", [None, None])[0]
            create_time_end = get_parameter(kwargs, "create_time", [None, None])[1]
            qc1_status = get_parameter(kwargs, "qc1_status")
            prc_status = get_parameter(kwargs, "prc_status")
            filename = get_parameter(kwargs, "filename")
            limit = get_parameter(kwargs, "limit", 0)

            # Build the WHERE clause with bound parameters instead of string
            # concatenation so values cannot break or inject into the SQL.
            conditions = []
            params = []
            if raw_id:
                conditions.append("raw_id=?")
                params.append(raw_id)
            if data_type:
                conditions.append("data_type=?")
                params.append(data_type)
            if create_time_start:
                conditions.append("create_time >= ?")
                params.append(create_time_start)
            if create_time_end:
                conditions.append("create_time <= ?")
                params.append(create_time_end)
            if qc1_status:
                conditions.append("qc1_status=?")
                params.append(qc1_status)
            if prc_status:
                conditions.append("prc_status=?")
                params.append(prc_status)
            if filename:
                # Fixed: previously this branch dropped all earlier conditions
                # and compared against a wrong literal instead of `filename`.
                conditions.append("filename=?")
                params.append(filename)

            sql_condition = ""
            if conditions:
                sql_condition = " and " + " and ".join(conditions)
            sql_count = "select count(*) as c from msc_level1_data where 1=1" + sql_condition
            sql_data = "select * from msc_level1_data where 1=1" + sql_condition
            if limit > 0:
                sql_data = f"{sql_data} limit {limit}"

            totalCount = self.db.select_one(sql_count, tuple(params))
            _, recs = self.db.select_many(sql_data, tuple(params))
            return Result.ok_data(data=from_dict_list(Level1Record, recs)).append("totalCount", totalCount['c'])
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def get(self, **kwargs):
        ''' fetch a single level-1 record by its id

        parameter kwargs:
            id = [int]

        return csst_dfs_common.models.Result
        '''
        try:
            fits_id = get_parameter(kwargs, "id", -1)
            r = self.db.select_one(
                "select * from msc_level1_data where id=?", (fits_id,))
            if r:
                return Result.ok_data(data=Level1Record().from_dict(r))
            else:
                return Result.error(message=f"id:{fits_id} not found")
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def update_proc_status(self, **kwargs):
        ''' update the status of reduction

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        fits_id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            existed = self.db.exists(
                "select * from msc_level1_data where id=?",
                (fits_id,)
            )
            if not existed:
                log.warning('%s not found' % (fits_id, ))
                return Result.error(message='%s not found' % (fits_id, ))

            self.db.execute(
                'update msc_level1_data set prc_status=?, prc_time=? where id=?',
                (status, format_time_ms(time.time()), fits_id)
            )
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def update_qc1_status(self, **kwargs):
        ''' update the status of QC1

        parameter kwargs:
            id : [int],
            status : [int]

        return csst_dfs_common.models.Result
        '''
        fits_id = get_parameter(kwargs, "id")
        status = get_parameter(kwargs, "status")
        try:
            existed = self.db.exists(
                "select * from msc_level1_data where id=?",
                (fits_id,)
            )
            if not existed:
                log.warning('%s not found' % (fits_id, ))
                return Result.error(message='%s not found' % (fits_id, ))

            self.db.execute(
                'update msc_level1_data set qc1_status=?, qc1_time=? where id=?',
                (status, format_time_ms(time.time()), fits_id)
            )
            self.db.end()
            return Result.ok_data()
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def write(self, **kwargs):
        ''' insert a level1 record into database

        parameter kwargs:
            raw_id : [int]
            data_type : [str]
            cor_sci_id : [int]
            prc_params : [str]
            flat_id : [int]
            dark_id : [int]
            bias_id : [int]
            filename : [str]
            file_path : [str]
            prc_status : [int]
            prc_time : [str]
            pipeline_id : [str]

        return csst_dfs_common.models.Result
        '''
        try:
            rec = Level1Record(
                id = 0,
                raw_id = get_parameter(kwargs, "raw_id"),
                data_type = get_parameter(kwargs, "data_type"),
                cor_sci_id = get_parameter(kwargs, "cor_sci_id"),
                prc_params = get_parameter(kwargs, "prc_params"),
                flat_id = get_parameter(kwargs, "flat_id"),
                dark_id = get_parameter(kwargs, "dark_id"),
                bias_id = get_parameter(kwargs, "bias_id"),
                filename = get_parameter(kwargs, "filename"),
                file_path = get_parameter(kwargs, "file_path"),
                prc_status = get_parameter(kwargs, "prc_status"),
                prc_time = get_parameter(kwargs, "prc_time"),
                pipeline_id = get_parameter(kwargs, "pipeline_id")
            )
            existed = self.db.exists(
                "select * from msc_level1_data where filename=?",
                (rec.filename,)
            )
            if existed:
                log.error(f'{rec.filename} has already been existed')
                return Result.error(message=f'{rec.filename} has already been existed')

            self.db.execute(
                'INSERT INTO msc_level1_data (raw_id,data_type,cor_sci_id,prc_params,flat_id,dark_id,bias_id,filename,file_path,qc1_status,prc_status,create_time,pipeline_id) \
                    VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)',
                (rec.raw_id, rec.data_type, rec.cor_sci_id, rec.prc_params, rec.flat_id, rec.dark_id, rec.bias_id, rec.filename, rec.file_path, 0, 0, format_time_ms(time.time()), rec.pipeline_id,)
            )
            self.db.end()
            # Pick up the auto-generated primary key of the inserted row.
            rec.id = self.db.last_row_id()
            return Result.ok_data(data=rec)
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))
\ No newline at end of file
...@@ -27,4 +27,4 @@ install_requires = ...@@ -27,4 +27,4 @@ install_requires =
csst_dfs_api_local.common = *.sql csst_dfs_api_local.common = *.sql
[options.entry_points] [options.entry_points]
console_scripts = console_scripts =
csst-dfs-ifs-local-ingest = csst_dfs_api_local.ifs:ingest csst-dfs-ingest = csst_dfs_api_local.common.ingest:ingest
\ No newline at end of file \ No newline at end of file
...@@ -10,5 +10,5 @@ class CommonEphemTestCase(unittest.TestCase): ...@@ -10,5 +10,5 @@ class CommonEphemTestCase(unittest.TestCase):
self.api = CatalogApi() self.api = CatalogApi()
def test_gaia3_query(self): def test_gaia3_query(self):
result = self.api.gaia3_query(ra=56.234, dec=14.4665, radius=1, min_mag=-1, max_mag=-1, obstime=-1, limit=2) result = self.api.gaia3_query(ra=54.234, dec=13.4665, radius=1, min_mag=-1, max_mag=-1, obstime=-1, limit=2)
print('return:', result) print('return:', result)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment