Commit 4a427749 authored by Wei Shoulin's avatar Wei Shoulin
Browse files

add pipeline_id

parent 6665b215
...@@ -314,7 +314,9 @@ create table msc_level2_data ( ...@@ -314,7 +314,9 @@ create table msc_level2_data (
prc_time datetime null, prc_time datetime null,
qc2_status tinyint(1), qc2_status tinyint(1),
qc2_time datetime null, qc2_time datetime null,
create_time datetime null create_time datetime null,
import_status tinyint(1) default 0,
pipeline_id VARCHAR(64) null
); );
create table msc_level2_header ( create table msc_level2_header (
...@@ -323,6 +325,27 @@ create table msc_level2_header ( ...@@ -323,6 +325,27 @@ create table msc_level2_header (
dec_obj float8 null, dec_obj float8 null,
constraint PK_MSC_LEVEL2_HEADER primary key (id) constraint PK_MSC_LEVEL2_HEADER primary key (id)
); );
create table msc_level2co_data (
id integer PRIMARY KEY autoincrement,
data_type VARCHAR(64) not null,
filename VARCHAR(128) null,
file_path VARCHAR(256) null,
prc_status tinyint(1),
prc_time datetime null,
qc2_status tinyint(1),
qc2_time datetime null,
create_time datetime null,
import_status tinyint(1) default 0,
pipeline_id VARCHAR(64) null
);
create table msc_level2co_header (
id BIGINT not null,
ra_obj float8 null,
dec_obj float8 null,
constraint PK_MSC_LEVEL2CO_HEADER primary key (id)
);
/*===========================ifs===================================*/ /*===========================ifs===================================*/
create table ifs_level0_data create table ifs_level0_data
( (
......
...@@ -143,7 +143,7 @@ class Level2DataApi(object): ...@@ -143,7 +143,7 @@ class Level2DataApi(object):
log.warning('%s not found' %(fits_id, )) log.warning('%s not found' %(fits_id, ))
return Result.error(message ='%s not found' %(fits_id, )) return Result.error(message ='%s not found' %(fits_id, ))
self.db.execute( self.db.execute(
'update msc_level2_data set qc1_status=?, qc1_time=? where id=?', 'update msc_level2_data set qc2_status=?, qc2_time=? where id=?',
(status, format_time_ms(time.time()), fits_id) (status, format_time_ms(time.time()), fits_id)
) )
self.db.end() self.db.end()
...@@ -174,7 +174,8 @@ class Level2DataApi(object): ...@@ -174,7 +174,8 @@ class Level2DataApi(object):
filename = get_parameter(kwargs, "filename"), filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path"), file_path = get_parameter(kwargs, "file_path"),
prc_status = get_parameter(kwargs, "prc_status", -1), prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())) prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())),
pipeline_id = get_parameter(kwargs, "pipeline_id", "")
) )
existed = self.db.exists( existed = self.db.exists(
"select * from msc_level2_data where filename=?", "select * from msc_level2_data where filename=?",
...@@ -185,9 +186,9 @@ class Level2DataApi(object): ...@@ -185,9 +186,9 @@ class Level2DataApi(object):
return Result.error(message=f'{rec.filename} has already been existed') return Result.error(message=f'{rec.filename} has already been existed')
self.db.execute( self.db.execute(
'INSERT INTO msc_level2_data (level1_id,data_type,filename,file_path,qc2_status,prc_status,prc_time,create_time) \ 'INSERT INTO msc_level2_data (level1_id,data_type,filename,file_path,qc2_status,prc_status,prc_time,create_time,pipeline_id) \
VALUES(?,?,?,?,?,?,?,?)', VALUES(?,?,?,?,?,?,?,?,?)',
(rec.level1_id, rec.data_type, rec.filename, rec.file_path, -1, rec.prc_status, rec.prc_time, format_time_ms(time.time()),) (rec.level1_id, rec.data_type, rec.filename, rec.file_path, -1, rec.prc_status, rec.prc_time, format_time_ms(time.time()),rec.pipeline_id)
) )
self.db.end() self.db.end()
rec.id = self.db.last_row_id() rec.id = self.db.last_row_id()
......
import os
import logging
import time, datetime
import shutil
from traceback import print_stack
from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.common import from_dict_list
from csst_dfs_commons.models.msc import Level2CoRecord, Level2CatalogRecord
log = logging.getLogger('csst')
class Level2CoApi(object):
def __init__(self, sub_system = "msc"):
self.sub_system = sub_system
self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
self.db = DBClient()
def catalog_query(self, **kwargs):
return Result.error(message = 'level2 catalog not support in the local mode' )
def find(self, **kwargs):
''' retrieve level2 records from database
:param kwargs: Parameter dictionary, key items support:
data_type: [str]
create_time : (start, end),
qc2_status : [int],
prc_status : [int],
filename: [str]
limit: limits returns the number of records,default 0:no-limit
:returns: csst_dfs_common.models.Result
'''
try:
data_type = get_parameter(kwargs, "data_type")
create_time_start = get_parameter(kwargs, "create_time", [None, None])[0]
create_time_end = get_parameter(kwargs, "create_time", [None, None])[1]
qc2_status = get_parameter(kwargs, "qc1_status")
prc_status = get_parameter(kwargs, "prc_status")
filename = get_parameter(kwargs, "filename")
limit = get_parameter(kwargs, "limit", 0)
sql_count = "select count(*) as c from msc_level2co_data where 1=1"
sql_data = f"select * from msc_level2co_data where 1=1"
sql_condition = ""
if data_type:
sql_condition = f"{sql_condition} and data_type='{data_type}'"
if create_time_start:
sql_condition = f"{sql_condition} and create_time >='{create_time_start}'"
if create_time_end:
sql_condition = f"{sql_condition} and create_time <='{create_time_end}'"
if qc2_status:
sql_condition = f"{sql_condition} and qc2_status={qc2_status}"
if prc_status:
sql_condition = f"{sql_condition} and prc_status={prc_status}"
if filename:
sql_condition = f" and filename='{filename}'"
sql_count = f"{sql_count} {sql_condition}"
sql_data = f"{sql_data} {sql_condition}"
if limit > 0:
sql_data = f"{sql_data} limit {limit}"
totalCount = self.db.select_one(sql_count)
_, recs = self.db.select_many(sql_data)
return Result.ok_data(data=from_dict_list(Level2CoRecord, recs)).append("totalCount", totalCount['c'])
except Exception as e:
return Result.error(message=str(e))
def get(self, **kwargs):
'''
parameter kwargs:
id = [int]
return csst_dfs_common.models.Result
'''
try:
the_id = get_parameter(kwargs, "id", -1)
r = self.db.select_one(
"select * from msc_level2co_data where id=?", (the_id,))
if r:
return Result.ok_data(data=Level2CoRecord().from_dict(r))
else:
return Result.error(message=f"id:{the_id} not found")
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_proc_status(self, **kwargs):
''' update the status of reduction
parameter kwargs:
id : [int],
status : [int]
return csst_dfs_common.models.Result
'''
fits_id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
existed = self.db.exists(
"select * from msc_level2co_data where id=?",
(fits_id,)
)
if not existed:
log.warning('%s not found' %(fits_id, ))
return Result.error(message ='%s not found' %(fits_id, ))
self.db.execute(
'update msc_level2co_data set prc_status=?, prc_time=? where id=?',
(status, format_time_ms(time.time()), fits_id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def update_qc2_status(self, **kwargs):
''' update the status of QC2
parameter kwargs:
id : [int],
status : [int]
'''
fits_id = get_parameter(kwargs, "id")
status = get_parameter(kwargs, "status")
try:
existed = self.db.exists(
"select * from msc_level2co_data where id=?",
(fits_id,)
)
if not existed:
log.warning('%s not found' %(fits_id, ))
return Result.error(message ='%s not found' %(fits_id, ))
self.db.execute(
'update msc_level2co_data set qc2_status=?, qc2_time=? where id=?',
(status, format_time_ms(time.time()), fits_id)
)
self.db.end()
return Result.ok_data()
except Exception as e:
log.error(e)
return Result.error(message=str(e))
def write(self, **kwargs):
''' insert a level2 record into database
parameter kwargs:
level1_id: [int]
data_type : [str]
filename : [str]
file_path : [str]
prc_status : [int]
prc_time : [str]
return csst_dfs_common.models.Result
'''
try:
rec = Level2CoRecord(
id = 0,
data_type = get_parameter(kwargs, "data_type"),
filename = get_parameter(kwargs, "filename"),
file_path = get_parameter(kwargs, "file_path"),
prc_status = get_parameter(kwargs, "prc_status", -1),
prc_time = get_parameter(kwargs, "prc_time", format_datetime(datetime.now())),
pipeline_id = get_parameter(kwargs, "pipeline_id", "")
)
existed = self.db.exists(
"select * from msc_level2co_data where filename=?",
(rec.filename,)
)
if existed:
log.error(f'{rec.filename} has already been existed')
return Result.error(message=f'{rec.filename} has already been existed')
self.db.execute(
'INSERT INTO msc_level2co_data (data_type,filename,file_path,qc2_status,prc_status,prc_time,create_time,pipeline_id) \
VALUES(?,?,?,?,?,?,?,?,?)',
(rec.data_type, rec.filename, rec.file_path, -1, rec.prc_status, rec.prc_time, format_time_ms(time.time()),rec.pipeline_id,)
)
self.db.end()
rec.id = self.db.last_row_id()
return Result.ok_data(data=rec)
except Exception as e:
log.error(e)
return Result.error(message=str(e))
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment