Commit b9d13950 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

add Dispatcher.dispatch_obsgroup_detector

parent 8c272e9d
......@@ -543,14 +543,22 @@ class Dispatcher:
return task_list
@staticmethod
def dispatch_obsgroup_detector(
    plan_basis: table.Table,
    data_basis: table.Table,
    n_jobs: int = 1,
):
    """Dispatch tasks at (obs_group, effective detector) granularity.

    Parameters
    ----------
    plan_basis : table.Table
        Planned observation basis. Must contain the columns
        ``dataset``, ``instrument``, ``obs_type``, ``obs_group``,
        ``obs_id`` and ``n_file``.
    data_basis : table.Table
        Found-data basis. Joined against the expected table on
        ``dataset``, ``instrument``, ``obs_type``, ``obs_group``,
        ``obs_id`` and ``detector``; rows are expected to carry ``_id``.
    n_jobs : int, optional
        Number of joblib workers. ``1`` (default) runs serially.

    Returns
    -------
    list of dict
        One record per (obs_group, effective detector) with keys
        ``task``, ``success``, ``relevant_plan``, ``relevant_data``,
        ``n_relevant_plan``, ``n_relevant_data``,
        ``relevant_data_id_list``, ``n_file_expected``, ``n_file_found``.
    """
    # parallel: split the data basis and recurse serially in each worker
    if n_jobs != 1:
        # FIX: previously delegated to Dispatcher.dispatch_obsid, which
        # produces obsid-level tasks — inconsistent with the serial path
        # below. Recurse into this method instead (n_jobs defaults to 1).
        # NOTE(review): this assumes split_data_basis keeps each
        # obs_group intact within a single split — confirm.
        task_list = joblib.Parallel(n_jobs=n_jobs)(
            joblib.delayed(Dispatcher.dispatch_obsgroup_detector)(plan_basis, _)
            for _ in split_data_basis(data_basis, n_split=n_jobs)
        )
        # flatten the per-worker lists into one task list
        return sum(task_list, [])
    # unique obsgroup basis (using group_by)
    obsgroup_basis = plan_basis.group_by(
        keys=[
            "dataset",
            "instrument",
            "obs_type",
            # assumed hidden key in the collapsed diff — matches the
            # join keys and task dict below; TODO confirm
            "obs_group",
        ]
    )
    # initialize task list
    task_list = []
    # loop over obsgroup
    for i_obsgroup in trange(len(obsgroup_basis.groups), **TQDM_KWARGS):
        this_obsgroup_basis = obsgroup_basis.groups[i_obsgroup]
        this_obsgroup_obsid = this_obsgroup_basis["obs_id"].data
        # total planned file count for the whole group (summed over obs_ids)
        n_file_expected = this_obsgroup_basis["n_file"].sum()
        this_instrument = this_obsgroup_basis["instrument"][0]
        effective_detector_names = csst[this_instrument].effective_detector_names
        # one task per effective detector of this instrument
        for this_effective_detector_name in effective_detector_names:
            this_task = dict(
                dataset=this_obsgroup_basis["dataset"][0],
                instrument=this_obsgroup_basis["instrument"][0],
                obs_type=this_obsgroup_basis["obs_type"][0],
                obs_group=this_obsgroup_basis["obs_group"][0],
                detector=this_effective_detector_name,
            )
            # expected rows: one per obs_id for this detector
            this_obsgroup_detector_expected = table.Table(
                [
                    dict(
                        dataset=this_obsgroup_basis["dataset"][0],
                        instrument=this_obsgroup_basis["instrument"][0],
                        obs_type=this_obsgroup_basis["obs_type"][0],
                        obs_group=this_obsgroup_basis["obs_group"][0],
                        obs_id=this_obsid,
                        detector=this_effective_detector_name,
                    )
                    for this_obsid in this_obsgroup_obsid
                ]
            )
            # inner join keeps only data rows actually found for this
            # (obs_group, detector) combination
            this_obsgroup_detector_found = table.join(
                this_obsgroup_detector_expected,
                data_basis,
                keys=[
                    "dataset",
                    "instrument",
                    "obs_type",
                    "obs_group",
                    "obs_id",
                    "detector",
                ],
                join_type="inner",
            )
            n_file_found = len(this_obsgroup_detector_found)
            # success: all expected files found AND every obs_id covered.
            # NOTE(review): n_file_expected sums n_file over the whole
            # obsgroup, while n_file_found is per-detector — confirm
            # that n_file is a per-detector count in plan_basis.
            this_success = n_file_found == n_file_expected and set(
                this_obsgroup_detector_found["obs_id"]
            ) == set(this_obsgroup_obsid)
            # set n_file_expected and n_file_found
            this_task["n_file_expected"] = n_file_expected
            this_task["n_file_found"] = n_file_found
            # append this task
            task_list.append(
                dict(
                    task=this_task,
                    success=this_success,
                    relevant_plan=this_obsgroup_basis,
                    relevant_data=this_obsgroup_detector_found,
                    n_relevant_plan=len(this_obsgroup_basis),
                    n_relevant_data=len(this_obsgroup_detector_found),
                    relevant_data_id_list=(
                        list(this_obsgroup_detector_found["_id"])
                        if n_file_found > 0
                        else []
                    ),
                    n_file_expected=n_file_expected,
                    n_file_found=n_file_found,
                )
            )
    return task_list
@staticmethod
def dispatch_obsgroup(
plan_basis: table.Table,
......@@ -658,9 +739,6 @@ class Dispatcher:
)
return task_list
def dispatch_obsgroup_detector(self):
pass
@staticmethod
def load_test_data() -> tuple:
import joblib
......
......@@ -48,6 +48,15 @@ print(
print(task_list_via_obsgroup[0]["relevant_plan"].colnames)
print(task_list_via_obsgroup[0]["relevant_data"].colnames)
# 16 task/s @n_jobs=1, 130*10 tasks/s @n_jobs=10 (max) 🔼
# Demo: dispatch at (obs_group, detector) granularity, serially.
task_list_via_obsgroup_detector = Dispatcher.dispatch_obsgroup_detector(
plan_basis, data_basis, n_jobs=1
)
# Wrap the list of task dicts in a Table for quick inspection.
t = Table(task_list_via_obsgroup_detector)
# Distribution of plan-row counts per task (value, count) pairs.
np.unique(t["n_relevant_plan"], return_counts=True)
# How many tasks succeeded vs. failed.
np.unique(t["success"], return_counts=True)
# Inspect the column layout of the attached plan/data sub-tables.
print(task_list_via_obsgroup_detector[0]["relevant_plan"].colnames)
print(task_list_via_obsgroup_detector[0]["relevant_data"].colnames)
# relevant plan_basis:
# ['dataset', 'instrument', 'obs_type', 'obs_group', 'obs_id', 'detector', 'n_file', '_id']
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment