import glob
import os
import string
import json

import numpy as np
from astropy import time

# Length of the random suffix requested when gen_msg() generates a DAG run id.
DAG_RUN_ID_DIGITS = 6

# Directory named "dag_cfg" located next to this module.
DAG_MESSAGE_TEMPLATE_DIRECTORY = os.path.join(os.path.dirname(__file__), "dag_cfg")

# Every ``*.yml`` file directly inside the directory.
# glob.escape() keeps glob metacharacters in the install path (``[``, ``*``,
# ``?``) literal, and sorting gives a stable order because glob.glob() returns
# matches in arbitrary, filesystem-dependent order.
DAG_YAML_LIST = sorted(
    glob.glob(os.path.join(glob.escape(DAG_MESSAGE_TEMPLATE_DIRECTORY), "*.yml"))
)

# Known DAG names: each YAML file name without its directory and extension.
DAG_LIST = [os.path.splitext(os.path.basename(_))[0] for _ in DAG_YAML_LIST]

# print(DAG_MAP)
# [
#     "csst-msc-l1-ooc-bias",
#     "csst-msc-l1-ooc-flat",
#     "csst-msc-l1-ooc-dark",
#     "csst-msc-l1-ast",
#     "csst-msc-l1-qc0",
#     "csst-hstdm-l1",
#     "csst-msc-l1-sls",
#     "csst-msc-l1-mbi",
# ]


def gen_dag_run_id(digits=6):
    """
    Build a run id for a dag_cfg: a timestamp followed by random letters.

    Parameters
    ----------
    digits : int
        Number of random lowercase ASCII letters appended after the timestamp.

    Returns
    -------
    str
        The current time formatted as ``%Y%m%d-%H%M%S-`` followed by
        ``digits`` random characters from ``a``-``z``.
    """
    alphabet = string.ascii_lowercase
    prefix = time.Time.now().strftime("%Y%m%d-%H%M%S-")
    # One randint draw per character, taken from numpy's global random state.
    suffix = "".join(
        alphabet[np.random.randint(low=0, high=len(alphabet))] for _ in range(digits)
    )
    return prefix + suffix


def get_dag_message_template(dag_id):
    """
    Load the dag_cfg message template for a given dag_cfg.

    Parameters
    ----------
    dag_id : str
        Name of the DAG; must be an element of ``DAG_LIST``.

    Returns
    -------
    object
        The content of ``<dag_id>.json`` in ``DAG_MESSAGE_TEMPLATE_DIRECTORY``
        as parsed by ``json.load``.

    Raises
    ------
    ValueError
        If ``dag_id`` is not in ``DAG_LIST``.
    """
    if dag_id not in DAG_LIST:
        raise ValueError(f"Unknown dag_cfg: {dag_id}")

    # NOTE(review): DAG_LIST is derived from the ``*.yml`` files in the
    # directory, while the template is read from a ``.json`` file of the same
    # stem -- confirm that every DAG ships both files.
    template_path = os.path.join(DAG_MESSAGE_TEMPLATE_DIRECTORY, f"{dag_id}.json")
    # Explicit UTF-8: without it open() falls back to the locale's default
    # encoding, which can mis-decode non-ASCII JSON on some platforms.
    with open(template_path, "r", encoding="utf-8") as f:
        return json.load(f)


def gen_msg(dag_id, **kwargs):
    """
    Generate a message from a DAG's template, overriding selected fields.

    Parameters
    ----------
    dag_id : str
        Name of the DAG, passed to ``get_dag_message_template``.
    **kwargs
        Fields to overwrite in the template. Each key must already be present
        in the template with a value other than ``None``.

    Returns
    -------
    The loaded template with the given fields replaced.

    Raises
    ------
    AssertionError
        If a keyword is missing from the template or its template value is
        ``None``.
    """
    # NOTE(review): the run id is generated here but never stored in the
    # message -- confirm whether it was meant to be written into ``msg``.
    this_dag_run_id = gen_dag_run_id(DAG_RUN_ID_DIGITS)
    msg = get_dag_message_template(dag_id)

    for k, v in kwargs.items():
        # Raise explicitly instead of using ``assert`` so the check is not
        # stripped under ``python -O``; the exception type is unchanged.
        # NOTE(review): a key whose template value is None is also rejected
        # as "unknown" -- confirm this is intended.
        if msg.get(k) is None:
            raise AssertionError(f"Unknown key: {k}")
        msg[k] = v
    return msg