Commit de5fce99 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

add data_id_set

parent 4e275394
...@@ -82,11 +82,12 @@ dag_group_run = BaseDAG.gen_dag_group_run( ...@@ -82,11 +82,12 @@ dag_group_run = BaseDAG.gen_dag_group_run(
print("\n"*2) print("\n"*2)
print(">>> Matching DAGs ...") print(">>> Matching DAGs ...")
dag_run_list = [] dag_run_list = []
data_id_set = set()
for dag in [ for dag in [
"csst-cpic-l1", "csst-cpic-l1",
"csst-cpic-l1-qc0", "csst-cpic-l1-qc0",
]: ]:
this_dag_run_list = CSST_DAGS[dag].schedule( this_dag_run_list, this_data_id_set = CSST_DAGS[dag].schedule(
dag_group_run=dag_group_run, dag_group_run=dag_group_run,
plan_basis=plan_basis, plan_basis=plan_basis,
data_basis=data_basis, data_basis=data_basis,
...@@ -94,6 +95,7 @@ for dag in [ ...@@ -94,6 +95,7 @@ for dag in [
) )
print(f"- [{dag}] : {len(this_dag_run_list)} dag_runs") print(f"- [{dag}] : {len(this_dag_run_list)} dag_runs")
dag_run_list.extend(this_dag_run_list) dag_run_list.extend(this_dag_run_list)
data_id_set.union(this_data_id_set)
# print dag_group_run and dag_run_list # print dag_group_run and dag_run_list
print("\n") print("\n")
......
...@@ -123,13 +123,14 @@ dag_group_run = BaseDAG.gen_dag_group_run( ...@@ -123,13 +123,14 @@ dag_group_run = BaseDAG.gen_dag_group_run(
print("\n"*2) print("\n"*2)
print(">>> Matching DAGs ...") print(">>> Matching DAGs ...")
dag_run_list = [] dag_run_list = []
data_id_set = set()
for dag in [ for dag in [
"csst-msc-l1-mbi", "csst-msc-l1-mbi",
"csst-msc-l1-ast", "csst-msc-l1-ast",
"csst-msc-l1-sls", "csst-msc-l1-sls",
"csst-msc-l1-qc0", "csst-msc-l1-qc0",
]: ]:
this_dag_run_list = CSST_DAGS[dag].schedule( this_dag_run_list, this_data_id_set = CSST_DAGS[dag].schedule(
dag_group_run=dag_group_run, dag_group_run=dag_group_run,
plan_basis=plan_basis, plan_basis=plan_basis,
data_basis=data_basis, data_basis=data_basis,
...@@ -138,6 +139,7 @@ for dag in [ ...@@ -138,6 +139,7 @@ for dag in [
) )
print(f"- [{dag}] : {len(this_dag_run_list)} dag_runs") print(f"- [{dag}] : {len(this_dag_run_list)} dag_runs")
dag_run_list.extend(this_dag_run_list) dag_run_list.extend(this_dag_run_list)
data_id_set.union(this_data_id_set)
# print dag_group_run and dag_run_list # print dag_group_run and dag_run_list
print("\n") print("\n")
......
...@@ -103,22 +103,29 @@ class BaseDAG: ...@@ -103,22 +103,29 @@ class BaseDAG:
dag_group_run: dict, dag_group_run: dict,
data_basis: table.Table, data_basis: table.Table,
plan_basis: table.Table, plan_basis: table.Table,
force_success: bool = False,
**kwargs, **kwargs,
) -> list[dict]: ) -> tuple[list, set]:
# filter plan and data basis
filtered_plan_basis, filtered_data_basis = self.filter_basis( filtered_plan_basis, filtered_data_basis = self.filter_basis(
plan_basis, data_basis plan_basis, data_basis
) )
# dispatch tasks
task_list = self.dispatcher(filtered_plan_basis, filtered_data_basis) task_list = self.dispatcher(filtered_plan_basis, filtered_data_basis)
dag_run_list = [] # convert tasks to dag_run_list
dag_run_list = [] # each element is a dag_run (dict)
data_id_set = set() # each element is a data_id (str)
for this_task in task_list: for this_task in task_list:
dag_run = self.gen_dag_run( # only convert success tasks
**dag_group_run, if force_success or this_task["success"]:
**this_task["task"], dag_run = self.gen_dag_run(
**kwargs, **dag_group_run,
) **this_task["task"],
dag_run_list.append(dag_run) **kwargs,
)
return dag_run_list dag_run_list.append(dag_run)
data_id_set.union(this_task["relevant_data"]["_id"])
return dag_run_list, data_id_set
@staticmethod @staticmethod
def generate_sha1(): def generate_sha1():
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment