Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
csst-cicd
csst-dag
Commits
1b63883c
Commit
1b63883c
authored
May 09, 2025
by
BO ZHANG
🏀
Browse files
implement general l1
parent
5bc8755f
Changes
1
Hide whitespace changes
Inline
Side-by-side
csst_dag/dag/msc.py
deleted
100644 → 0
View file @
5bc8755f
import
json
from
._base_dag
import
BaseDAG
from
csst_dfs_client
import
plan
,
level0
MSC_MBI_CHIPID
=
[
"06"
,
"07"
,
"08"
,
"09"
,
"11"
,
"12"
,
"13"
,
"14"
,
"15"
,
"16"
,
"17"
,
"18"
,
"19"
,
"20"
,
"22"
,
"23"
,
"24"
,
"25"
,
]
MSC_SLS_CHIPID
=
[
"01"
,
"02"
,
"03"
,
"04"
,
"05"
,
"10"
,
"21"
,
"26"
,
"27"
,
"28"
,
"29"
,
"30"
,
]
MSC_CHIPID
=
MSC_MBI_CHIPID
+
MSC_SLS_CHIPID
CHIPID_MAP
=
{
"csst-msc-l1-mbi"
:
MSC_MBI_CHIPID
,
"csst-msc-l1-sls"
:
MSC_SLS_CHIPID
,
"csst-msc-l1-qc0"
:
MSC_CHIPID
,
}
class
CsstMscL1
(
BaseDAG
):
def
__init__
(
self
,
dag_id
:
str
):
super
().
__init__
(
dag_id
)
self
.
CHIPID
=
CHIPID_MAP
[
dag_id
]
def
schedule
(
self
,
**
kwargs
):
return
self
.
_base_schedule
(
**
kwargs
)
def
_base_schedule
(
self
,
dataset
:
str
=
"csst-msc-c9-25sqdeg-v3"
,
obs_type
:
str
=
"WIDE"
,
project_id
=
"none"
,
batch_id
:
str
|
None
=
"default"
,
initial_prc_status
:
int
=
-
1024
,
final_prc_status
:
int
=
-
2
,
demo
=
True
,
**
kwargs
,
):
# dataset: str = "csst-msc-c9-25sqdeg-v3"
# obs_type: str = "WIDE"
# project_id = "none"
# batch_id: str | None = "default"
# no need to query plan
# plan.find(
# instrument="MSC",
# dataset=dataset,
# obs_type=obs_type,
# project_id=project_id,
# )
recs
=
level0
.
find
(
instrument
=
"MSC"
,
dataset
=
dataset
,
obs_type
=
obs_type
,
project_id
=
project_id
,
prc_status
=
initial_prc_status
,
)
assert
recs
.
success
,
recs
.
message
msgs
=
[]
for
this_rec
in
recs
.
data
:
if
this_rec
[
"detector_no"
]
in
self
.
CHIPID
:
this_msg
=
self
.
gen_msg
(
dataset
=
dataset
,
obs_type
=
obs_type
,
project_id
=
project_id
,
batch_id
=
batch_id
,
obs_id
=
this_rec
[
"obs_id"
],
chip_id
=
this_rec
[
"detector_no"
],
dag_run_id
=
self
.
gen_dag_run_id
(),
**
kwargs
,
)
if
not
demo
:
# push and update
self
.
push
(
this_msg
)
this_update
=
level0
.
update_prc_status
(
level0_id
=
this_rec
[
"level0_id"
],
dag_run_id
=
this_msg
[
"dag_run_id"
],
prc_status
=
final_prc_status
,
dataset
=
dataset
,
)
assert
this_update
.
success
,
this_update
.
message
msgs
.
append
(
this_msg
)
return
msgs
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment