Commit 1353f622 authored by Wei Shoulin's avatar Wei Shoulin
Browse files

rm reqs

parent 7191c49c
from .level1 import Level1DataApi
from .level2spectra import Level2SpectraApi
\ No newline at end of file
import os, sys
import argparse
import logging
from astropy.io import fits
import datetime
import shutil
from csst_dfs_api_local.common.db import DBClient
log = logging.getLogger('csst-dfs-api-local')
def ingest():
db = DBClient()
parser = argparse.ArgumentParser(prog=f"{sys.argv[0]}", description="ingest the local files")
parser.add_argument('-i','--infile', dest="infile", help="a file or a directory")
parser.add_argument('-m', '--copyfiles', dest="copyfiles", action='store_true', default=False, help="copy files after import")
args = parser.parse_args(sys.argv[1:])
import_root_dir = args.infile
if os.path.isfile(import_root_dir):
log.info(f"prepare import {import_root_dir}")
ingesst_one(import_root_dir, db, args.copyfiles)
if os.path.isdir(import_root_dir):
for (path, _, file_names) in os.walk(import_root_dir):
for filename in file_names:
if filename.find(".fits") > 0:
file_full_path = os.path.join(path, filename)
log.info(f"prepare import {file_full_path}")
ingesst_one(file_full_path, db, args.copyfiles)
db.close()
def ingesst_one(file_path, db, copyfiles):
dest_root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
hdul = fits.open(file_path)
header = hdul[0].header
obs_id = header["OBSID"]
exp_start_time = f"{header['DATE-OBS']} {header['TIME-OBS']}"
exp_time = header['EXPTIME']
module_id = header["INSTRUME"]
obs_type = header["FILETYPE"]
qc0_status = -1
prc_status = -1
time_now = datetime.datetime.now()
create_time = time_now.strftime('%Y-%m-%d %H:%M:%S')
facility_status_id = 0
module_status_id = 0
existed = db.exists("select * from t_observation where obs_id=?", (obs_id,))
if not existed:
db.execute("insert into t_observation \
(obs_id,obs_time,exp_time,module_id,obs_type,facility_status_id, module_status_id, qc0_status, prc_status,create_time) \
values (?,?,?,?,?,?,?,?,?,?)",
(obs_id,exp_start_time,exp_time,module_id,obs_type,facility_status_id,module_status_id,qc0_status, prc_status,create_time))
db.end()
#level0
detector = header["DETECTOR"]
filename = header["FILENAME"]
existed = db.exists(
"select * from sls_level0_data where filename=?",
(filename,)
)
if existed:
log.warning('%s has already been imported' %(file_path, ))
db.end()
return
detector_status_id = 0
file_full_path = file_path
if copyfiles:
file_dir = f"{dest_root_dir}/{module_id}/{obs_type.upper()}/{header['EXPSTART']}/{obs_id}/MS"
if not os.path.exists(file_dir):
os.makedirs(file_dir)
file_full_path = f"{file_dir}/{filename}.fits"
level0_id = f"{obs_id}{detector}"
c = db.execute("insert into sls_level0_data \
(level0_id, obs_id, detector_no, obs_type, obs_time, exp_time,detector_status_id, filename, file_path,qc0_status, prc_status,create_time) \
values (?,?,?,?,?,?,?,?,?,?,?,?)",
(level0_id, obs_id, detector, obs_type, exp_start_time, exp_time, detector_status_id, filename, file_full_path, qc0_status, prc_status,create_time))
db.end()
level0_id_id = db.last_row_id()
#level0-header
ra_obj = header["RA_OBJ"]
dec_obj = header["DEC_OBJ"]
db.execute("delete from sls_level0_header where id=?",(level0_id_id,))
db.execute("insert into sls_level0_header \
(id, ra_obj, dec_obj, create_time) \
values (?,?,?,?)",
(level0_id_id, ra_obj, dec_obj, create_time))
if copyfiles:
#copy files
shutil.copyfile(file_path, file_full_path)
db.end()
print(f"{file_path} imported")
if __name__ == "__main__":
ingest()
\ No newline at end of file
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.sls import Level1Record
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level1DataApi(object):
def __init__(self, sub_system = "sls"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
def find(self, **kwargs):
''' retrieve level1 records from database
parameter kwargs:
level0_id: [str]
data_type: [str]
create_time : (start, end),
qc1_status : [int],
prc_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
return: csst_dfs_common.models.Result
'''
try:
level0_id = get_parameter(kwargs, "level0_id")
data_type = get_parameter(kwargs, "data_type")
create_time_start = get_parameter(kwargs, "create_time", [None, None])[0]
create_time_end = get_parameter(kwargs, "create_time", [None, None])[1]
qc1_status = get_parameter(kwargs, "qc1_status")
prc_status = get_parameter(kwargs, "prc_status")
filename = get_parameter(kwargs, "filename")
limit = get_parameter(kwargs, "limit", 0)
sql_count = "select count(*) as c from sls_level1_data where 1=1"
sql_data = f"select * from sls_level1_data where 1=1"
sql_condition = ""
if level0_id:
sql_condition = f"{sql_condition} and level0_id='{level0_id}'"
if data_type:
sql_condition = f"{sql_condition} and data_type='{data_type}'"
if create_time_start:
sql_condition = f"{sql_condition} and create_time >='{create_time_start}'"
if create_time_end:
sql_condition = f"{sql_condition} and create_time <='{create_time_end}'"
if qc1_status:
sql_condition = f"{sql_condition} and qc1_status={qc1_status}"
if prc_status:
sql_condition = f"{sql_condition} and prc_status={prc_status}"
if filename:
sql_condition = f" and filename='{filename}'"
sql_count = f"{sql_count} {sql_condition}"
sql_data = f"{sql_data} {sql_condition}"
if limit > 0:
sql_data = f"{sql_data} limit {limit}"
totalCount = self.db.select_one(sql_count)
_, recs = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(Level1Record, recs)).append("totalCount", totalCount['c'])
except Exception as e:
return Result.error(message=str(e))
def get(self, **kwargs):
'''
parameter kwargs:
id = [int]
return csst_dfs_common.models.Result
'''
try:
id = get_parameter(kwargs, "id", -1)
r = self.db.select_one(
"select * from sls_level1_data where id=?", (id,))
if r:
return Result.ok_data(data=Level1Record().from_dict(r))
else:
return Result.error(message=f"id:{id} not found")
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
fits_id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
existed = self.db.exists(
"select * from sls_level1_data where id=?",
(fits_id,)
)
if not existed:
log.warning('%s not found' %(fits_id, ))
return Result.error(message ='%s not found' %(fits_id, ))
self.db.execute(
'update sls_level1_data set prc_status=?, prc_time=? where id=?',
(status, format_time_ms(time.time()), fits_id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_qc1_status(self, **kwargs):
''' update the status of QC1
parameter kwargs:
id : [int],
status : [int]
'''
fits_id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
existed = self.db.exists(
"select * from sls_level1_data where id=?",
(fits_id,)
)
if not existed:
log.warning('%s not found' %(fits_id, ))
return Result.error(message ='%s not found' %(fits_id, ))
self.db.execute(
'update sls_level1_data set qc1_status=?, qc1_time=? where id=?',
(status, format_time_ms(time.time()), fits_id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def write(self, **kwargs):
''' insert a level1 record into database
parameter kwargs:
level0_id : [str]
data_type : [str]
prc_params : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
pipeline_id : [str]
refs: [dict]
return csst_dfs_common.models.Result
'''
try:
rec = Level1Record(
id = 0,
level0_id = get_parameter(kwargs, "level0_id"),
data_type = get_parameter(kwargs, "data_type"),
prc_params = get_parameter(kwargs, "prc_params"),
filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path"),
prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())),
pipeline_id = get_parameter(kwargs, "pipeline_id"),
refs = get_parameter(kwargs, "refs", {})
)
existed = self.db.exists(
"select * from sls_level1_data where filename=?",
(rec.filename,)
)
if existed:
log.error(f'{rec.filename} has already been existed')
return Result.error(message=f'{rec.filename} has already been existed')
self.db.execute(
'INSERT INTO sls_level1_data (level0_id,data_type,prc_params,filename,file_path,qc1_status,prc_status,prc_time,create_time,pipeline_id) \
VALUES(?,?,?,?,?,?,?,?,?,?)',
(rec.level0_id, rec.data_type, rec.prc_params, rec.filename, rec.file_path, -1, rec.prc_status, rec.prc_time, format_time_ms(time.time()),rec.pipeline_id,)
)
self.db.end()
rec.id = self.db.last_row_id()
if rec.refs.items():
sql_refs = "insert into sls_level1_ref (level1_id,ref_type,cal_id) values "
values = ["(%s,'%s',%s)"%(rec.id,k,v) for k,v in rec.refs.items()]
_ = self.db.execute(sql_refs + ",".join(values))
self.db.end()
return Result.ok_data(data=rec)
except Exception as e:
log.error(e)
return Result.error(message=str(e))
\ No newline at end of file
import os
import logging
import time, datetime
import shutil
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.sls import Level1PrcRecord
from csst_dfs_commons.models.common import from_dict_list
log = logging.getLogger('csst')
class Level1PrcApi(object):
def __init__(self, sub_system = "sls"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
def find(self, **kwargs):
''' retrieve level1 procedure records from database
parameter kwargs:
level1_id: [int]
pipeline_id: [str]
prc_module: [str]
prc_status : [int]
return: csst_dfs_common.models.Result
'''
try:
level1_id = get_parameter(kwargs, "level1_id", 0)
pipeline_id = get_parameter(kwargs, "pipeline_id")
prc_module = get_parameter(kwargs, "prc_module")
prc_status = get_parameter(kwargs, "prc_status")
sql_data = f"select * from sls_level1_prc"
sql_condition = f"where level1_id={level1_id}"
if pipeline_id:
sql_condition = sql_condition + " and pipeline_id='" + pipeline_id + "'"
if prc_module:
sql_condition = sql_condition + " and prc_module ='" + prc_module + "'"
if prc_status:
sql_condition = f"{sql_condition} and prc_status={prc_status}"
sql_data = f"{sql_data} {sql_condition}"
_, records = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(Level1PrcRecord, records)).append("totalCount", len(records))
except Exception as e:
return Result.error(message=str(e))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
existed = self.db.exists(
"select * from sls_level1_prc where id=?",
(id,)
)
if not existed:
log.warning('%s not found' %(id, ))
return Result.error(message ='%s not found' %(id, ))
self.db.execute(
'update sls_level1_prc set prc_status=?, prc_time=? where id=?',
(status, format_time_ms(time.time()), id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def write(self, **kwargs):
''' insert a level1 procedure record into database
parameter kwargs:
level1_id : [int]
pipeline_id : [str]
prc_module : [str]
params_file_path : [str]
prc_status : [int]
prc_time : [str]
result_file_path : [str]
return csst_dfs_common.models.Result
'''
rec = Level1PrcRecord(
id = 0,
level1_id = get_parameter(kwargs, "level1_id", 0),
pipeline_id = get_parameter(kwargs, "pipeline_id"),
prc_module = get_parameter(kwargs, "prc_module"),
params_file_path = get_parameter(kwargs, "params_file_path"),
prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time"),
result_file_path = get_parameter(kwargs, "result_file_path")
)
try:
self.db.execute(
'INSERT INTO sls_level1_prc (level1_id,pipeline_id,prc_module, params_file_path, prc_status,prc_time,result_file_path) \
VALUES(?,?,?,?,?,?,?)',
(rec.level1_id, rec.pipeline_id, rec.prc_module, rec.params_file_path, rec.prc_status, rec.prc_time, rec.result_file_path)
)
self.db.end()
rec.id = self.db.last_row_id()
return Result.ok_data(data=rec)
except Exception as e:
log.error(e)
return Result.error(message=str(e))
DBUtils==1.3
astropy>=4.0
git+https://github.com/astronomical-data-processing/csst-dfs-commons.git
\ No newline at end of file
astropy>=4.0
\ No newline at end of file
......@@ -21,14 +21,9 @@ python_requires = >=3.7
zip_safe = False
setup_requires = setuptools_scm
install_requires =
astropy>=4.0
DBUtils==1.3
[options.package_data]
csst_dfs_api_local.common = *.sql
[options.entry_points]
console_scripts =
csst-msc-ingest-local = csst_dfs_api_local.msc.ingest:ingest
csst-ifs-ingest-local = csst_dfs_api_local.ifs.ingest:ingest
csst-mci-ingest-local = csst_dfs_api_local.mci.ingest:ingest
csst-sls-ingest-local = csst_dfs_api_local.sls.ingest:ingest
csst-cpic-ingest-local = csst_dfs_api_local.cpic.ingest:ingest
\ No newline at end of file
csst-dfs-ingest-local = csst_dfs_api_local.facility.ingest:ingest
\ No newline at end of file
import unittest
from csst_dfs_api_local.mci import Level0DataApi
from csst_dfs_api_local.facility import Level0DataApi
class MCILevel0DataApiTestCase(unittest.TestCase):
class Level0DataApiTestCase(unittest.TestCase):
def setUp(self):
self.api = Level0DataApi()
......
......@@ -2,9 +2,9 @@ import os
import unittest
from astropy.io import fits
from csst_dfs_api_local.mci.level0prc import Level0PrcApi
from csst_dfs_api_local.facility.level0prc import Level0PrcApi
class MCILevel0PrcTestCase(unittest.TestCase):
class Level0PrcTestCase(unittest.TestCase):
def setUp(self):
self.api = Level0PrcApi()
......
import unittest
from csst_dfs_api_local.msc import Level1DataApi
from csst_dfs_api_local.facility import Level1DataApi
class MSCLevel1DataTestCase(unittest.TestCase):
class Level1DataTestCase(unittest.TestCase):
def setUp(self):
self.api = Level1DataApi()
......
......@@ -2,9 +2,9 @@ import os
import unittest
from astropy.io import fits
from csst_dfs_api_local.msc.level1prc import Level1PrcApi
from csst_dfs_api_local.facility.level1prc import Level1PrcApi
class MSCLevel1PrcTestCase(unittest.TestCase):
class Level1PrcTestCase(unittest.TestCase):
def setUp(self):
self.api = Level1PrcApi()
......
import os
import unittest
from astropy.io import fits
from csst_dfs_api_local.ifs.calmerge import CalMergeApi
class IFSCalMergeApiTestCase(unittest.TestCase):
def setUp(self):
self.api = CalMergeApi()
def test_find(self):
recs = self.api.find(detector_no='01',
ref_type = "bias",
obs_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
print('find:', recs)
def test_get_latest_by_l0(self):
rec = self.api.get_latest_by_l0(level0_id='000001301', ref_type = "bias")
print('get:', rec)
def test_get(self):
rec = self.api.get(cal_id='0000231')
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 100, status = 1)
print('update_proc_status:', rec)
def test_update_qc1_status(self):
rec = self.api.update_qc1_status(id = 100, status = 2)
print('update_qc1_status:', rec)
def test_write(self):
rec = self.api.write(
cal_id='0000231',
detector_no='01',
ref_type = "bias",
obs_time = "2021-06-04 11:12:13",
exp_time = 150,
filename = "/opt/dddasd.params",
file_path = "/opt/dddasd.fits",
prc_status = 3,
prc_time = '2021-06-04 11:12:13',
level0_ids = ['0000231','0000232','0000233','0000234'])
print('write:', rec)
\ No newline at end of file
import unittest
from csst_dfs_api_local.ifs.ingest import ingest, ingesst_one
from csst_dfs_api_local.facility.ingest import ingest, ingesst_one
class IFSLevel0DataApiTestCase(unittest.TestCase):
......
import unittest
from csst_dfs_api_local.ifs import Level0DataApi
class IFSLevel0DataApiTestCase(unittest.TestCase):
def setUp(self):
self.api = Level0DataApi()
def test_find(self):
recs = self.api.find(obs_id = '300000055', obs_type = 'sky', limit = 0)
print('find:', recs)
def test_get(self):
rec = self.api.get(id = 31)
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 31, status = 6)
print('update_proc_status:', rec)
def test_update_qc0_status(self):
rec = self.api.update_qc0_status(id = 31, status = 7)
print('update_qc0_status:', rec)
def test_write(self):
rec = self.api.write(
obs_id = '0000013',
detector_no = "01",
obs_type = "sky",
obs_time = "2021-06-06 11:12:13",
exp_time = 150,
detector_status_id = 3,
filename = "MSC_00001234",
file_path = "/opt/MSC_00001234.fits")
print('write:', rec)
import os
import unittest
from astropy.io import fits
from csst_dfs_api_local.ifs.level0prc import Level0PrcApi
class IFSLevel0PrcTestCase(unittest.TestCase):
def setUp(self):
self.api = Level0PrcApi()
def test_find(self):
recs = self.api.find(level0_id='300000055CCD231-c4')
print('find:', recs)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 1, status = 4)
print('update_proc_status:', rec)
def test_write(self):
rec = self.api.write(level0_id='300000055CCD231-c4',
pipeline_id = "P1",
prc_module = "QC0",
params_file_path = "/opt/dddasd.params",
prc_status = 3,
prc_time = '2021-06-04 11:12:13',
result_file_path = "/opt/dddasd.header")
print('write:', rec)
\ No newline at end of file
import unittest
from csst_dfs_api_local.ifs import Level1DataApi
class IFSLevel1DataApiTestCase(unittest.TestCase):
def setUp(self):
self.api = Level1DataApi()
def test_find(self):
recs = self.api.find(
level0_id='0000223',
create_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13")
)
print('find:', recs)
def test_get(self):
rec = self.api.get(id = 1)
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 1, status = 4)
print('update_proc_status:', rec)
def test_update_qc1_status(self):
rec = self.api.update_qc1_status(id = 1, status = 7)
print('update_qc1_status:', rec)
def test_write(self):
rec = self.api.write(
level0_id='0000223',
data_type = "sci",
cor_sci_id = 2,
prc_params = "/opt/dddasd.params",
prc_status = 3,
prc_time = '2021-06-05 11:12:13',
filename = "dddasd223234.fits",
file_path = "/opt/dddasd23.fits",
pipeline_id = "P2",
refs = {'dark': 1, 'bias': 2, 'flat': 3 })
print('write:', rec)
\ No newline at end of file
import os
import unittest
from astropy.io import fits
from csst_dfs_api_local.ifs.level1prc import Level1PrcApi
class IFSLevel1PrcTestCase(unittest.TestCase):
def setUp(self):
self.api = Level1PrcApi()
def test_find(self):
recs = self.api.find(level1_id=1)
print('find:', recs)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 1, status = 4)
print('update_proc_status:', rec)
def test_write(self):
rec = self.api.write(level1_id=1,
pipeline_id = "P1",
prc_module = "QC0",
params_file_path = "/opt/dddasd.params",
prc_status = 3,
prc_time = '2021-06-04 11:12:13',
result_file_path = "/opt/dddasd.header")
print('write:', rec)
\ No newline at end of file
import os
import unittest
from astropy.io import fits
from csst_dfs_api_local.mci.calmerge import CalMergeApi
class MCICalMergeApiTestCase(unittest.TestCase):
def setUp(self):
self.api = CalMergeApi()
def test_find(self):
recs = self.api.find(detector_no='01',
ref_type = "bias",
obs_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
print('find:', recs)
def test_get_latest_by_l0(self):
rec = self.api.get_latest_by_l0(level0_id='000001301', ref_type = "bias")
print('get:', rec)
def test_get(self):
rec = self.api.get(cal_id='0000231')
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 100, status = 1)
print('update_proc_status:', rec)
def test_update_qc1_status(self):
rec = self.api.update_qc1_status(id = 100, status = 2)
print('update_qc1_status:', rec)
def test_write(self):
rec = self.api.write(
cal_id='0000231',
detector_no='01',
ref_type = "bias",
obs_time = "2021-06-04 11:12:13",
exp_time = 150,
filename = "/opt/dddasd.params",
file_path = "/opt/dddasd.fits",
prc_status = 3,
prc_time = '2021-06-04 11:12:13',
level0_ids = ['0000231','0000232','0000233','0000234'])
print('write:', rec)
\ No newline at end of file
import unittest
from csst_dfs_api_local.mci import Level1DataApi
class MCILevel1DataApiTestCase(unittest.TestCase):
def setUp(self):
self.api = Level1DataApi()
def test_find(self):
recs = self.api.find(
level0_id='0000223',
create_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13")
)
print('find:', recs)
def test_get(self):
rec = self.api.get(id = 1)
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 1, status = 4)
print('update_proc_status:', rec)
def test_update_qc1_status(self):
rec = self.api.update_qc1_status(id = 1, status = 7)
print('update_qc1_status:', rec)
def test_write(self):
rec = self.api.write(
level0_id='0000223',
data_type = "sci",
cor_sci_id = 2,
prc_params = "/opt/dddasd.params",
prc_status = 3,
prc_time = '2021-06-05 11:12:13',
filename = "dddasd223234.fits",
file_path = "/opt/dddasd23.fits",
pipeline_id = "P2",
refs = {'dark': 1, 'bias': 2, 'flat': 3 })
print('write:', rec)
\ No newline at end of file
import os
import unittest
from astropy.io import fits
from csst_dfs_api_local.mci.level1prc import Level1PrcApi
class MCILevel1PrcTestCase(unittest.TestCase):
def setUp(self):
self.api = Level1PrcApi()
def test_find(self):
recs = self.api.find(level1_id=1)
print('find:', recs)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 1, status = 4)
print('update_proc_status:', rec)
def test_write(self):
rec = self.api.write(level1_id=1,
pipeline_id = "P1",
prc_module = "QC0",
params_file_path = "/opt/dddasd.params",
prc_status = 3,
prc_time = '2021-06-04 11:12:13',
result_file_path = "/opt/dddasd.header")
print('write:', rec)
\ No newline at end of file
import os
import unittest
from astropy.io import fits
from csst_dfs_api_local.msc.calmerge import CalMergeApi
class MSCCalMergeApiTestCase(unittest.TestCase):
def setUp(self):
self.api = CalMergeApi()
def test_find(self):
recs = self.api.find(detector_no='01',
ref_type = "bias",
obs_time = ("2021-06-01 11:12:13","2021-06-08 11:12:13"))
print('find:', recs)
def test_get_latest_by_l0(self):
rec = self.api.get_latest_by_l0(level0_id='000001301', ref_type = "bias")
print('get:', rec)
def test_get(self):
rec = self.api.get(cal_id='0000231')
print('get:', rec)
def test_update_proc_status(self):
rec = self.api.update_proc_status(id = 100, status = 1)
print('update_proc_status:', rec)
def test_update_qc1_status(self):
rec = self.api.update_qc1_status(id = 100, status = 2)
print('update_qc1_status:', rec)
def test_write(self):
rec = self.api.write(
cal_id='0000231',
detector_no='01',
ref_type = "bias",
obs_time = "2021-06-04 11:12:13",
exp_time = 150,
filename = "/opt/dddasd.params",
file_path = "/opt/dddasd.fits",
prc_status = 3,
prc_time = '2021-06-04 11:12:13',
level0_ids = ['0000231','0000232','0000233','0000234'])
print('write:', rec)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment