An error occurred while loading the file. Please try again.
-
Wei Shoulin authored
#11599 merged 1 commit into apache : master from sridhar-rl : fix-run-id run_id -> dag_run_id, pipeline_run_id -> dag_id #11599 <here is a image 52677f8958282441-0e06207544895614>
6612f40d
import unittest
from csst_dfs_client import level1
class Level1TestCase(unittest.TestCase):
    """Integration tests for the ``csst_dfs_client.level1`` functions.

    Each test calls one ``level1`` function, prints the returned result
    object and asserts that its ``code`` attribute equals 200.

    NOTE(review): these tests presumably need a reachable DFS service that
    already holds the records referenced by the ids below -- confirm.
    """

    # Identifiers shared by several tests.
    LEVEL1_ID = "1060940003452925"
    DAG_ID = "csst-msc-l1-mbi"

    # Fixture uploaded by ``test_write``. The directory is specific to the
    # original author's machine; the test is skipped when the file is absent.
    WRITE_FILE_NAME = (
        "CSST_MSC_MS_SCI_20240609181116_20240609181347_10109100157991_27_L1_V01.fits"
    )
    WRITE_FILE_PATH = "/Users/wsl/temp/csst/import/" + WRITE_FILE_NAME

    def _assert_ok(self, result, expect_data=False):
        """Print *result* and assert that it reports success.

        Args:
            result: Object returned by a ``level1`` call; its ``code`` and
                ``message`` attributes are read (and ``data`` when
                *expect_data* is true).
            expect_data: When true, additionally assert that ``result.data``
                is not ``None``.
        """
        print(result)
        # Assertion messages are evaluated even when the assertion passes, so
        # they are built with ``format`` rather than ``+``: concatenating a
        # non-string ``message`` (e.g. ``None``) would raise ``TypeError`` and
        # make a passing test error out.
        details = "error code: {}, message: {}".format(result.code, result.message)
        self.assertEqual(result.code, 200, details)
        if expect_data:
            self.assertIsNotNone(
                result.data, "error message: {}".format(result.message)
            )

    def test_find(self):
        """``level1.find()`` without arguments succeeds and returns data."""
        self._assert_ok(level1.find(), expect_data=True)

    def test_find_by_brick_id(self):
        """``level1.find_by_brick_id`` succeeds and returns data."""
        self._assert_ok(level1.find_by_brick_id(brick_id=401847), expect_data=True)

    def test_find_by_level1_id(self):
        """``level1.find_by_level1_id`` succeeds and returns data."""
        result = level1.find_by_level1_id(level1_id=self.LEVEL1_ID)
        self._assert_ok(result, expect_data=True)

    def test_update_qc1_status(self):
        """``level1.update_qc1_status`` reports success."""
        result = level1.update_qc1_status(
            level1_id=self.LEVEL1_ID, file_type='SCI', qc1_status=1
        )
        self._assert_ok(result)

    def test_update_prc_status(self):
        """``level1.update_prc_status`` reports success."""
        result = level1.update_prc_status(
            level1_id=self.LEVEL1_ID, file_type='SCI', prc_status=3
        )
        self._assert_ok(result)

    def test_write(self):
        """``level1.write`` reports success for a local FITS file.

        Skipped when the fixture file does not exist on this machine, since a
        missing fixture is an environment problem rather than a client bug.
        """
        # Imported locally so this class does not depend on a change to the
        # module-level import block.
        import os

        file_path = self.WRITE_FILE_PATH
        if not os.path.isfile(file_path):
            self.skipTest("level1 fixture file not found: {}".format(file_path))

        # NOTE(review): an earlier, commented-out variant passed the file's
        # bytes wrapped in ``io.BytesIO`` as ``local_file``, so file-like
        # objects may also be accepted -- confirm against the client.
        result = level1.write(
            local_file=file_path,
            module_id="MSC",
            level0_id="1010910015799127",
            level1_id="1010910015799127",
            dag_id=self.DAG_ID,
            file_name=self.WRITE_FILE_NAME,
            pmapname="csst_000128.pmap",
            file_type='SCI',
            build=1,
            qc1_status=10,
            prc_status=12,
        )
        self._assert_ok(result)

    def test_process_list(self):
        """``level1.process_list`` reports success."""
        self._assert_ok(level1.process_list(level1_id=self.LEVEL1_ID))

    def test_add_process(self):
        """``level1.add_process`` reports success."""
        result = level1.add_process(
            level1_id=self.LEVEL1_ID,
            dag_id=self.DAG_ID,
            dag_run_id="202411071002481234",
            prc_time="2024-11-07 10:24:12",
            prc_status=1,
            prc_module="MSC",
            message="",
        )
        self._assert_ok(result)

    def test_sls_find_by_qc1_status(self):
        """``level1.sls_find_by_qc1_status`` reports success."""
        self._assert_ok(level1.sls_find_by_qc1_status(qc1_status=0, limit=1))