Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
csst-cicd
csst-dag
Commits
3ab5bd4a
Commit
3ab5bd4a
authored
May 05, 2025
by
BO ZHANG
🏀
Browse files
tweaks
parent
554f808c
Changes
3
Show whitespace changes
Inline
Side-by-side
csst_dag/dag.py
→
csst_dag/
_
dag.py
View file @
3ab5bd4a
File moved
csst_dag/dag/_base_dag.py
View file @
3ab5bd4a
from
abc
import
ABC
,
abstractmethod
from
._dag_list
import
DAG_LIST
import
json
import
yaml
import
os
import
glob
import
string
import
json
import
numpy
as
np
from
astropy
import
time
DAG_RUN_ID_DIGITS
=
6
DAG_CONFIG_DIR
=
os
.
path
.
join
(
os
.
path
.
dirname
(
os
.
path
.
dirname
(
__file__
)),
"dag_config"
,
)
"""
- BaseTrigger
...
...
@@ -23,20 +34,33 @@ class BaseDAG(ABC):
self
.
dag
[
"dag_id"
]
==
self
.
dag_id
),
f
"
{
dag_id
}
not consistent with definition in .yml file."
with
open
(
f
"
{
dag_id
}
.json"
,
"r"
)
as
f
:
self
.
task
_template
=
json
.
load
(
f
)
self
.
task_param
s
=
set
(
self
.
task
_template
.
keys
())
self
.
msg
_template
=
json
.
load
(
f
)
self
.
msg_key
s
=
set
(
self
.
msg
_template
.
keys
())
@
abstractmethod
def
trigger
(
self
,
**
kwargs
)
->
None
:
# @abstractmethod
# def trigger(self, **kwargs) -> None:
# pass
#
def
schedule
(
self
,
**
kwargs
):
pass
@
abstractmethod
def
schedule
(
self
,
**
kwargs
)
->
None
:
pass
@
staticmethod
def
gen_dag_run_id
(
digits
=
6
):
"""
Generate a unique run_id for a dag.
"""
now
=
time
.
Time
.
now
()
dag_run_id
=
now
.
strftime
(
"%Y%m%d-%H%M%S-"
)
@
abstractmethod
def
push
(
self
)
->
None
:
pass
n
=
len
(
string
.
ascii_lowercase
)
for
i
in
range
(
digits
):
dag_run_id
+=
string
.
ascii_lowercase
[
np
.
random
.
randint
(
low
=
0
,
high
=
n
)]
return
dag_run_id
#
# @abstractmethod
# def push(self) -> None:
# pass
# def __call__(self, **kwargs):
# self.trigger(**kwargs)
csst_dag/dag/msc.py
View file @
3ab5bd4a
import
json
from
._base_dag
import
BaseDAG
from
csst_dfs_client
import
plan
,
level0
__all__
=
[
"CsstMscL1Mbi"
]
MSC_MBI_CHIPID
=
[
"06"
,
"07"
,
"08"
,
"09"
,
"11"
,
"12"
,
"13"
,
"14"
,
"15"
,
"16"
,
"17"
,
"18"
,
"19"
,
"20"
,
"22"
,
"23"
,
"24"
,
"25"
,
]
class
CsstMscL1Mbi
:
def
__init__
(
self
):
pass
MSC_SLS_CHIPID
=
[
"01"
,
"02"
,
"03"
,
"04"
,
"05"
,
"10"
,
"21"
,
"26"
,
"27"
,
"28"
,
"29"
,
"30"
,
]
class
CsstMscL1Mbi
(
BaseDAG
):
@
staticmethod
def
schedule
(
self
,
dataset
:
str
=
"csst-msc-c9-25sqdeg-v3"
,
obs_type
:
str
=
"WIDE"
,
project_id
=
"none"
,
batch_id
:
str
|
None
=
"default"
,
**
kwargs
,
):
pass
# dataset: str = "csst-msc-c9-25sqdeg-v3"
# obs_type: str = "WIDE"
# project_id = "none"
# batch_id: str | None = "default"
# no need to query plan
# plan.find(
# instrument="MSC",
# dataset=dataset,
# obs_type=obs_type,
# project_id=project_id,
# )
#
# dataset: str = "csst-msc-c9-25sqdeg-v3"
# obs_type: str = "WIDE"
# project_id="none"
# batch_id: str | None = "default"
#
# plan.find(instrument="MSC")
recs
=
level0
.
find
(
instrument
=
"MSC"
,
dataset
=
dataset
,
obs_type
=
obs_type
,
project_id
=
project_id
,
)
assert
recs
.
success
,
recs
.
message
msgs
=
[]
for
rec
in
recs
.
data
:
msg
=
self
.
msg_template
.
copy
()
msg
.
update
(
kwargs
)
msg
.
update
(
dict
(
dataset
=
dataset
,
obs_type
=
obs_type
,
project_id
=
project_id
,
batch_id
=
batch_id
,
obs_id
=
rec
[
"obs_id"
],
chipid
=
rec
[
"detector_no"
],
dag_run_id
=
self
.
gen_dag_run_id
(),
)
)
msgs
.
append
(
msg
)
return
msgs
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment