Commit 3fc7615f authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

tackle len(plan_basis)==0 or len(data_basis)==0

parent 69a4724c
...@@ -4,6 +4,9 @@ from typing import Callable, Optional ...@@ -4,6 +4,9 @@ from typing import Callable, Optional
import yaml import yaml
from astropy import table, time from astropy import table, time
import csst_fs
import numpy as np
import functools
from ._dispatcher import Dispatcher from ._dispatcher import Dispatcher
from ..dag_utils import ( from ..dag_utils import (
...@@ -26,6 +29,15 @@ class BaseDAG: ...@@ -26,6 +29,15 @@ class BaseDAG:
and task scheduling. and task scheduling.
""" """
def __init__(self):
# Load default DAG run template
json_path = os.path.join(DAG_CONFIG_DIR, f"default-dag-run.json") # unified
with open(json_path, "r") as f:
self.dag_run_template = json.load(f)
self.dag = ""
@staticmethod @staticmethod
def generate_sha1(): def generate_sha1():
"""Generate a unique SHA1 hash based on current timestamp. """Generate a unique SHA1 hash based on current timestamp.
...@@ -71,6 +83,42 @@ class BaseDAG: ...@@ -71,6 +83,42 @@ class BaseDAG:
created_time=time.Time.now().isot, created_time=time.Time.now().isot,
) )
def generate_dag_run(self, **kwargs) -> dict:
"""Generate a complete DAG run message.
Parameters
----------
**kwargs : Any
Additional keyword arguments to override.
Returns
-------
dict
Complete DAG run message
Raises
------
AssertionError
If any key is not in the message template
"""
# copy template
dag_run = self.dag_run_template.copy()
# update values
dag_run = override_common_keys(dag_run, kwargs)
# set hash
dag_run = override_common_keys(
dag_run,
{
"dag": self.dag,
"dag_run": self.generate_sha1(),
},
)
# It seems that the dag_run_template is already stringified,
# so we don't need to force_string here.
# force values to be string
dag_run = self.force_string_and_int(dag_run)
return dag_run
@staticmethod @staticmethod
def force_string_and_int(d: dict): def force_string_and_int(d: dict):
return force_string_and_int(d) return force_string_and_int(d)
...@@ -86,7 +134,7 @@ class Level2DAG(BaseDAG): ...@@ -86,7 +134,7 @@ class Level2DAG(BaseDAG):
""" """
def __init__(self): def __init__(self):
pass super().__init__()
def schedule(self, plan_basis: table.Table, data_basis: table.Table): def schedule(self, plan_basis: table.Table, data_basis: table.Table):
"""Schedule the DAG for execution. """Schedule the DAG for execution.
...@@ -100,16 +148,6 @@ class Level2DAG(BaseDAG): ...@@ -100,16 +148,6 @@ class Level2DAG(BaseDAG):
""" """
pass pass
def generate_dag_run(self):
"""Generate a DAG run configuration.
Returns
-------
dict
Dictionary containing DAG run configuration
"""
pass
class Level1DAG(BaseDAG): class Level1DAG(BaseDAG):
"""Level 1 DAG base class. """Level 1 DAG base class.
...@@ -147,6 +185,7 @@ class Level1DAG(BaseDAG): ...@@ -147,6 +185,7 @@ class Level1DAG(BaseDAG):
dag : str dag : str
DAG name, must exist in DAG_MAP DAG name, must exist in DAG_MAP
""" """
super().__init__()
# Set DAG name # Set DAG name
self.dag = dag self.dag = dag
self.pattern = pattern self.pattern = pattern
...@@ -154,15 +193,12 @@ class Level1DAG(BaseDAG): ...@@ -154,15 +193,12 @@ class Level1DAG(BaseDAG):
# Load yaml and json config # Load yaml and json config
yml_path = os.path.join(DAG_CONFIG_DIR, f"{dag}.yml") yml_path = os.path.join(DAG_CONFIG_DIR, f"{dag}.yml")
json_path = os.path.join(DAG_CONFIG_DIR, f"default-dag-run.json") # unified
with open(yml_path, "r") as f: with open(yml_path, "r") as f:
self.dag_cfg = yaml.safe_load(f) self.dag_cfg = yaml.safe_load(f)
assert ( assert (
self.dag_cfg["name"] == self.dag self.dag_cfg["name"] == self.dag
), f"{self.dag_cfg['name']} != {self.dag}" # , f"{dag_cfg} not consistent with definition in .yml file." ), f"{self.dag_cfg['name']} != {self.dag}" # , f"{dag_cfg} not consistent with definition in .yml file."
with open(json_path, "r") as f:
self.dag_run_template = json.load(f)
def run( def run(
self, self,
...@@ -182,7 +218,7 @@ class Level1DAG(BaseDAG): ...@@ -182,7 +218,7 @@ class Level1DAG(BaseDAG):
filter: str | None = None, filter: str | None = None,
prc_status: str | None = None, prc_status: str | None = None,
qc_status: str | None = None, qc_status: str | None = None,
# prc paramters # prc parameters
pmapname: str = "", pmapname: str = "",
ref_cat: str = "", ref_cat: str = "",
extra_kwargs: Optional[dict] = None, extra_kwargs: Optional[dict] = None,
...@@ -191,7 +227,7 @@ class Level1DAG(BaseDAG): ...@@ -191,7 +227,7 @@ class Level1DAG(BaseDAG):
force_success: bool = False, force_success: bool = False,
return_details: bool = False, return_details: bool = False,
# no custom_id # no custom_id
): ) -> tuple[dict, list]:
if self.dispatcher is Dispatcher.dispatch_obsgroup: if self.dispatcher is Dispatcher.dispatch_obsgroup:
assert ( assert (
obs_group is not None obs_group is not None
...@@ -225,6 +261,9 @@ class Level1DAG(BaseDAG): ...@@ -225,6 +261,9 @@ class Level1DAG(BaseDAG):
prc_status=prc_status, prc_status=prc_status,
qc_status=qc_status, qc_status=qc_status,
) )
if len(plan_basis) == 0 or len(data_basis) == 0:
return dag_group_run, []
plan_basis, data_basis = self.filter_basis(plan_basis, data_basis) plan_basis, data_basis = self.filter_basis(plan_basis, data_basis)
dag_run_list = self.schedule( dag_run_list = self.schedule(
dag_group_run=dag_group_run, dag_group_run=dag_group_run,
...@@ -330,39 +369,3 @@ class Level1DAG(BaseDAG): ...@@ -330,39 +369,3 @@ class Level1DAG(BaseDAG):
else: else:
this_task["dag_run"] = None this_task["dag_run"] = None
return task_list return task_list
def generate_dag_run(self, **kwargs) -> dict:
"""Generate a complete DAG run message.
Parameters
----------
kwargs : dict
Additional keyword arguments to override.
Returns
-------
dict
Complete DAG run message
Raises
------
AssertionError
If any key is not in the message template
"""
# copy template
dag_run = self.dag_run_template.copy()
# update values
dag_run = override_common_keys(dag_run, kwargs)
# set hash
dag_run = override_common_keys(
dag_run,
{
"dag": self.dag,
"dag_run": self.generate_sha1(),
},
)
# It seems that the dag_run_template is already stringified,
# so we don't need to force_string here.
# force values to be string
dag_run = self.force_string_and_int(dag_run)
return dag_run
...@@ -4,7 +4,7 @@ from urllib.parse import urlparse ...@@ -4,7 +4,7 @@ from urllib.parse import urlparse
import csst_fs import csst_fs
from astropy import table from astropy import table
from csst_dfs_client import plan, level0, level1, catalog from csst_dfs_client import plan, level0, level1, catalog, dag
from ._csst import csst from ._csst import csst
from .dag_utils import override_common_keys from .dag_utils import override_common_keys
...@@ -264,6 +264,8 @@ class DFS: ...@@ -264,6 +264,8 @@ class DFS:
dfs1_level1_find = level1.find dfs1_level1_find = level1.find
# dfs1_dag_find = dag.find # dfs1_dag_find = dag.find
dfs1_catalog = catalog dfs1_catalog = catalog
# DAG
dag = dag
# file search # file search
dfs2_product_find = csst_fs.query_metadata dfs2_product_find = csst_fs.query_metadata
...@@ -317,6 +319,8 @@ class DFS: ...@@ -317,6 +319,8 @@ class DFS:
def dfs1_find_plan_basis(**kwargs) -> table.Table: def dfs1_find_plan_basis(**kwargs) -> table.Table:
"""Extract plan basis from plan data.""" """Extract plan basis from plan data."""
plan_data = DFS.dfs1_find_plan(**kwargs) plan_data = DFS.dfs1_find_plan(**kwargs)
if len(plan_data) == 0:
return table.Table()
plan_basis = plan_data[DFS1_PLAN_BASIS_KEYS] plan_basis = plan_data[DFS1_PLAN_BASIS_KEYS]
return plan_basis return plan_basis
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment