__init__.py 2.62 KB
Newer Older
BO ZHANG's avatar
BO ZHANG committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import json
from astropy import time
import numpy as np
import string
import os
from ..msc import MBI_CHIPID, SLS_CHIPID

RUN_ID_DIGITS = 10
DAG_TEMPLATE_DIRECTORY = os.path.join(
    os.path.dirname(os.path.dirname(__file__)), "dags"
)


def gen_dag_run_id(digits=10):
    """
    Generate a unique run_id for a dag.
    """
    now = time.Time.now()
    dag_run_id = now.iso[:10].replace("-", "")

    n = len(string.ascii_lowercase)
    for i in range(digits):
        dag_run_id += string.ascii_lowercase[np.random.randint(low=0, high=n)]
    return dag_run_id


def gen_mbi_level1_dag_message():
    pass


def gen_level1_dag_message(
    # dag_id: str = "csst-msc-l1-mbi",
    obsid: str = "11009101682009",
    **kwargs,
):
    if obsid.startswith("1"):
        # MSC: + chipid
        assert "chipid" in kwargs.keys(), "chipid is required for MSC obsid"
        assert len(obsid) in (11, 14)

        if obsid[1] not in "01":
            return None

        chipid = kwargs.get("chipid")
        if chipid in MBI_CHIPID:
            dag_id = "csst-msc-l1-mbi"
            run_id = gen_dag_run_id(digits=RUN_ID_DIGITS)
            with open(os.path.join(DAG_TEMPLATE_DIRECTORY, dag_id + ".json"), "r") as f:
                message = json.load(f)
            # set message values
            message["dag_id"] = "csst-msc-l1-mbi"
            message["dag_run_id"] = run_id
            message["message"]["obsid"] = obsid
            message["message"]["chipid"] = chipid
        elif chipid in SLS_CHIPID:
            dag_id = "csst-msc-l1-sls"
            run_id = gen_dag_run_id(digits=RUN_ID_DIGITS)
            with open(os.path.join(DAG_TEMPLATE_DIRECTORY, dag_id + ".json"), "r") as f:
                message = json.load(f)
            # set message values
            message["dag_id"] = "csst-msc-l1-sls"
            message["dag_run_id"] = run_id
            message["message"]["obsid"] = obsid
            message["message"]["chipid"] = chipid
        else:
            raise ValueError(f"Invalid chipid: {chipid}")

    elif obsid.startswith("2"):
        # MCI: + ?
        pass
    elif obsid.startswith("3"):
        # IFS: + ?
        pass
    elif obsid.startswith("4"):
        # CPIC: + ?
        pass
    elif obsid.startswith("5"):
        # HSTDM: + ?
        pass
    else:
        raise ValueError(f"Unknown obsid: {obsid}")

    if kwargs.get("print_query_link", False):
        # print(f"http://localhost:3000/scalebox/run/level0?id={data['dag_run_id']}")
        pass

    if kwargs.get("return_dict", False):
        return message
    else:
        message_string = json.dumps(message, ensure_ascii=False, indent=None)
        return message_string