test_dag.py 1.33 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import unittest
from csst_dfs_client import dag

class DAGTestCase(unittest.TestCase):

    def setUp(self):
        pass


    def test_new_group_run(self):
        dag_group_run = {"dag_group": "csst_dag.cli.msc_l1",
            "dag_group_run": "195244ff176f923aec9a9328c75ecaeb4a8c4345",
            "batch_id": "inttest",
            "priority": 1
        }
        dag_run_list = [{
            "dag_group": "csst_dag.cli.msc_l1",
            "dag_group_run": "195244ff176f923aec9a9328c75ecaeb4a8c4345",
            "batch_id": "inttest",
            "priority": 1,
            "dag": "csst-msc-l1-mbi",
            "dag_run": "61b622a5d256806082c668b2d1273668a1eee3ec",
            "dataset": "csst-msc-c9-25sqdeg-v3",
            "obs_type": "WIDE",
            "obs_group": "none",
            "obs_id": "10100232366",
            "detector": "09"
        }]
        result = dag.new_dag_group_run(dag_group_run = dag_group_run, dag_run_list = dag_run_list)
        print(result)
        self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message) 

    def test_find_dag_group_run(self):
        result = dag.find_group_run(dag_group = "csst_dag.cli.msc_l1", batch_id = "inttest")
        print(result)
        self.assertEqual(result.code, 200, "error code: " + str(result.code) + ", message: " + result.message)