An error occurred while loading the file. Please try again.
test_level1.py 3.21 KiB
import unittest
from csst_dfs_client import level1
class Level1TestCase(unittest.TestCase):
    def setUp(self):
        pass
    def test_find(self):
        result = level1.find()
        print(result)
        self.assertEqual(result.code, 200, "error code: " + str(result.code))
        self.assertIsNotNone(result.data, "error message: " + result.message)
    def test_find_by_brick_id(self):
        result = level1.find_by_brick_id(brick_id = 401847)
        print(result)
        self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
        self.assertIsNotNone(result.data, "error message: " + result.message)
    def test_find_by_level1_id(self):
        result = level1.find_by_level1_id(level1_id = "1060940003452925")
        print(result)
        self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
        self.assertIsNotNone(result.data, "error message: " + result.message)
    def test_update_qc1_status(self):
        result = level1.update_qc1_status(level1_id = "1060940003452925", file_type='SCI', qc1_status=1)
        print(result)
        self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
    def test_update_prc_status(self):
        result = level1.update_prc_status(level1_id = "1060940003452925", file_type='SCI', prc_status=3)
        print(result)
        self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
    def test_write(self):
        file_path = "/Users/wsl/temp/csst/import/CSST_MSC_MS_SCI_20240609181116_20240609181347_10109100157991_27_L1_V01.fits"
        # from io import BytesIO
        # with open(file_path, "rb") as file:
        #     file_path = BytesIO(file.read())
        result = level1.write(local_file = file_path, 
            module_id = "MSC",
            level0_id="1010910015799127",
            level1_id = "1010910015799127", 
            dag_id = "csst-msc-l1-mbi",
            file_name= "CSST_MSC_MS_SCI_20240609181116_20240609181347_10109100157991_27_L1_V01.fits",
            pmapname="csst_000128.pmap",
            file_type='SCI', 
            build=1,
            qc1_status=10,
            prc_status=12)
        print(result)
        self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
    def test_process_list(self):
        result = level1.process_list(level1_id="1060940003452925")
        print(result)
        self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)
    def test_add_process(self):
        result = level1.add_process(level1_id="1060940003452925", dag_id="csst-msc-l1-mbi", dag_run_id="202411071002481234", prc_time="2024-11-07 10:24:12", prc_status=1, prc_module="MSC", message="")
        print(result)
        self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)   
    def test_sls_find_by_qc1_status(self):
        result = level1.sls_find_by_qc1_status(qc1_status=0, limit=1)
        print(result)
        self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)