Commit cb2e0a9e authored by Wei Shoulin

reconstruct

parent 24af0302
import os
import datetime
import sqlite3
from dbutils.persistent_db import PersistentDB
from DBUtils.PersistentDB import PersistentDB
from .utils import singleton
import logging
@@ -38,19 +38,28 @@ class DBClient(object):
def __init_db(self):
with open(os.path.join(os.path.abspath(os.path.dirname(__file__)), "db.sql")) as f:
self.execute(f.read())
statements = f.read().split(";")
for s in statements:
self.execute(s)
def select_one(self, sql, param=()):
"""查询单个结果"""
count = self.__execute(sql, param)
_ = self.__execute(sql, param)
result = self._cursor.fetchone()
""":type result:dict"""
# result = self.__dict_datetime_obj_to_str(result)
if result is None:
return None
result = {
key[0]: col for key, col in zip(self._cursor.description, result)
}
return count, result
return result
def exists(self, sql, param=()):
_ = self.__execute(sql, param)
result = self._cursor.fetchone()
return result is not None
def select_many(self, sql, param=()):
"""
Query multiple records
@@ -74,14 +83,10 @@ class DBClient(object):
count = self.__execute(sql, param)
return count
def begin(self):
"""开启事务"""
self._conn.autocommit(0)
def end(self, option='commit'):
"""结束事务"""
if option == 'commit':
self._conn.autocommit()
self._conn.commit()
else:
self._conn.rollback()
create table if not exists "t_rawfits" (
"id" TEXT,
"obs_time" NUMERIC,
"ccd_num" NUMERIC,
"type" TEXT,
"path" TEXT,
PRIMARY KEY("id")
/*==============================================================*/
/* Table: ifs_result_0_1 */
/*==============================================================*/
create table ifs_result_0_1 (
result0_id INT not null,
result1_id INT not null,
create_time DATETIME null,
primary key (result0_id, result1_id)
);
/*==============================================================*/
/* Table: ifs_raw_ref */
/*==============================================================*/
create table ifs_raw_ref (
fit_id INT not null,
ref_id INT not null,
create_time DATETIME null,
primary key (fit_id, ref_id)
);
/*==============================================================*/
/* Table: ifs_rawfits */
/*==============================================================*/
create table ifs_rawfits (
id integer PRIMARY KEY autoincrement,
filename VARCHAR(100) null,
obs_time INT null,
ccd_num INT null,
exp_time DATETIME null,
file_path VARCHAR(128) null,
qc0_status tinyint(1) null,
qc0_time DATETIME null,
prc_status tinyint(1) null,
prc_time DATETIME null,
create_time DATETIME null
);
/*==============================================================*/
/* Table: ifs_ref_fits */
/*==============================================================*/
create table ifs_ref_fits (
id integer PRIMARY KEY autoincrement,
filename VARCHAR(128) null,
obs_time INT null,
exp_time DATETIME null,
ccd_num INT null,
file_path VARCHAR(256) null,
ref_type VARCHAR(32) null,
create_time DATETIME null,
status tinyint(1) null
);
/*==============================================================*/
/* Table: ifs_result_0 */
/*==============================================================*/
create table ifs_result_0 (
id integer PRIMARY KEY autoincrement,
filename VARCHAR(100) null,
raw_id INT null,
file_path VARCHAR(128) null,
create_time DATETIME null,
proc_type VARCHAR(32) null
);
/*==============================================================*/
/* Table: ifs_result_1 */
/*==============================================================*/
create table ifs_result_1 (
id integer PRIMARY KEY autoincrement,
filename VARCHAR(100) null,
file_path VARCHAR(128) null,
create_time DATETIME null,
proc_type VARCHAR(32) null
);
import os
from datetime import datetime
import time
@@ -47,4 +48,10 @@ def singleton(cls):
if cls not in _instance:
_instance[cls] = cls()
return _instance[cls]
return inner
\ No newline at end of file
return inner
def create_dir(root_dir, sub_system, time_str: str):
the_dir = os.path.join(root_dir, sub_system, time_str)
if not os.path.exists(the_dir):
os.makedirs(the_dir)
return the_dir
\ No newline at end of file
import os
import numpy as np
import pandas as pd
from csst_dfs_api_local.ifs import FitsApi
class RSS():
def __init__(self, rawdata):
self.fitsapi = FitsApi()
try:
self.raw = self.fitsapi.find(fits_id=rawdata)
if not os.path.exists(self.raw):
print(self.raw, 'does not exist')
self.raw = False
except:
print(rawdata, 'not in database')
self.raw = False
def set_bias(self, bias_file=None):
try:
self.bias = self.fitsapi.find(fits_id=bias_file)
if not os.path.exists(self.bias):
print(self.bias, 'does not exist')
self.bias = False
except:
print(bias_file, 'not in database')
self.bias = False
def set_flat(self, flat_file=None):
try:
self.flat = self.fitsapi.find(fits_id=flat_file)
if not os.path.exists(self.flat):
print(self.flat, 'does not exist')
self.flat = False
except:
print(flat_file, 'not in database')
self.flat = False
def set_arc(self, arc_file=None):
try:
self.arc = self.fitsapi.find(fits_id=arc_file)
if not os.path.exists(self.arc):
print(self.arc, 'does not exist')
self.arc = False
except:
print(arc_file, 'not in database')
self.arc = False
def set_sky(self, sky_file=None):
try:
self.sky = self.fitsapi.find(fits_id=sky_file)
if not os.path.exists(self.sky):
print(self.sky, 'does not exist')
self.sky = False
except:
print(sky_file, 'not in database')
self.sky = False
def makecube(self, outfile):
refiles = [self.raw, self.arc, self.flat, self.bias, self.sky]
print('reference files: ', refiles)
df = pd.DataFrame(refiles)
df.to_pickle(outfile)
if __name__ == '__main__':
rss1 = RSS('CCD1_ObsTime_600_ObsNum_30.fits') # raw data
rss1.set_bias() # currently no Bias file
rss1.set_flat(flat_file='Flat_flux.fits') # flat file
rss1.set_arc(arc_file='HgAr_flux.fits') # arc file
rss1.set_sky(sky_file='sky_noise_With_wavelength.fits') # sky file
rss1.makecube('rss_demo.pkl')
RSS_demo
Input data:
(1) Raw data
File name: CCD1_ObsTime_600_ObsNum_30.fits
Note: the raw data file name must not be None; if no file matching the name can be found, no error is raised and the file's status is set to False.
(2) Reference files
Flat-field reference file: Flat_flux.fits
Arc-lamp spectrum file: HgAr_flux.fits
Sky-background file: sky_noise_With_wavelength.fits
Note: a reference file name may be None, e.g. bias_file=None in the program; if the name is not None but no matching file is found, no error is raised and that file's status is set to False.
Output data:
rss_demo.pkl (records the status of the raw data file and the four reference files)
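A minimal usage sketch of the I/O contract described above, assuming the package is installed, the demo files listed here already exist under the DFS root directory, and RSS is importable from csst_dfs_api_local.entity (as in the tests):
# Hypothetical walkthrough of the described behavior; file names are the demo inputs above.
from csst_dfs_api_local.entity import RSS
rss = RSS('CCD1_ObsTime_600_ObsNum_30.fits')            # raw file name must not be None
rss.set_bias()                                          # bias_file=None is allowed; rss.bias ends up False
rss.set_flat(flat_file='Flat_flux.fits')                # flat-field reference
rss.set_arc(arc_file='HgAr_flux.fits')                  # arc-lamp reference
rss.set_sky(sky_file='sky_noise_With_wavelength.fits')  # sky-background reference
rss.makecube('rss_demo.pkl')                            # pickles the raw + four reference-file states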
from .RSS_demo import RSS
\ No newline at end of file
from .fits import FitsApi
from .reffits import RefFitsApi
from .result0 import Result0Api
\ No newline at end of file
from .result0 import Result0Api
from .result1 import Result1Api
def ingest():
import os
root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
paths = {}
fitsApi = FitsApi()
refApi = RefFitsApi()
for (path, _, file_names) in os.walk(root_dir):
for filename in file_names:
if filename.find(".fits") > 0:
file_full_path = os.path.join(path, filename)
file_type = 'None'
if 'obs' in filename.lower():
file_type = 'obs'
elif 'flat' in filename.lower():
file_type = 'flat'
elif 'bias' in filename.lower():
file_type = 'bias'
elif 'hgar' in filename.lower():
file_type = 'arc'
elif 'sky' in filename.lower():
file_type = 'sky'
if file_type in ['obs']:
fitsApi.import2db(file_path = file_full_path.replace(root_dir, '')[1:])
if file_type in ['flat', 'bias', 'arc','hgar', 'sky']:
refApi.import2db(file_path = file_full_path.replace(root_dir, '')[1:])
return paths
import logging
import os
from os.path import join
import shutil
import time, datetime
import shutil
from glob import glob
from astropy.io import fits
from ..common.db import DBClient
from ..common.utils import get_parameter
from ..common.utils import get_parameter, format_time_ms, create_dir
log = logging.getLogger('csst')
class FitsApi(object):
def __init__(self):
def __init__(self, sub_system = "ifs"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.check_dir()
self.db = DBClient()
@@ -22,56 +26,73 @@ class FitsApi(object):
if not os.path.exists(self.root_dir):
os.mkdir(self.root_dir)
log.info("using [%s] as root directory", self.root_dir)
# if not os.path.exists(os.path.join(self.root_dir, "fits")):
# os.mkdir(os.path.join(self.root_dir, "fits"))
# if not os.path.exists(os.path.join(self.root_dir, "refs")):
# os.mkdir(os.path.join(self.root_dir, "refs"))
# if not os.path.exists(os.path.join(self.root_dir, "results")):
# os.mkdir(os.path.join(self.root_dir, "results"))
if not os.path.exists(os.path.join(self.root_dir, "fits")):
os.mkdir(os.path.join(self.root_dir, "fits"))
if not os.path.exists(os.path.join(self.root_dir, "refs")):
os.mkdir(os.path.join(self.root_dir, "refs"))
if not os.path.exists(os.path.join(self.root_dir, "results")):
os.mkdir(os.path.join(self.root_dir, "results"))
def find(self, **kwargs):
'''
parameter kwargs:
obs_time = [int]
type = [str]
fits_id = [str]
obs_time = [int]
file_name = [str]
exp_time = (start, end)
ccd_num = [int]
qc0_status = [int]
prc_status = [int]
return list of paths
'''
paths = []
obs_time = get_parameter(kwargs, "obs_time")
type = get_parameter(kwargs, "type")
fits_id = get_parameter(kwargs, "fits_id")
file_name = get_parameter(kwargs, "file_name")
exp_time = get_parameter(kwargs, "exp_time", (None, format_time_ms(time.time())))
ccd_num = get_parameter(kwargs, "ccd_num")
qc0_status = get_parameter(kwargs, "qc0_status")
prc_status = get_parameter(kwargs, "prc_status")
sql = []
sql.append("select * from ifs_rawfits where exp_time<='" + exp_time[1]+"'")
if exp_time[0] is not None:
sql.append(" and exp_time>='" + exp_time[0] + "'")
if obs_time is not None:
sql.append(" and obs_time=" + str(obs_time))
if ccd_num is not None:
sql.append(" and ccd_num=" + str(ccd_num))
if qc0_status is not None:
sql.append(" and qc0_status=" + str(qc0_status))
if prc_status is not None:
sql.append(" and prc_status=" + str(prc_status))
if file_name:
sql = ["select * from ifs_rawfits where filename='" + file_name + "'"]
_, r = self.db.select_many("".join(sql))
if (obs_time is None or type is None) and fits_id is None:
raise Exception('obs_time and type need to be defind')
if fits_id is None:
c, r = self.db.select_many(
'select * from t_rawfits where obs_time=? and type=?',
(obs_time, type)
)
if len(r) < 1:
raise Exception('not found')
for items in r:
paths.append(os.path.join(self.root_dir, items['path']))
else:
c, r = self.db.select_many(
'select * from t_rawfits where id=?',
(fits_id,),
)
if len(r) < 1:
raise Exception('not found')
return os.path.join(self.root_dir, r[0]['path'])
return paths
return r
def get(self, **kwargs):
'''
parameter kwargs:
fits_id = [int]
return dict or None
'''
fits_id = get_parameter(kwargs, "fits_id", -1)
r = self.db.select_one(
"select * from ifs_rawfits where id=?", (fits_id,))
return r
def read(self, **kwargs):
'''
parameter kwargs:
fits_id = [str]
file_path = [str]
chunk_size = [int]
fits_id = [int]
file_path = [str]
chunk_size = [int]
yield bytes of fits file
'''
@@ -82,77 +103,119 @@ class FitsApi(object):
raise Exception("fits_id or file_path need to be defined")
if fits_id is not None:
c, r = self.db.select_one(
"select * from t_rawfits where id=?", (fits_id))
if c == 1:
file_path = os.path.join(self.root_dir, r["path"])
r = self.db.select_one(
"select * from ifs_rawfits where id=?", (fits_id,))
if r is not None:
file_path = os.path.join(self.root_dir, r["file_path"])
if file_path is not None:
path = os.path.join(self.root_dir, file_path)
chunk_size = get_parameter(kwargs, "chunk_size", 1024)
with open(path, 'r') as f:
with open(path, 'rb') as f:
while True:
data = f.read(chunk_size)
if not data:
break
yield data
def update_status(self, **kwargs):
pass
def update_proc_status(self, **kwargs):
'''
parameter kwargs:
fits_id = [int]
status = [0 or 1]
'''
fits_id = get_parameter(kwargs, "fits_id")
status = get_parameter(kwargs, "status")
existed = self.db.exists(
"select * from ifs_rawfits where id=?",
(fits_id,)
)
if not existed:
log.warning('%s not found' %(fits_id, ))
return
self.db.execute(
'update ifs_rawfits set prc_status=?, prc_time=? where id=?',
(status, format_time_ms(time.time()), fits_id)
)
self.db.end()
def upload(self, **kwargs):
def update_qc0_status(self, **kwargs):
'''
parameter kwargs:
file_path = [str]
fits_id = [int]
'''
upload to database and copy to csstpath
fits_id = get_parameter(kwargs, "fits_id")
status = get_parameter(kwargs, "status")
existed = self.db.exists(
"select * from ifs_rawfits where id=?",
(fits_id,)
)
if not existed:
log.warning('%s not found' %(fits_id, ))
return
self.db.execute(
'update ifs_rawfits set qc0_status=?, qc0_time=? where id=?',
(status, format_time_ms(time.time()), fits_id)
)
self.db.end()
def import2db(self, **kwargs):
'''
parameter kwargs:
file_path = [str]
upload to database
'''
file_path = get_parameter(kwargs, "file_path")
if file_path is None:
raise Exception("file_path need to be defined")
file_full_path = os.path.join(self.root_dir, file_path)
if not os.path.exists(file_full_path):
raise Exception("%s not found" % (file_full_path))
basename = os.path.basename(file_path)
name = basename.split('.fits')[0].lower()
c, r = self.db.select_many(
"select * from t_rawfits where id=?",
(name,)
file_name = os.path.basename(file_path)
existed = self.db.exists(
"select * from ifs_rawfits where filename=?",
(file_name,)
)
if len(r) >= 1:
print('already upload', basename)
if existed:
log.warning('%s has already been imported' %(file_path, ))
return
hu = fits.getheader(os.path.join(self.root_dir, file_path))
obs_time = hu['obst'] if 'obst' in hu else 0
hu = fits.getheader(file_full_path)
obs_time = hu['obst'] if 'obst' in hu else '1'
ccd_num = hu['ccd_num'] if 'ccd_num' in hu else 0
# print(obs_time, ccd_num)
if 'obs' in name:
type = 'obs'
elif 'flat' in name:
type = 'flat'
elif 'bias' in name:
type = 'bias'
elif 'hgar' in name:
type = 'arc'
elif 'sky' in name:
type = 'sky'
else:
type = 'None'
exp_time = format_time_ms(time.time())
self.db.execute(
'INSERT INTO t_rawfits VALUES(?,?,?,?,?)',
(basename, obs_time, ccd_num, type, file_path)
'INSERT INTO ifs_rawfits (filename, obs_time, ccd_num, exp_time, file_path, qc0_status, prc_status, create_time) \
VALUES (?,?,?,?,?,?,?,?)',
(file_name, obs_time, ccd_num, exp_time, file_path, 0, 0, format_time_ms(time.time()),)
)
self.db._conn.commit()
log.info("%s imported.", file_path)
def scan2db(self):
paths = {}
for (path, _, file_names) in os.walk(self.root_dir):
for filename in file_names:
if filename.find(".fits") > 0:
filename = os.path.join(path, filename)
self.upload(file_path=filename.replace(self.root_dir, ''))
return paths
self.db.end()
log.info("raw fits %s imported.", file_path)
def write(self, **kwargs):
file_path = get_parameter(kwargs, "file_path")
if not file_path:
log.error("file_path is None")
return
new_file_dir = create_dir(os.path.join(self.root_dir, "fits"),
self.sub_system,
"/".join([str(datetime.datetime.now().year),"%02d"%(datetime.datetime.now().month),"%02d"%(datetime.datetime.now().day)]))
file_basename = os.path.basename(file_path)
new_file_path = os.path.join(new_file_dir, file_basename)
shutil.copyfile(file_path, new_file_path)
self.import2db(file_path = new_file_path.replace(self.root_dir, '')[1:])
\ No newline at end of file
import logging
import os
import time, datetime
import shutil
from astropy.io import fits
from ..common.db import DBClient
from ..common.utils import get_parameter
from ..common.utils import get_parameter, format_time_ms,create_dir
from . import FitsApi
log = logging.getLogger('csst')
class RefFitsApi(object):
REF_FITS_BIAS = "bias"
REF_FITS_FLAT = "flat"
REF_FITS_DARK = "dark"
REF_FITS_SKY = "sky"
REF_FITS_ARC = "arc"
REF_FITS_TYPES = [REF_FITS_BIAS, REF_FITS_FLAT, REF_FITS_DARK, REF_FITS_SKY, REF_FITS_ARC]
def __init__(self, sub_system = "ifs"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.check_dir()
self.db = DBClient()
def check_dir(self):
if not os.path.exists(self.root_dir):
os.mkdir(self.root_dir)
log.info("using [%s] as root directory", self.root_dir)
if not os.path.exists(os.path.join(self.root_dir, "refs")):
os.mkdir(os.path.join(self.root_dir, "refs"))
def find(self, **kwargs):
'''
parameter kwargs:
obs_time = [int]
file_name = [str]
exp_time = (start, end)
status = [int]
ref_type = [str]
return list of paths
'''
obs_time = get_parameter(kwargs, "obs_time")
file_name = get_parameter(kwargs, "file_name")
exp_time = get_parameter(kwargs, "exp_time", (None, format_time_ms(time.time())))
ccd_num = get_parameter(kwargs, "ccd_num")
status = get_parameter(kwargs, "status")
ref_type = get_parameter(kwargs, "ref_type")
sql = []
sql.append("select * from ifs_ref_fits where exp_time<='" + exp_time[1] + "'")
if exp_time[0] is not None:
sql.append(" and exp_time>='" + exp_time[0] + "'")
if obs_time is not None:
sql.append(" and obs_time=" + str(obs_time))
if ccd_num is not None:
sql.append(" and ccd_num=" + str(ccd_num))
if ref_type is not None:
sql.append(" and ref_type='" + ref_type + "'")
if status is not None:
sql.append(" and status=" + str(status))
class RefFitsApi(FitsApi):
def upload(self, **kwargs):
if file_name:
sql = ["select * from ifs_ref_fits where filename='" + file_name + "'"]
sql.append(" order by exp_time desc")
_, r = self.db.select_many("".join(sql))
return r
def get(self, **kwargs):
'''
parameter kwargs:
file_path = [str]
fits_id = [int]
return dict or None
'''
fits_id = get_parameter(kwargs, "fits_id", -1)
r = self.db.select_one(
"select * from ifs_ref_fits where id=?", (fits_id,))
return r
upload to database and copy to csstpath
def read(self, **kwargs):
'''
parameter kwargs:
fits_id = [int]
file_path = [str]
chunk_size = [int]
yield bytes of fits file
'''
fits_id = get_parameter(kwargs, "fits_id")
file_path = get_parameter(kwargs, "file_path")
if fits_id is None and file_path is None:
raise Exception("fits_id or file_path need to be defined")
if fits_id is not None:
r = self.db.select_one(
"select * from ifs_ref_fits where id=?", (fits_id))
if r is not None:
file_path = os.path.join(self.root_dir, r["file_path"])
if file_path is not None:
path = os.path.join(self.root_dir, file_path)
chunk_size = get_parameter(kwargs, "chunk_size", 1024)
with open(path, 'rb') as f:
while True:
data = f.read(chunk_size)
if not data:
break
yield data
def update_status(self, **kwargs):
'''
parameter kwargs:
fits_id = [int]
'''
fits_id = get_parameter(kwargs, "fits_id")
status = get_parameter(kwargs, "status")
existed = self.db.exists(
"select * from ifs_ref_fits where id=?",
(fits_id,)
)
if not existed:
log.warning('%s not found' %(fits_id, ))
return
self.db.execute(
'update ifs_ref_fits set status=? where id=?',
(status, fits_id)
)
self.db.end()
def import2db(self, **kwargs):
'''
parameter kwargs:
file_path = [str]
ref_type = [str]
insert into database
'''
file_path = get_parameter(kwargs, "file_path")
if file_path is None:
raise Exception("file_path need to be defined")
file_full_path = os.path.join(self.root_dir, file_path)
if not os.path.exists(file_full_path):
raise Exception("%s not found"%(file_full_path))
basename = os.path.basename(file_path)
name = basename.split('.fits')[0]
c, r = self.db.select_many(
"select * from t_rawfits where id=?",
(name,)
file_name = os.path.basename(file_path)
existed = self.db.exists(
"select * from ifs_ref_fits where filename=?",
(file_name,)
)
if len(r) >= 1:
print('already upload', name)
if existed:
log.warning('%s has already been imported' %(file_path, ))
return
hu = fits.getheader(file_full_path)
obs_time = hu['obst'] if 'obst' in hu else ''
ccd_num = hu['ccd_num'] if 'ccd_num' in hu else 0
exp_time = format_time_ms(time.time())
ref_type = get_parameter(kwargs, "ref_type")
hu = fits.getheader(file_path)
obs_time = hu['obst']
ccd_num = hu['ccd_num']
type = name.split('_')[1]
save_path = os.path.join(self.root_dir, 'refs')
save_path = os.path.join(save_path, basename)
if ref_type is None:
if 'flat' in file_name.lower():
ref_type = 'flat'
elif 'bias' in file_name.lower():
ref_type = 'bias'
elif 'hgar' in file_name.lower():
ref_type = 'arc'
elif 'sky' in file_name.lower():
ref_type = 'sky'
else:
ref_type = ""
self.db.execute(
'INSERT INTO t_rawfits VALUES(?,?,?,?,?)',
(basename, obs_time, ccd_num, type, save_path)
'INSERT INTO ifs_ref_fits (filename, obs_time, ccd_num, exp_time, file_path, ref_type, status, create_time) \
VALUES(?,?,?,?,?,?,?,?)',
(file_name, obs_time, ccd_num, exp_time, file_path, ref_type, 1, format_time_ms(time.time()))
)
self.db._conn.commit()
if file_path != save_path:
shutil.copyfile(file_path, save_path)
log.info("%s imported.", save_path)
def scan2db(self):
paths = {}
for (path, _, file_names) in os.walk(os.path.join(self.root_dir, "refs")):
for filename in file_names:
if filename.find(".fits") > 0:
self.upload(file_path=os.path.join(path, filename))
return paths
self.db.end()
log.info("ref fits %s imported.", file_path)
def write(self, **kwargs):
file_path = get_parameter(kwargs, "file_path")
new_file_dir = create_dir(os.path.join(self.root_dir, "refs"),
self.sub_system,
"/".join([str(datetime.datetime.now().year),"%02d"%(datetime.datetime.now().month),"%02d"%(datetime.datetime.now().day)]))
file_basename = os.path.basename(file_path)
new_file_path = os.path.join(new_file_dir, file_basename)
shutil.copyfile(file_path, new_file_path)
self.import2db(file_path = new_file_path.replace(self.root_dir, '')[1:])
def associate_raw(self, **kwargs):
raw_fits_ids = get_parameter(kwargs, "raw_fits_ids")
ref_fits_id = get_parameter(kwargs, "ref_fits_id")
if raw_fits_ids is None or ref_fits_id is None:
raise Exception("raw_fits_ids or ref_fits_id is None")
sql = ['INSERT INTO ifs_raw_ref (fit_id, ref_id, create_time) values ']
values = ["(%s,%s,%s)"%(i,ref_fits_id,format_time_ms(time.time())) for i in raw_fits_ids]
self.db.execute(sql + ",".join())
self.db.end()
log.info("%s associate to %s imported.", raw_fits_ids, ref_fits_id)
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import get_parameter, format_time_ms, create_dir
log = logging.getLogger('csst')
class Result0Api(object):
def __init__(self, sub_system = "ifs"):
self.sub_system = sub_system
def upload(self, **kwargs):
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.check_dir()
self.db = DBClient()
def check_dir(self):
if not os.path.exists(self.root_dir):
os.mkdir(self.root_dir)
log.info("using [%s] as root directory", self.root_dir)
if not os.path.exists(os.path.join(self.root_dir, "results0")):
os.mkdir(os.path.join(self.root_dir, "results0"))
def find(self, **kwargs):
'''
parameter kwargs:
fits_id = [str]
file_path = [str]
chunk_size = [int]
raw_id = [int]
file_name = [str]
proc_type = [str]
yield bytes of fits file
return list of paths
'''
pass
def find(self, **kwargs):
pass
paths = []
raw_id = get_parameter(kwargs, "raw_id", -1)
file_name = get_parameter(kwargs, "file_name")
proc_type = get_parameter(kwargs, "proc_type")
sql = []
sql.append("select * from ifs_result_0 where raw_id=%d" %(raw_id,))
if proc_type is not None:
sql.append(" and proc_type='" + proc_type + "'")
if file_name:
sql = ["select * from ifs_result_0 where filename='" + file_name + "'"]
_, r = self.db.select_many("".join(sql))
return r
def get(self, **kwargs):
'''
parameter kwargs:
fits_id = [int]
return dict or None
'''
fits_id = get_parameter(kwargs, "fits_id", -1)
r = self.db.select_one(
"select * from ifs_result_0 where id=?", (fits_id,))
return r
def read(self, **kwargs):
pass
'''
parameter kwargs:
fits_id = [int]
file_path = [str]
chunk_size = [int]
yield bytes of fits file
'''
fits_id = get_parameter(kwargs, "fits_id")
file_path = get_parameter(kwargs, "file_path")
if fits_id is None and file_path is None:
raise Exception("fits_id or file_path need to be defined")
if fits_id is not None:
r = self.db.select_one(
"select * from ifs_result_0 where id=?", (fits_id))
if r is not None:
file_path = os.path.join(self.root_dir, r["file_path"])
if file_path is not None:
path = os.path.join(self.root_dir, file_path)
chunk_size = get_parameter(kwargs, "chunk_size", 1024)
with open(path, 'rb') as f:
while True:
data = f.read(chunk_size)
if not data:
break
yield data
def write(self, **kwargs):
'''
parameter kwargs:
raw_id = [int]
file_path = [str]
proc_type = [str]
insert into database
'''
raw_id = get_parameter(kwargs, "raw_id")
file_path = get_parameter(kwargs, "file_path")
proc_type = get_parameter(kwargs, "proc_type", "default")
if file_path is None:
raise Exception("file_path need to be defined")
new_file_dir = create_dir(os.path.join(self.root_dir, "results0"),
self.sub_system,
"/".join([str(datetime.datetime.now().year),"%02d"%(datetime.datetime.now().month),"%02d"%(datetime.datetime.now().day)]))
file_basename = os.path.basename(file_path)
new_file_path = os.path.join(new_file_dir, file_basename)
shutil.copyfile(file_path, new_file_path)
self.db.execute(
'INSERT INTO ifs_result_0 (filename, file_path, raw_id, proc_type, create_time) \
VALUES(?,?,?,?,?)',
(file_basename, new_file_path.replace(self.root_dir, '')[1:], raw_id, proc_type, format_time_ms(time.time()))
)
self.db.end()
def wirte(self, **kwargs):
pass
\ No newline at end of file
log.info("result0 fits %s imported.", file_path)
return new_file_path
\ No newline at end of file
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import get_parameter, format_time_ms, create_dir
log = logging.getLogger('csst')
class Result1Api(object):
def __init__(self, sub_system = "ifs"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.check_dir()
self.db = DBClient()
def check_dir(self):
if not os.path.exists(self.root_dir):
os.mkdir(self.root_dir)
log.info("using [%s] as root directory", self.root_dir)
if not os.path.exists(os.path.join(self.root_dir, "results1")):
os.mkdir(os.path.join(self.root_dir, "results1"))
def find(self, **kwargs):
'''
parameter kwargs:
file_name = [str]
proc_type = [str]
return list of paths
'''
paths = []
file_name = get_parameter(kwargs, "file_name")
proc_type = get_parameter(kwargs, "proc_type", "default")
sql = ["select * from ifs_result_1 where proc_type='" + proc_type + "'"]
if file_name:
sql = ["select * from ifs_result_1 where filename='" + file_name + "'"]
_, r = self.db.select_many("".join(sql))
return r
def get(self, **kwargs):
'''
parameter kwargs:
fits_id = [int]
return dict or None
'''
fits_id = get_parameter(kwargs, "fits_id", -1)
r = self.db.select_one(
"select * from ifs_result_1 where id=?", (fits_id,))
_, result0s = self.db.select_many(
"select result0_id, create_time from ifs_result_0_1 where result1_id=?", (fits_id,))
return r, result0s
def read(self, **kwargs):
'''
parameter kwargs:
fits_id = [int]
file_path = [str]
chunk_size = [int]
yield bytes of fits file
'''
fits_id = get_parameter(kwargs, "fits_id")
file_path = get_parameter(kwargs, "file_path")
if fits_id is None and file_path is None:
raise Exception("fits_id or file_path need to be defined")
if fits_id is not None:
r = self.db.select_one(
"select * from ifs_result_1 where id=?", (fits_id))
if r is not None:
file_path = os.path.join(self.root_dir, r["file_path"])
if file_path is not None:
path = os.path.join(self.root_dir, file_path)
chunk_size = get_parameter(kwargs, "chunk_size", 1024)
with open(path, 'rb') as f:
while True:
data = f.read(chunk_size)
if not data:
break
yield data
def write(self, **kwargs):
'''
parameter kwargs:
file_path = [str]
proc_type = [str]
result0_ids = [list]
insert into database
'''
file_path = get_parameter(kwargs, "file_path")
proc_type = get_parameter(kwargs, "proc_type", "default")
result0_ids = get_parameter(kwargs, "result0_ids", [])
if file_path is None:
raise Exception("file_path need to be defined")
new_file_dir = create_dir(os.path.join(self.root_dir, "results1"),
self.sub_system,
"/".join([str(datetime.datetime.now().year),"%02d"%(datetime.datetime.now().month),"%02d"%(datetime.datetime.now().day)]))
file_basename = os.path.basename(file_path)
new_file_path = os.path.join(new_file_dir, file_basename)
shutil.copyfile(file_path, new_file_path)
self.db.execute(
'INSERT INTO ifs_result_1 (filename, file_path, proc_type, create_time) \
VALUES(?,?,?,?)',
(file_basename, new_file_path.replace(self.root_dir, '')[1:], proc_type, format_time_ms(time.time()),)
)
self.db.end()
result1_id = self.db._cursor.lastrowid # id of the ifs_result_1 row just inserted
for id0 in result0_ids:
self.db.execute(
'INSERT INTO ifs_result_0_1 (result0_id, result1_id, create_time) \
VALUES(?,?,?)',
(id0, result1_id, format_time_ms(time.time()))
)
self.db.end()
log.info("result1 fits %s imported.", file_path)
return new_file_path
\ No newline at end of file
import os
import numpy as np
import pandas as pd
import logging
from astropy.io import fits
from csst_dfs_api_local.ifs import FitsApi, RefFitsApi, Result0Api, Result1Api
log = logging.getLogger('csst')
class RSS(object):
def __init__(self, file_name):
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.fitsApi = FitsApi()
self.refFitsApi = RefFitsApi()
self.result0Api = Result0Api()
self.result1Api = Result1Api()
try:
self.raw = self.fitsApi.find(file_name=file_name)
self.raw = self.raw[0] if self.raw else None
if self.raw is None:
log.error('raw %s not found' %(file_name,))
except Exception as e:
log.error('raw %s not found: %s' % (file_name, e))
def set_bias(self, file_name=None):
try:
self.bias = self.refFitsApi.find(file_name=file_name, ref_type=RefFitsApi.REF_FITS_BIAS)
self.bias = self.bias[0] if self.bias else None
if self.bias is None:
log.error('bias %s not found' %(file_name,))
except Exception as e:
log.error('bias %s not found: %s' % (file_name, e))
def set_flat(self, file_name=None):
try:
self.flat = self.refFitsApi.find(file_name=file_name, ref_type=RefFitsApi.REF_FITS_FLAT)
self.flat = self.flat[0] if self.flat else None
if self.flat is None:
log.error('flat %s not found' %(file_name,))
except Exception as e:
log.error('flat %s not found: %s' % (file_name, e))
def set_arc(self, file_name = None):
try:
self.arc = self.refFitsApi.find(file_name=file_name, ref_type=RefFitsApi.REF_FITS_ARC)
self.arc = self.arc[0] if self.arc else None
if self.arc is None:
log.error('arc %s not found' %(file_name,))
except Exception as e:
log.error('arc %s not found: %s' % (file_name, e))
def set_sky(self, file_name = None):
try:
self.sky = self.refFitsApi.find(file_name=file_name, ref_type=RefFitsApi.REF_FITS_SKY)
self.sky = self.sky[0] if self.sky else None
if self.sky is None:
log.error('sky %s not found' %(file_name,))
except Exception as e:
log.error('sky %s not found: %s' % (file_name, e))
def makecube(self, outfile):
hdul_raw = fits.open(os.path.join(self.root_dir, self.raw['file_path']))
hdul_arc = fits.open(os.path.join(self.root_dir, self.arc['file_path']))
hdul_flat = fits.open(os.path.join(self.root_dir, self.flat['file_path']))
hdul_sky = fits.open(os.path.join(self.root_dir, self.sky['file_path']))
hdul_raw.append(hdul_arc[0])
hdul_raw.append(hdul_flat[0])
hdul_raw.append(hdul_sky[0])
hdul_raw.writeto(outfile, overwrite=True)
self.result0Api.write(raw_id = self.raw['id'], file_path = outfile, proc_type = 'default')
def makecube2(self, outfile):
refiles = [self.raw, self.arc, self.flat, self.bias, self.sky]
raw_segments = self.fitsApi.read(fits_id=self.raw['id'])
arc_segments = self.refFitsApi.read(fits_id=self.arc['id'])
flat_segments = self.refFitsApi.read(fits_id=self.flat['id'])
sky_segments = self.refFitsApi.read(fits_id=self.sky['id'])
hdul_raw = fits.HDUList.fromstring(b''.join(raw_segments))
hdul_arc = fits.HDUList.fromstring(b''.join(arc_segments))
hdul_flat = fits.HDUList.fromstring(b''.join(flat_segments))
hdul_sky = fits.HDUList.fromstring(b''.join(sky_segments))
hdul_raw.append(hdul_arc[0])
hdul_raw.append(hdul_flat[0])
hdul_raw.append(hdul_sky[0])
hdul_raw.writeto(outfile, overwrite=True)
self.result0Api.write(raw_id = self.raw['id'], file_path = outfile, proc_type = 'default')
if __name__ == '__main__':
rss1 = RSS('CCD1_ObsTime_300_ObsNum_1.fits') # raw data
# rss1.set_bias() # currently no Bias file
rss1.set_flat(file_name = 'Flat_flux.fits') # flat file
rss1.set_arc(file_name = 'HgAr_flux.fits') # arc file
rss1.set_sky(file_name = 'sky_noise_With_wavelength.fits') # sky file
rss1.makecube('/opt/temp/csst_ifs/rss_demo.fits')
@@ -23,4 +23,7 @@ setup_requires = setuptools_scm
install_requires =
astropy>=4.0
[options.package_data]
csst_dfs_api_local.common = *.sql
\ No newline at end of file
csst_dfs_api_local.common = *.sql
[options.entry_points]
console_scripts =
csst-dfs-ifs-local-ingest = csst_dfs_api_local.ifs:ingest
\ No newline at end of file
import logging
from csst_dfs_api_local.ifs import FitsApi
api = FitsApi()
# api.scan2db()
c, r = api.db.select_one(
"select * from t_rawfits where id=?",
('CCD1_Flat_img.fits', )
)
print(r)
\ No newline at end of file
import logging
import unittest
import os
from csst_dfs_api_local.entity import RSS
log = logging.getLogger('csst')
class RSS_TestCase(unittest.TestCase):
def setUp(self):
self.rss = RSS('CCD1_ObsTime_600_ObsNum_30.fits')
self.rss.set_bias()
self.rss.set_flat(flat_file='Flat_flux.fits')
self.rss.set_arc(arc_file='HgAr_flux.fits')
self.rss.set_sky(sky_file='sky_noise_With_wavelength.fits')
def test_init(self):
assert self.rss.raw
# def test_bias(self):
# self.rss.set_bias()
# assert self.rss.bias
def test_flat(self):
assert self.rss.flat
def test_arc(self):
assert self.rss.arc
def test_sky(self):
assert self.rss.sky
def test_makecube(self):
self.rss.makecube('rss_demo.pkl')
assert os.path.exists('rss_demo.pkl')
os.remove('rss_demo.pkl')
\ No newline at end of file
@@ -8,5 +8,15 @@ class DBClientTestCase(unittest.TestCase):
def test_db_init(self):
db = DBClient()
db.select_one("select * from t_rawfits")
r = db.select_one("select count(*) as c from ifs_rawfits")
if r is not None:
print("ifs_rawfits count:", r['c'])
r = db.exists("select * from ifs_rawfits where id=2323")
if r:
print("existed")
else:
print("not existed")
import logging
import unittest
import os
from astropy.io import fits
from csst_dfs_api_local.ifs import FitsApi
log = logging.getLogger('csst')
class IFSFitsTestCase(unittest.TestCase):
def setUp(self):
self.api = FitsApi()
# self.api.scan2db()
def test_find(self):
path1 = self.api.find(obs_time=900, type='obs')
log.info('find', path1)
assert len(path1) > 0
path2 = self.api.find(fits_id='CCD2_ObsTime_600_ObsNum_8.fits')
log.info('find', path2)
assert 'CCD2_ObsTime_600_ObsNum_8.fits' in path2
recs = self.api.find(file_name='CCD1_ObsTime_300_ObsNum_7.fits')
print('find:', recs)
assert len(recs) == 1
recs = self.api.find()
print('find:', recs)
assert len(recs) > 1
def test_read(self):
file = self.api.read(fits_id='CCD2_ObsTime_600_ObsNum_8.fits')
log.info('read', str(type(file)))
path = self.api.find(obs_time=900, type='obs')
file = self.api.read(file_path=path)
log.info('read', str(type(file)))
recs = self.api.find(file_name='CCD1_ObsTime_300_ObsNum_7.fits')
print("The full path: ", os.path.join(self.api.root_dir, recs[0]['file_path']))
file_segments = self.api.read(file_path=recs[0]['file_path'])
file_bytes = b''.join(file_segments)
hdul = fits.HDUList.fromstring(file_bytes)
print(hdul.info())
hdr = hdul[0].header
print(repr(hdr))
def test_update_proc_status(self):
recs = self.api.find(file_name='CCD1_ObsTime_300_ObsNum_7.fits')
self.api.update_proc_status(fits_id=recs[0]['id'],status=1)
rec = self.api.get(fits_id=recs[0]['id'])
assert rec['prc_status'] == 1
def test_update_qc0_status(self):
recs = self.api.find(file_name='CCD1_ObsTime_300_ObsNum_7.fits')
self.api.update_qc0_status(fits_id=recs[0]['id'],status=1)
rec = self.api.get(fits_id=recs[0]['id'])
assert rec['qc0_status'] == 1
def test_write(self):
recs = self.api.write(file_path='/opt/temp/csst_ifs/CCD2_ObsTime_1200_ObsNum_40.fits')
recs = self.api.find(file_name='CCD2_ObsTime_1200_ObsNum_40.fits')
rec = self.api.get(fits_id=recs[0]['id'])
print(rec)
\ No newline at end of file
import logging
import unittest
import os
from csst_dfs_api_local.ifs import ingest
log = logging.getLogger('csst')
class IFSIngestTestCase(unittest.TestCase):
def setUp(self):
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
def test_ingest(self):
ingest()
# import logging
# import unittest
# from csst_dfs_api_local.ifs import RefFitsApi
# log = logging.getLogger('csst')
# class IFSFitsTestCase(unittest.TestCase):
# def setUp(self):
# self.api = RefFitsApi()
# self.api.scan2db()
# def test_find(self):
# path = self.api.find(obs_time=300, type='Flat')
# log.info('find', path)
# path = self.api.find(fits_id='CCD1_Flat_img.fits')
# print(path)
# log.info('find', path)
# def test_read(self):
# file = self.api.read(fits_id='CCD1_Flat_img.fits')
# log.info('read', str(type(file)))
# path = self.api.find(obs_time=300, type='Flat')
# file = self.api.read(file_path=path)
# log.info('read', str(type(file)))
import os
import unittest
from astropy.io import fits
from csst_dfs_api_local.ifs import RefFitsApi
class IFSFitsTestCase(unittest.TestCase):
def setUp(self):
self.api = RefFitsApi()
def test_find(self):
recs = self.api.find(exp_time=('2021-03-19 13:42:22', '2021-03-19 15:28:00'), ref_type=RefFitsApi.REF_FITS_FLAT)
print('find:', recs)
assert len(recs) > 1
recs = self.api.find()
print('=' * 80)
print('find:', recs)
assert len(recs) > 1
def test_read(self):
recs = self.api.find(file_name='CCD2_Flat_img.fits')
print("The full path: ", os.path.join(self.api.root_dir, recs[0]['file_path']))
file_segments = self.api.read(file_path=recs[0]['file_path'])
file_bytes = b''.join(file_segments)
hdul = fits.HDUList.fromstring(file_bytes)
print(hdul.info())
hdr = hdul[0].header
print(repr(hdr))
def test_update_status(self):
recs = self.api.find(file_name='CCD2_Flat_img.fits')
self.api.update_status(fits_id=recs[0]['id'],status=1)
rec = self.api.get(fits_id=recs[0]['id'])
assert rec['status'] == 1
def test_write(self):
recs = self.api.write(file_path='/opt/temp/csst_ifs/CCD3_Flat_img.fits')
recs = self.api.find(file_name='CCD3_Flat_img.fits')
rec = self.api.get(fits_id=recs[0]['id'])
print(rec)
import os
import unittest
from astropy.io import fits
from csst_dfs_api_local.ifs import Result0Api
class IFSResult0TestCase(unittest.TestCase):
def setUp(self):
self.api = Result0Api()
def test_find(self):
recs = self.api.find(file_name='CCD2_ObsTime_1200_ObsNum_40.fits')
print('find:', recs)
assert len(recs) == 1
recs = self.api.find()
print('find:', recs)
assert len(recs) > 1
def test_read(self):
recs = self.api.find(file_name='CCD2_ObsTime_1200_ObsNum_40.fits')
print("The full path: ", os.path.join(self.api.root_dir, recs[0]['file_path']))
file_segments = self.api.read(file_path=recs[0]['file_path'])
file_bytes = b''.join(file_segments)
hdul = fits.HDUList.fromstring(file_bytes)
print(hdul.info())
hdr = hdul[0].header
print(repr(hdr))
def test_write(self):
self.api.write(raw_id = 1,
file_path='/opt/temp/csst_ifs/CCD2_ObsTime_1200_ObsNum_40.fits',
proc_type = 'default')
recs = self.api.find(raw_id=1)
print()
print(recs)
print("="*80)
recs = self.api.find(file_name='CCD2_ObsTime_1200_ObsNum_40.fits')
print(recs)