"""Helpers for generating DAG run ids and DAG messages from templates."""

import glob
import json
import os
import string

import numpy as np
from astropy import time

# Default number of random lowercase letters appended to a DAG run id.
DAG_RUN_ID_DIGITS = 6

# Directory (located next to this module) that holds the per-DAG files.
DAG_MESSAGE_TEMPLATE_DIRECTORY = os.path.join(os.path.dirname(__file__), "dag")

# Every ``*.yml`` file found in that directory contributes one DAG id: the
# file name without its extension.
DAG_YAML_LIST = glob.glob(DAG_MESSAGE_TEMPLATE_DIRECTORY + "/*.yml")
DAG_LIST = [os.path.splitext(os.path.basename(_))[0] for _ in DAG_YAML_LIST]

# Example content of DAG_LIST:
# print(DAG_LIST)
# [
#     "csst-msc-l1-ooc-bias",
#     "csst-msc-l1-ooc-flat",
#     "csst-msc-l1-ooc-dark",
#     "csst-msc-l1-ast",
#     "csst-msc-l1-qc0",
#     "csst-hstdm-l1",
#     "csst-msc-l1-sls",
#     "csst-msc-l1-mbi",
# ]


def gen_dag_run_id(digits=DAG_RUN_ID_DIGITS):
    """
    Generate a run_id for a dag.

    The id is the current time (``astropy.time.Time.now()``) formatted as
    ``%Y%m%d-%H%M%S-`` followed by ``digits`` random lowercase ASCII letters.
    Uniqueness is therefore probabilistic, not guaranteed.

    Parameters
    ----------
    digits : int, optional
        Number of random letters in the suffix. Defaults to
        ``DAG_RUN_ID_DIGITS``.

    Returns
    -------
    str
        The generated run id, e.g. ``"20240101-120000-abcdef"``.
    """
    now = time.Time.now()
    prefix = now.strftime("%Y%m%d-%H%M%S-")
    n = len(string.ascii_lowercase)
    # One RNG draw per letter, joined once instead of repeated ``+=``.
    suffix = "".join(
        string.ascii_lowercase[np.random.randint(low=0, high=n)]
        for _ in range(digits)
    )
    return prefix + suffix


def get_dag_message_template(dag_id):
    """
    Get the dag message template for a given dag_id.

    Parameters
    ----------
    dag_id : str
        Identifier of the DAG; must be one of ``DAG_LIST``.

    Returns
    -------
    object
        The decoded JSON content of ``<dag_id>.json`` in
        ``DAG_MESSAGE_TEMPLATE_DIRECTORY``. A fresh object is loaded from
        disk on every call.

    Raises
    ------
    ValueError
        If ``dag_id`` is not in ``DAG_LIST``.
    """
    if dag_id not in DAG_LIST:
        raise ValueError(f"Unknown dag_id: {dag_id}")
    # NOTE(review): DAG_LIST is derived from ``*.yml`` files, but the template
    # is read from ``<dag_id>.json`` -- confirm that both files are expected
    # to exist for every DAG.
    template_path = os.path.join(DAG_MESSAGE_TEMPLATE_DIRECTORY, f"{dag_id}.json")
    # Explicit encoding so decoding does not depend on the platform locale.
    with open(template_path, "r", encoding="utf-8") as f:
        template = json.load(f)
    return template


def gen_msg(dag_id, **kwargs):
    """
    Generate a message for a dag from its template.

    The template for ``dag_id`` is loaded and each keyword argument overrides
    the template entry of the same name.

    Parameters
    ----------
    dag_id : str
        Identifier of the DAG; must be one of ``DAG_LIST``.
    **kwargs
        Template keys to override and their new values. Every key must
        already exist in the template.

    Returns
    -------
    dict
        The template with the requested overrides applied.

    Raises
    ------
    ValueError
        If ``dag_id`` is not in ``DAG_LIST``.
    AssertionError
        If a keyword argument is not a key of the template.
    """
    # NOTE(review): this run id is generated but never used -- presumably it
    # was meant to be stored in the message; confirm the intended key. The
    # call is kept so the RNG state consumed by this function is unchanged.
    this_dag_run_id = gen_dag_run_id(DAG_RUN_ID_DIGITS)  # noqa: F841
    msg = get_dag_message_template(dag_id)
    for k, v in kwargs.items():
        # Explicit raise instead of ``assert`` so the check survives
        # ``python -O``; AssertionError is kept for backward compatibility.
        # A membership test is used so that template keys whose default
        # value is null can still be overridden.
        if k not in msg:
            raise AssertionError(f"Unknown key: {k}")
        msg[k] = v
    return msg