calmerge.py 8.98 KB
Newer Older
Wei Shoulin's avatar
c3  
Wei Shoulin committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import os
import logging
import time, datetime
import shutil

from ..common.db import DBClient
from ..common.utils import *
from csst_dfs_commons.models import Result
from csst_dfs_commons.models.facility import CalMergeRecord
from csst_dfs_commons.models.common import from_dict_list

log = logging.getLogger('csst')

class CalMergeApi(object):
    def __init__(self):
        self.root_dir = os.getenv("CSST_LOCAL_FILE_ROOT", "/opt/temp/csst")
        self.db = DBClient()

    def find(self, **kwargs):
        ''' retrieve calibration merge records from database

        parameter kwargs:
            detector_no: [str]
            ref_type: [str]
            obs_time: (start,end)
            qc1_status : [int]
            prc_status : [int]
            file_name: [str]
            limit: limits returns the number of records,default 0:no-limit

        return: csst_dfs_common.models.Result
        '''
        try:
            detector_no = get_parameter(kwargs, "detector_no")
            ref_type = get_parameter(kwargs, "ref_type")
            exp_time_start = get_parameter(kwargs, "obs_time", [None, None])[0]
            exp_time_end = get_parameter(kwargs, "obs_time", [None, None])[1]              
            qc1_status = get_parameter(kwargs, "qc1_status")
            prc_status = get_parameter(kwargs, "prc_status")
            file_name = get_parameter(kwargs, "file_name")
            limit = get_parameter(kwargs, "limit", 0)

            sql_count = "select count(*) as c from t_cal_merge where 1=1"
            sql_data = f"select * from t_cal_merge where 1=1"

            sql_condition = "" 
            if detector_no:
                sql_condition = f"{sql_condition} and detector_no='{detector_no}'"
            if ref_type:
                sql_condition = f"{sql_condition} and ref_type='{ref_type}'"
            if exp_time_start:
                sql_condition = f"{sql_condition} and obs_time >='{exp_time_start}'"
            if exp_time_end:
                sql_condition = f"{sql_condition} and obs_time <='{exp_time_end}'"
            if qc1_status:
                sql_condition = f"{sql_condition} and qc1_status={qc1_status}"
            if prc_status:
                sql_condition = f"{sql_condition} and prc_status={prc_status}"   

            if file_name:
                sql_condition = f" and filename={file_name}"  

            sql_count = f"{sql_count} {sql_condition}"
            sql_data = f"{sql_data} {sql_condition}"

            log.info(sql_count)
            log.info(sql_data)

            if limit > 0:
                sql_data = f"{sql_data} limit {limit}"   

            totalCount = self.db.select_one(sql_count)
            _, records = self.db.select_many(sql_data)

            return Result.ok_data(data=from_dict_list(CalMergeRecord, records)).append("totalCount", totalCount['c'])

        except Exception as e:
            return Result.error(message=str(e))

    def get(self, **kwargs):
        '''  fetch a record from database

        parameter kwargs:
Wei Shoulin's avatar
Wei Shoulin committed
84
85
            id : [int],
            cal_id : [str]
Wei Shoulin's avatar
c3  
Wei Shoulin committed
86
87
88

        return csst_dfs_common.models.Result
        '''
Wei Shoulin's avatar
Wei Shoulin committed
89
90
91
92
93
94
95
96
97
98
99
100
        id = get_parameter(kwargs, "id", 0)
        cal_id = get_parameter(kwargs, "cal_id", "")

        if id == 0 and cal_id == "":
            return Result.error(message="at least define id or cal_id") 

        if id != 0: 
            return self.get_by_id(id)
        if cal_id != "": 
            return self.get_by_cal_id(cal_id)

    def get_by_id(self, id: str):
Wei Shoulin's avatar
c3  
Wei Shoulin committed
101
102
103
104
105
106
107
108
109
110
111
112
113
        try:
            r = self.db.select_one(
                "select * from t_cal_merge where id=?", (id,))
            if r:

                sql_get_level0_id = f"select level0_id from t_cal2level0 where merge_id={r['id']}" 
                _, records = self.db.select_many(sql_get_level0_id)
                level0_ids = [r["level0_id"] for r in records]

                rec = CalMergeRecord().from_dict(r)
                rec.level0_ids = level0_ids
                return Result.ok_data(data=rec)
            else:
Wei Shoulin's avatar
Wei Shoulin committed
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
                return Result.error(message=f"id:{id} not found")
                    
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def get_by_cal_id(self, cal_id: str):
        try:
            r = self.db.select_one(
                "select * from t_cal_merge where cal_id=?", (cal_id,))
            if r:

                sql_get_level0_id = f"select level0_id from t_cal2level0 where merge_id={r['id']}" 
                _, records = self.db.select_many(sql_get_level0_id)
                level0_ids = [r["level0_id"] for r in records]

                rec = CalMergeRecord().from_dict(r)
                rec.level0_ids = level0_ids
                return Result.ok_data(data=rec)
            else:
                return Result.error(message=f"id:{id} not found")
                    
Wei Shoulin's avatar
c3  
Wei Shoulin committed
136
137
138
139
140
141
142
143
144
        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))

    def update_qc1_status(self, **kwargs):
        ''' update the status of reduction

        parameter kwargs:
            id : [int],
Wei Shoulin's avatar
Wei Shoulin committed
145
            cal_id : [str],
Wei Shoulin's avatar
c3  
Wei Shoulin committed
146
147
148
149
            status : [int]

        return csst_dfs_common.models.Result
        '''
Wei Shoulin's avatar
Wei Shoulin committed
150
151
152
153
154
155
156
157
        id = get_parameter(kwargs, "id", 0)
        cal_id = get_parameter(kwargs, "cal_id", "")
        result = self.get(id = id, cal_id = cal_id)

        if not result.success:
            return Result.error(message="not found")

        id = result.data.id
Wei Shoulin's avatar
c3  
Wei Shoulin committed
158
        status = get_parameter(kwargs, "status")
Wei Shoulin's avatar
Wei Shoulin committed
159

Wei Shoulin's avatar
c3  
Wei Shoulin committed
160
161
162
163
164
165
166
167
168
169
        try:
            self.db.execute(
                'update t_cal_merge set qc1_status=?, qc1_time=? where id=?',
                (status, format_time_ms(time.time()), id)
            )  
            self.db.end() 
            return Result.ok_data()
           
        except Exception as e:
            log.error(e)
Wei Shoulin's avatar
Wei Shoulin committed
170
            return Result.error(message=str(e))
Wei Shoulin's avatar
c3  
Wei Shoulin committed
171
172
173
174
175
176

    def update_proc_status(self, **kwargs):
        ''' update the status of reduction

        parameter kwargs:
            id : [int],
Wei Shoulin's avatar
Wei Shoulin committed
177
            cal_id : [str],
Wei Shoulin's avatar
c3  
Wei Shoulin committed
178
179
180
181
            status : [int]

        return csst_dfs_common.models.Result
        '''
Wei Shoulin's avatar
Wei Shoulin committed
182
183
184
185
186
187
188
189
        id = get_parameter(kwargs, "id", 0)
        cal_id = get_parameter(kwargs, "cal_id", "")
        result = self.get(id = id, cal_id = cal_id)

        if not result.success:
            return Result.error(message="not found")

        id = result.data.id
Wei Shoulin's avatar
c3  
Wei Shoulin committed
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
        status = get_parameter(kwargs, "status")

        try:
            existed = self.db.exists(
                "select * from t_cal_merge where id=?",
                (id,)
            )
            if not existed:
                log.warning('%s not found' %(id, ))
                return Result.error(message ='%s not found' %(id, ))
            self.db.execute(
                'update t_cal_merge set prc_status=?, prc_time=? where id=?',
                (status, format_time_ms(time.time()), id)
            )
            self.db.end() 
            return Result.ok_data()
           
        except Exception as e:
            log.error(e)
Wei Shoulin's avatar
Wei Shoulin committed
209
            return Result.error(message=str(e))
Wei Shoulin's avatar
c3  
Wei Shoulin committed
210
211
212
213
214

    def write(self, **kwargs):
        ''' insert a calibration merge record into database
 
        parameter kwargs:
Wei Shoulin's avatar
Wei Shoulin committed
215
            cal_id : [str]
Wei Shoulin's avatar
c3  
Wei Shoulin committed
216
217
218
219
220
221
222
223
224
225
226
227
228
229
            detector_no : [str]
            ref_type : [str]
            obs_time : [str]
            exp_time : [float]
            prc_status : [int]
            prc_time : [str]
            filename : [str]
            file_path : [str]
            level0_ids : [list]
        return csst_dfs_common.models.Result
        '''   

        rec = CalMergeRecord(
            id = 0,
Wei Shoulin's avatar
Wei Shoulin committed
230
            cal_id =  get_parameter(kwargs, "cal_id"),
Wei Shoulin's avatar
c3  
Wei Shoulin committed
231
232
233
234
235
236
            detector_no = get_parameter(kwargs, "detector_no"),
            ref_type = get_parameter(kwargs, "ref_type"),
            obs_time = get_parameter(kwargs, "obs_time"),
            exp_time = get_parameter(kwargs, "exp_time"),
            filename = get_parameter(kwargs, "filename"),
            file_path = get_parameter(kwargs, "file_path"),
Wei Shoulin's avatar
Wei Shoulin committed
237
            prc_status = get_parameter(kwargs, "prc_status", -1),
Wei Shoulin's avatar
c3  
Wei Shoulin committed
238
239
240
241
242
            prc_time = get_parameter(kwargs, "prc_time"),
            level0_ids = get_parameter(kwargs, "level0_ids", [])
        )
        try:
            self.db.execute(
Wei Shoulin's avatar
Wei Shoulin committed
243
244
245
                'INSERT INTO t_cal_merge (cal_id,detector_no,ref_type,obs_time,exp_time,filename,file_path,prc_status,prc_time,create_time) \
                    VALUES(?,?,?,?,?,?,?,?,?,?)',
                (rec.cal_id, rec.detector_no, rec.ref_type, rec.obs_time, rec.exp_time, rec.filename, rec.file_path,rec.prc_status,rec.prc_time,format_time_ms(time.time()))
Wei Shoulin's avatar
c3  
Wei Shoulin committed
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
            )
            self.db.end()
            rec.id = self.db.last_row_id()

            sql_level0_ids = "insert into t_cal2level0 (merge_id,level0_id) values "
            values = ["(%s,%s)"%(rec.id,i) for i in rec.level0_ids]
            _ = self.db.execute(sql_level0_ids + ",".join(values))            

            self.db.end()

            return Result.ok_data(data=rec)

        except Exception as e:
            log.error(e)
            return Result.error(message=str(e))