import json
import os
import string

import numpy as np
from astropy import time

from ..msc import MBI_CHIPID, SLS_CHIPID

# Default number of random letters appended to the date part of a DAG run id.
RUN_ID_DIGITS = 10

# Directory holding the JSON message templates: the "dags" folder located in
# the parent of the directory that contains this file.
DAG_TEMPLATE_DIRECTORY = os.path.join(
    os.path.dirname(os.path.dirname(__file__)), "dags"
)


def gen_dag_run_id(digits=10):
    """
    Generate a run_id for a dag.

    The id is the first ten characters of ``astropy.time.Time.now().iso`` with
    the dashes removed (i.e. the current date), followed by ``digits`` random
    lowercase ASCII letters drawn with ``np.random.randint``.

    The random suffix makes collisions unlikely, but uniqueness is not
    guaranteed.

    Parameters
    ----------
    digits : int
        Number of random letters to append. Values <= 0 append nothing.

    Returns
    -------
    str
        The generated run id.
    """
    date_prefix = time.Time.now().iso[:10].replace("-", "")
    letters = string.ascii_lowercase
    suffix = "".join(
        letters[np.random.randint(low=0, high=len(letters))] for _ in range(digits)
    )
    return date_prefix + suffix


def gen_mbi_level1_dag_message():
    """Placeholder that is not implemented yet; always returns ``None``."""


def _load_dag_template(dag_id):
    """Load the JSON template ``<dag_id>.json`` from ``DAG_TEMPLATE_DIRECTORY``."""
    template_path = os.path.join(DAG_TEMPLATE_DIRECTORY, dag_id + ".json")
    # JSON templates are read as UTF-8 instead of the platform default encoding.
    with open(template_path, "r", encoding="utf-8") as f:
        return json.load(f)


def gen_level1_dag_message(
    obsid: str = "11009101682009",
    **kwargs,
):
    """
    Build the level-1 DAG message for an observation.

    The leading character of ``obsid`` selects the instrument. Only MSC
    (obsid starting with "1") is implemented: the chip id decides whether the
    "csst-msc-l1-mbi" or the "csst-msc-l1-sls" template is loaded, and the
    template is then filled with a fresh run id, the obsid and the chipid.

    Parameters
    ----------
    obsid : str
        Observation id.
    **kwargs
        chipid :
            Required for MSC obsids. Must be a member of ``MBI_CHIPID`` or
            ``SLS_CHIPID``.
        return_dict : bool, optional
            If true, return the message as a dict instead of a JSON string.
        print_query_link : bool, optional
            Accepted for backward compatibility; currently has no effect.

    Returns
    -------
    dict, str or None
        The message as a dict (``return_dict=True``) or as a JSON string.
        ``None`` is returned for an MSC obsid whose second character is
        neither "0" nor "1".

    Raises
    ------
    AssertionError
        If an MSC obsid is given without ``chipid``, or its length is not
        11 or 14.
    ValueError
        If the chipid belongs to neither chip set, or the obsid prefix is
        unknown.
    NotImplementedError
        If the obsid belongs to an instrument without a DAG yet (prefixes
        "2" to "5").
    """
    # Instruments whose obsid prefix is recognised but that have no DAG yet.
    pending_instruments = {"2": "MCI", "3": "IFS", "4": "CPIC", "5": "HSTDM"}

    if obsid.startswith("1"):
        # MSC: the DAG depends on obsid + chipid.
        # AssertionError is kept so existing callers see the same exception
        # type, but it is raised explicitly so the validation is not stripped
        # when Python runs with -O.
        if "chipid" not in kwargs:
            raise AssertionError("chipid is required for MSC obsid")
        if len(obsid) not in (11, 14):
            raise AssertionError(
                f"MSC obsid must have 11 or 14 characters: {obsid}"
            )
        if obsid[1] not in "01":
            # No message is produced for these MSC obsids.
            return None

        chipid = kwargs["chipid"]
        if chipid in MBI_CHIPID:
            dag_id = "csst-msc-l1-mbi"
        elif chipid in SLS_CHIPID:
            dag_id = "csst-msc-l1-sls"
        else:
            raise ValueError(f"Invalid chipid: {chipid}")

        run_id = gen_dag_run_id(digits=RUN_ID_DIGITS)
        # The template is expected to contain a "message" mapping that
        # receives the obsid and chipid.
        message = _load_dag_template(dag_id)
        message["dag_id"] = dag_id
        message["dag_run_id"] = run_id
        message["message"]["obsid"] = obsid
        message["message"]["chipid"] = chipid
    elif obsid[:1] in pending_instruments:
        # Previously these branches fell through and failed later with an
        # UnboundLocalError on ``message``; fail early with a clear error.
        raise NotImplementedError(
            f"Level-1 DAG message is not implemented for "
            f"{pending_instruments[obsid[:1]]} obsid: {obsid}"
        )
    else:
        raise ValueError(f"Unknown obsid: {obsid}")

    if kwargs.get("return_dict", False):
        return message
    return json.dumps(message, ensure_ascii=False, indent=None)