This repository provides the following functionalities:

1. [Read or Download a File From S3 Storage](#1-read-or-download-a-file-from-s3-storage)
2. [Commit For File Processing](#2-commit-for-file-processing)
3. [Query a List Of L1/L2 Fits-Files By Metadata Values](#3-query-a-list-of-l1l2-fits-files-by-metadata-values)
4. [Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state)
5. [Query a Star Catalog](#5-query-a-star-catalog)
6. [Query a Star Catalog Corresponding to Metadata Entries](#6-query-a-star-catalog-corresponding-to-metadata-entries)
7. [Trigger a Pipeline Run](#7-trigger-a-pipeline-run)
8. [Query a Pipeline Run State](#8-query-a-pipeline-run-state)

# 1. Read or Download a File from S3 storage
There are two distinct ways of reading from S3 storage:
1) [Download to a local file](#download-from-s3-to-local-storage)
2) [Use open() to get a file object](#open-for-read)

## Configuration
**astropy must be upgraded to 5.3.**  
**Existing (legacy) calls work with both local NAS and cloud S3 storage: any read path that starts with the `s3://` protocol is recognized automatically.**

## Download from S3 to local storage
```python
def get_path(remote_path: str, local_path: str):
    """
    Download a file/folder from S3 to local storage.

    Args:
        remote_path: S3 key
        local_path: Local path to download to.
    """

def info_path(remote_path: str):
    """
    Get information about an S3 file.

    Args:
        remote_path: S3 key
    """

# Example:
from csst_fs import s3_fs
# single file
s3_fs.get_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits', 'v01.fits')
# folder
s3_fs.get_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0', './', recursive=True)
# get file or folder info
s3_fs.info_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits')
```

## Open for read
```python
def open_path(remote_path: str, mode: str = 'r'):
    """
    Get a read-only file object for a file on S3. Use mode = 'rb' for binary files.

    Args:
        remote_path: S3 key
        mode: File mode. Use 'rb' for binary files. Default: 'r'.

    Returns:
        File object of the S3 file.
    """

# Example:
from csst_fs import s3_fs
# open single file (s3 or local)
with s3_fs.open_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits', mode='rb') as file:
    file.read()
```
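
The configuration note says that paths starting with `s3://` are recognized as S3 paths. How `csst_fs` performs this check internally is not shown in this README. The following is only a minimal sketch of such a check in caller code, using the hypothetical helpers `is_s3_path` and `split_s3_path` (neither is part of `csst_fs`):

```python
from typing import Tuple
from urllib.parse import urlparse


def is_s3_path(path: str) -> bool:
    """Return True if the path uses the s3:// scheme (hypothetical helper)."""
    return path.startswith("s3://")


def split_s3_path(path: str) -> Tuple[str, str]:
    """Split 's3://bucket/key' into (bucket, key) (hypothetical helper)."""
    if not is_s3_path(path):
        raise ValueError(f"not an s3:// path: {path!r}")
    parsed = urlparse(path)
    # netloc holds the bucket name, path holds the key with a leading slash
    return parsed.netloc, parsed.path.lstrip("/")
```

A caller could use `is_s3_path` to decide whether a path refers to cloud storage or to a local NAS location before logging or validating it.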

# 2. Commit For File Processing

Submit a file's content and file name to the ingestion API for further processing.
The function returns a successful response as soon as the file content has been stored and queued for further processing. If committing still fails after retries, it raises a `RuntimeError`.
A successful response contains a `task_id` referring to the queued processing task. It can be used in [4. Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state) to query the processing task's current state.

## Function: `start_ingestion_task`

```python
from typing import Any, Dict, List

def start_ingestion_task(files: List[dict]) -> Dict[str, Any]:
    """
    Submit a list of file contents and file names for ingestion.

    Args:
        files: A list of dicts, one per file:
        [
            {
                file_name (str): The file name for storing the file after ingestion.
                file_content (bytes): The file's content
            },
            {
                ...
            }
        ]

    Returns:
        dict: A dict containing a task_id referring to the queued processing task as well as a field failed, listing the file names for which ingestion failed.
        Example:
        {
            "task_id": "5",
            "failed": List[str] List of file names for which ingestion failed.
        }

    Raises:
        RuntimeError: If committing failed after retries.
    """
```
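
The `files` argument has to be assembled by the caller. The sketch below shows one way to build it from local files and to read the `failed` field of the response. `build_ingestion_payload` and `report_failures` are hypothetical helpers for illustration only and are not part of `csst_fs`.

```python
from pathlib import Path
from typing import Any, Dict, List, Union


def build_ingestion_payload(paths: List[Union[str, Path]]) -> List[Dict[str, Any]]:
    """Read each local file and return dicts in the documented shape (hypothetical helper)."""
    payload = []
    for path in map(Path, paths):
        payload.append({
            "file_name": path.name,             # name used to store the file after ingestion
            "file_content": path.read_bytes(),  # raw bytes of the file
        })
    return payload


def report_failures(response: Dict[str, Any]) -> List[str]:
    """Return the file names listed under 'failed' (empty list if there are none)."""
    return list(response.get("failed") or [])
```

The returned list can be passed directly to `start_ingestion_task`. Note that every file is read fully into memory, which may not be suitable for very large files.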


# 3. Query a List Of L1/L2 Fits-Files By Metadata Values
Query for file info by metadata values.

## Function: `query_metadata`
```python
from typing import Any, Dict, List

def query_metadata(
    filter: Dict[str, Any],
    key: List[str],
    hdu: int = 0
) -> List[Dict[str, Any]]:
    """
    Query for file info by metadata values.

    Args:
        filter: The filter dict described below.
        key: A list of string values, corresponding to metadata keys that should be included in the output.
        hdu: The hdu the filter & key arguments refer to. Default is 0. E.g. 0, 1.

    Returns:
        A List[Dict] of matching documents containing a file_path value and the keys set as 'key' parameter under 'metadata'.
        E.g. with key = ["CABEND", "qc_status"]
            then returns:
            [
                {
                    "urn": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/10109300100413/CSST_MSC_MS_SCI_20231022050242_20231022050512_10109300100413_14_L1_V01.fits",
                    "metadata": {
                        "CABEND": "59785.82529",
                        "qc_status": "0.0"
                    },
                    "removed": false,
                    "created": 1756284502817,
                    "parentPath": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/10109300100413/",
                    "name": "CSST_MSC_MS_SCI_20231022050242_20231022050512_10109300100413_14_L1_V01.fits",
                    "lastModified": 1756284502817,
                    "grandParentPath": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/",
                    "platform": "s3",
                    "tags": [
                        "L1"
                    ]
                }
            ]
    """
```

## Filter Syntax
All filters are combined with logical AND (every clause must match).
1) String equality
```python
filter = {
    "dataset": "csst-msc-c11-1000sqdeg-wide-test-v2",
    "obs_type": "WIDE",
}
```

2) Numeric equality and ranges
Supported inequality operators are `lt`/`gt` (less/greater than) and `lte`/`gte` (less/greater than or equal).
```python
filter = {
    "dataset": "csst-msc-c11-1000sqdeg-wide-test-v2",
    "ra": {
        "gte": 250,
        "lte": 260
    },
    "qc_status": 0,
}
```

3) List of values
A record matches if its value equals any one of the values in the list. Both string and number values are allowed.
```python
filter = {
    "NAXIS": [0, 1]
}
```
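
To make the filter semantics concrete, the following sketch evaluates a filter dict against a single in-memory record. It only illustrates the rules described above and is not the backend's implementation. The function name `matches` is hypothetical, and treating a missing field as a non-match is an assumption.

```python
from typing import Any, Dict

# Map the documented operator names to comparisons.
_RANGE_OPS = {
    "lt": lambda value, bound: value < bound,
    "gt": lambda value, bound: value > bound,
    "lte": lambda value, bound: value <= bound,
    "gte": lambda value, bound: value >= bound,
}


def matches(record: Dict[str, Any], criteria: Dict[str, Any]) -> bool:
    """Return True if `record` satisfies every clause of `criteria` (logical AND)."""
    for field, condition in criteria.items():
        if field not in record:              # assumption: missing field never matches
            return False
        value = record[field]
        if isinstance(condition, dict):      # range clause, e.g. {"gte": 250, "lte": 260}
            if not all(_RANGE_OPS[op](value, bound) for op, bound in condition.items()):
                return False
        elif isinstance(condition, list):    # list clause: value must equal one entry
            if value not in condition:
                return False
        elif value != condition:             # plain equality (string or number)
            return False
    return True
```

For example, a record with `"ra": 255.0` satisfies `{"ra": {"gte": 250, "lte": 260}}` but not `{"ra": {"lt": 255}}`.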

# 4. Query a L2 Processing Tasks State
Query the processing state of a processing task given an L2 task id.

## Function: `query_task_state`
```python
from typing import Any, Dict

def query_task_state(
    task_id: str
) -> Dict[str, Any]:
    """
    Query the processing state of a processing task given an L2 task id.

    Args:
        task_id: Task id of the L2 processing task
    Returns:
        Dictionary of the following format, including information about the current state of the corresponding processing task.
        The following strings are valid state values: tbd
        E.g.
            {
                "state": "submission_pending",
            }
    """
```

# 5. Query a Star Catalog
Query a star catalog by column values given a ra, dec and radius preselection.

## Function: `query_star_catalog`
```python
from typing import Any, Dict, List

def query_star_catalog(
    catalog_name: str,
    filter: Dict[str, Any],
    key: List[str],
) -> List[Dict[str, Any]]:
    """
    Query a star catalog by column values given a ra, dec and radius preselection.

    Args:
        catalog_name: Name of the star catalog (e.g. csst-msc-l1-mbi-catmix)
        filter: The filter dict described below.
            The following keys MUST be set:
            {
                "ra": 40.3,
                "dec": 21.9,
                "radius": 0.2,
            }
            Ra, dec values pinpoint a location, 'radius' defines a radius in [deg] around this point.
            Only star catalog objects within this area are considered for subsequent filtering.
            Setting ranges with (lt, gt, lte, gte) for ra, dec values is not supported.
        key: A list of string values, corresponding to the column names that should be present in the return value.
    Returns:
        A List[Dict] of matching star catalog objects, containing key-value pairs for the keys set as 'key' parameter.
        E.g. with key = ["x", "bulge_flux", "ab"]
            then returns:
            [
                {
                    "x": 995.27,
                    "bulge_flux": "3.2",
                    "ab": 1.2,
                },
            ]
    """
```
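
Because `ra`, `dec` and `radius` are mandatory and `ra`/`dec` may not be given as ranges, it can be useful to check a filter before sending it. The sketch below uses a hypothetical helper, `validate_cone_filter`, which is not part of `csst_fs`. Requiring `radius` to be a plain number as well is an assumption.

```python
from numbers import Real
from typing import Any, Dict

REQUIRED_CONE_KEYS = ("ra", "dec", "radius")


def validate_cone_filter(criteria: Dict[str, Any]) -> None:
    """Raise ValueError if a mandatory cone key is missing or is not a plain number."""
    for key in REQUIRED_CONE_KEYS:
        if key not in criteria:
            raise ValueError(f"filter is missing required key '{key}'")
        value = criteria[key]
        # bool is a subclass of int, so it is excluded explicitly.
        if isinstance(value, bool) or not isinstance(value, Real):
            raise ValueError(f"'{key}' must be a single number, got {value!r}")
```

Other keys, such as `"x": {"lt": 30}`, are left untouched, since range clauses are allowed for them.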

# 6. Query a Star Catalog Corresponding to Metadata Entries
Queries the metadata catalog first, then queries the star catalog for the entries that correspond to the metadata results.

## Function: `query_star_catalog_with_metadata`
```python
from typing import Any, Dict, List

def query_star_catalog_with_metadata(
    metadata: Dict[str, Any],
    star_catalog: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """
    Queries the metadata catalog according to the provided filter criteria and HDU value.
    Subsequently queries the star catalog entries corresponding to the metadata results and
    the given additional filters.
    Returns the catalog columns specified in the 'key' list.

    Args:
        metadata: {
            filter: filter dict described below.
            hdu: The hdu the filter & key arguments refer to. Default is 0. E.g. 0, 1.
        },
        star_catalog: {
            catalog_name: Name of the star catalog (e.g. csst-msc-l1-mbi-catmix)
            filter: filter dict described below.
                The following keys MUST be set:
                {
                    "ra": 40.3,
                    "dec": 21.9,
                    "radius": 0.2,
                }
                Setting ranges with (lt, gt, lte, gte) for ra, dec values is not supported.
            key: A list of string values, corresponding to the column names that should be present in the return value.
        }

        Example:
        from csst_fs import *
        query_star_catalog_with_metadata(
            star_catalog={
                "catalogName": "csst-msc-l1-mbi-catmix",
                "key": ["data_uuid", "obsid", "ra"],
                "filter": {"ra": 130.97, "dec": -20.5, "radius": 0.09, "x": {"lt": 30}},
            },
            metadata={
                "filter": {"priority": {"gte": 2}, "obs_id": 66},
                "hdu": 0,
            }
        )

    Returns:
        A List[Dict] of matching star catalog objects.
    """
```

## Filter Syntax
All filters are combined with logical AND (every clause must match).
1) String equality
```python
filter = {
    "ra": 40.3,
    "dec": 21.9,
    "radius": 0.2,
    "msc_photid": "00101000703350610200001812",
    "detector": "06",
}
```

2) Numeric equality and ranges
Supported inequality operators are `lt`/`gt` (less/greater than) and `lte`/`gte` (less/greater than or equal).
```python
filter = {
    "ra": 40.3,
    "dec": 21.9,
    "radius": 0.2,
    "msc_photid": "00101000703350610200001812",
    "x": {
        "gte": 996,
        "lte": 1000,
    },
    "ratio_disk": -9999,
}
```
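
The `ra`, `dec` and `radius` keys in the filters above describe a circular region on the sky, with `radius` in degrees. The sketch below shows what such a cone test looks like when computed with the great-circle (haversine) separation. Whether the backend uses exactly this geometry is an assumption, and both function names are hypothetical.

```python
import math


def angular_separation_deg(ra1: float, dec1: float, ra2: float, dec2: float) -> float:
    """Great-circle separation in degrees between two sky positions given in degrees."""
    ra1, dec1, ra2, dec2 = map(math.radians, (ra1, dec1, ra2, dec2))
    # Haversine formula; numerically stable for small separations.
    h = (math.sin((dec2 - dec1) / 2) ** 2
         + math.cos(dec1) * math.cos(dec2) * math.sin((ra2 - ra1) / 2) ** 2)
    return math.degrees(2 * math.asin(min(1.0, math.sqrt(h))))


def in_cone(ra: float, dec: float, center_ra: float, center_dec: float, radius: float) -> bool:
    """Return True if (ra, dec) lies within `radius` degrees of the cone center."""
    return angular_separation_deg(ra, dec, center_ra, center_dec) <= radius
```

Note that a difference in `ra` corresponds to a smaller angle on the sky at higher declination, which is why a plain box comparison on `ra` and `dec` would not be equivalent.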


# 7. Trigger a Pipeline Run

Trigger execution of a Level-2 data processing pipeline for a specified batch.
This function submits a pipeline run request to the backend API, retries transient failures up to three times, and returns the pipeline run_id when successful.

The run_id can be used to [query the current state](#8-query-a-pipeline-run-state) of the corresponding run.

## Function: `run_pipeline`
```python
def run_pipeline(batch: PipelineBatch) -> Dict[str, Any]:
    """
    Submits a pipeline run for execution with the provided batch.

    Retries up to 3 times on transient/network errors.

    Args:
        batch: A dictionary describing the batch to run.
            {
                "dag_group": str,
                "dag": str,
                "batch_id": str,
            }

    Returns:
        dict: A dictionary containing the run_id of the newly submitted pipeline run.
        Example:
        {
            "run_id": "3"
        }

    Raises:
        RuntimeError: If the pipeline API request fails or returns an invalid response
                      after all retry attempts.
    """

# Example:
from csst_fs import run_pipeline
batch = {
    "dag_group": "dag_group_name",
    "dag": "dag_name",
    "batch_id": "batch_1"
}
result = run_pipeline(batch)
# result: {'run_id': '4'}
```
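
The retry behaviour described above can be pictured with the following sketch. It is an illustration only, not the implementation used by `run_pipeline`. The helper `call_with_retries` is hypothetical, "up to three times" is interpreted here as three attempts in total, and treating `ConnectionError` and `TimeoutError` as the transient errors is an assumption.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(func: Callable[[], T], attempts: int = 3, delay_s: float = 0.0) -> T:
    """Call func(); retry on transient errors, then raise RuntimeError (hypothetical helper)."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except (ConnectionError, TimeoutError) as exc:  # treated as transient here
            last_error = exc
            if attempt < attempts:
                time.sleep(delay_s)  # wait before the next attempt
    raise RuntimeError(f"request failed after {attempts} attempts") from last_error
```

Errors outside the transient set propagate immediately, so a malformed request is not retried.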


# 8. Query a Pipeline Run State
Query the state of a pipeline run given an id (obtained from [run_pipeline](#7-trigger-a-pipeline-run)).

## Function: `query_run_state`
```python
from typing import Any, Dict

def query_run_state(
    run_id: str
) -> Dict[str, Any]:
    """
    Query the processing state of a pipeline run given an id.

    Args:
        run_id: Run id of the pipeline run.
    Returns:
        Dictionary of the following format, including information about the current state of the corresponding run.
        Possible state values are "running" and "completed".
        E.g.
            {
                "state": "running",
            }
    """

# Example
from csst_fs import query_run_state
result = query_run_state("4")
# result: {'state': 'completed'}
```
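
Since a run can be either "running" or "completed", callers typically poll until it completes. The sketch below shows one possible polling loop. `wait_for_completion` is a hypothetical helper, not part of `csst_fs`, and the default interval and timeout are arbitrary assumptions. The query function is passed in as an argument so the loop can be tried without a backend.

```python
import time
from typing import Any, Callable, Dict


def wait_for_completion(
    query: Callable[[str], Dict[str, Any]],
    run_id: str,
    poll_interval_s: float = 5.0,
    timeout_s: float = 600.0,
) -> Dict[str, Any]:
    """Poll query(run_id) until its state is 'completed' or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        result = query(run_id)
        if result.get("state") == "completed":
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run {run_id} not completed after {timeout_s} s")
        time.sleep(poll_interval_s)
```

With the library installed, this could be called as `wait_for_completion(query_run_state, "4")`.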