This repository provides the following functionalities:
1. [Read or Download a File From S3 Storage](#1-read-or-download-a-file-from-s3-storage)
2. [Commit For File Processing](#2-commit-for-file-processing)
3. [Query a List Of L1/L2 Fits-Files By Metadata Values](#3-query-a-list-of-l1l2-fits-files-by-metadata-values)
4. [Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state)
5. [Query a Catalog](#5-query-a-catalog)
6. [Query a Catalog Corresponding to Metadata Entries](#6-query-a-catalog-corresponding-to-metadata-entries)
7. [Trigger a Pipeline Run](#7-trigger-a-pipeline-run)
8. [Query a Pipeline Run State](#8-query-a-pipeline-run-state)

## Logging Configuration
By default, log files are written to the current working directory. You can configure a custom logging directory to centralize all csst_fs-related log files:

```python
import csst_fs

# Configure logging directory (must be an absolute path)
csst_fs.configure_logging('/var/log/csst_fs')

# All subsequent log files will be written to the configured directory
# This affects all loggers in the csst_fs package
```

**Important Notes:**
- The logging directory must be an absolute path and will be created automatically if it doesn't exist.
- Log files are created **lazily** - they are only created when the first log message is written, not during import.
- Call `configure_logging()` early in your application to ensure all subsequent logs go to the desired directory.

# 1. Read or Download a File From S3 Storage
Two distinct ways of reading from S3 storage are supported:
1) [Download to a local file](#download-from-s3-to-local)
2) [Use open() to get a file object](#open-for-read)

## Configuration
**astropy must be upgraded to 5.3.**  
**The legacy call style works with both local NAS and cloud S3: any read path starting with the s3:// protocol is recognized automatically.**

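For example, a minimal sketch of the automatic protocol detection (the paths below are hypothetical; reading s3:// paths this way assumes astropy >= 5.3 with fsspec/s3fs available and S3 credentials configured):

```python
from astropy.io import fits

# A plain path (e.g. on a mounted NAS) is read from the local filesystem.
with fits.open('/nas/csst/example.fits') as hdul:  # hypothetical path
    header = hdul[0].header

# A path starting with the s3:// protocol is recognized automatically and
# read from S3 (astropy >= 5.3 with fsspec/s3fs; credentials must be configured).
with fits.open('s3://csst/example.fits') as hdul:  # hypothetical path
    header = hdul[0].header
```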
## Download from S3 to local
```python
def get_path(remote_path: str, local_path: str, recursive: bool = False):
    """
    Download a file/folder from S3 to local storage.

    Args:
        remote_path: s3 key
        local_path: Local path the file/folder will be downloaded to.
        recursive: Set to True to download a folder recursively.
    """

def info_path(remote_path: str):
    """
    Get information about an S3 file.

    Args:
        remote_path: s3 key
    """

# Example:
from csst_fs import s3_fs
# single file
s3_fs.get_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits', 'v01.fits')
# folder
s3_fs.get_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0', './', recursive=True)
# get file or folder info
s3_fs.info_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits')
```

## Open for read
```python
def open_path(remote_path: str, mode: str = 'r'):
    """
    Get a read-only file object for a file on S3. Use mode='rb' for binary files.

    Args:
        remote_path: s3 key
        mode: 'r' by default; use 'rb' for binary files.

    Returns:
        File object of the s3 file.
    """

# Example:
from csst_fs import s3_fs
# open single file (s3 or local)
with s3_fs.open_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits', mode='rb') as file:
    file.read()
```

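For FITS files, the file object returned by open_path can be passed directly to astropy (a minimal sketch, assuming astropy >= 5.3 as noted under Configuration):

```python
from astropy.io import fits
from csst_fs import s3_fs

# Open the S3 object in binary mode and let astropy parse it as FITS.
with s3_fs.open_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits', mode='rb') as file:
    with fits.open(file) as hdul:
        primary_header = hdul[0].header
```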

# 2. Commit For File Processing

Submit a file's content and file name to the ingestion API for further processing.
The function returns a successful response as soon as the file content is stored and queued for further processing; otherwise it raises an error after exhausting retries.
A successful response contains a task_id referring to the queued processing task. This can be used in [4. Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state) to query a processing task's current state.

## Function: `start_ingestion_task`

```python
def start_ingestion_task(files: List[dict]) -> Dict[str, Any]:
    """
    Submit a list of file contents and file names for ingestion.

    Args:
        files: A list of dicts of the form
        [
            {
                file_name (str): The file name for storing the file after ingestion.
                file_content (bytes): The file's content
            },
            {
                ...
            }
        ]

    Returns:
        dict: A dict containing a task_id referring to the queued processing task, as well as a field 'failed' listing the file names for which ingestion failed.
        Example:
        {
            "task_id": "5",
            "failed": List[str] List of file names for which ingestion failed.
        }

    Raises:
        RuntimeError: If committing failed after retries.
    """
```
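A minimal usage sketch (the package-level import and the example file name are assumptions based on the signature above):

```python
from csst_fs import start_ingestion_task

# Read the file to ingest as bytes (hypothetical local file name).
with open('CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits', 'rb') as f:
    content = f.read()

result = start_ingestion_task([
    {"file_name": "CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits",
     "file_content": content},
])
# e.g. {'task_id': '5', 'failed': []}
```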


# 3. Query a List Of L1/L2 Fits-Files By Metadata Values
Query for file info by metadata values.

## Function: `query_metadata`
```python
def query_metadata(
    filter: Dict[str, Any],
    key: List[str],
    hdu: int = 0
) -> List[Dict[str, Any]]:
    """
    Query for file info by metadata values.

    Args:
        filter: The filter dict described below.
        key: A list of string values, corresponding to metadata keys that should be included in the output.
        hdu: The hdu the filter & key arguments refer to. Default is 0. E.g. 0, 1.

    Returns:
        A List[Dict] of matching documents containing a file_path value and the keys set as 'key' parameter under 'metadata'.
        E.g. with key = ["CABEND", "qc_status"]
            then returns:
            [
                {
                    "urn": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/10109300100413/CSST_MSC_MS_SCI_20231022050242_20231022050512_10109300100413_14_L1_V01.fits",
                    "metadata": {
                        "CABEND": "59785.82529",
                        "qc_status": "0.0"
                    },
                    "removed": false,
                    "created": 1756284502817,
                    "parentPath": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/10109300100413/",
                    "name": "CSST_MSC_MS_SCI_20231022050242_20231022050512_10109300100413_14_L1_V01.fits",
                    "lastModified": 1756284502817,
                    "grandParentPath": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/",
                    "platform": "s3",
                    "tags": [
                        "L1"
                    ]
                }
            ]
    """
```
## Filter Syntax
All filters are combined with logical AND (every clause must match).
1) String equality
```python
filter = {
    "dataset": "csst-msc-c11-1000sqdeg-wide-test-v2",
    "obs_type": "WIDE",
}
```

2) Numeric equality and ranges
Supported inequality operators are:
- lt/gt: less/greater than
- lte/gte: less/greater than or equal
```python
filter = {
    "dataset": "csst-msc-c11-1000sqdeg-wide-test-v2",
    "ra": {
        "gte": 250,
        "lte": 260
    },
    "qc_status": 0,
}
```

3) List of values
The queried data should match one of the values in the list. String or number values are possible.
```python
filter = {
    "NAXIS": [0, 1]
}
```
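
A minimal usage sketch combining the filter forms above (the package-level import is an assumption; filter values are taken from the examples):

```python
from csst_fs import query_metadata

rows = query_metadata(
    filter={
        "dataset": "csst-msc-c11-1000sqdeg-wide-test-v2",
        "ra": {"gte": 250, "lte": 260},
        "qc_status": 0,
    },
    key=["CABEND", "qc_status"],
    hdu=0,
)
for row in rows:
    print(row["urn"], row["metadata"]["CABEND"])
```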

# 4. Query a L2 Processing Tasks State
Query the processing state of a processing task given an L2 task id.

## Function: `query_task_state`
```python
def query_task_state(
    task_id: str
) -> Dict[str, Any]:
    """
    Query the processing state of a processing task given an L2 task id.

    Args:
        task_id: Task id of the L2 processing task
    Returns:
        Dictionary of the following format, including information about the current state of the corresponding processing task.
        The following strings are valid state values: tbd
        E.g.
            {
                "state": "submission_pending",
            }
    """
```

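A minimal usage sketch (the package-level import is an assumption; the task_id comes from start_ingestion_task):

```python
from csst_fs import query_task_state

state = query_task_state("5")  # task_id returned by start_ingestion_task
# e.g. {'state': 'submission_pending'}
```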
# 5. Query a Catalog
Query a catalog by column values given a ra, dec and radius preselection.

## Function: `query_catalog`
```python
def query_catalog(
    catalog_name: str,
    filter: Dict[str, Any],
    key: List[str],
) -> List[Dict[str, Any]]:
    """
    Query a catalog by column values given a ra, dec and radius preselection.

    Args:
        catalog_name: Name of the catalog (e.g. csst-msc-l1-mbi-catmix)
        filter: The filter dict described below.
            The following keys MUST be set:
            {
                "ra": 40.3,
                "dec": 21.9,
                "radius": 0.2,
            }
            Ra, dec values pinpoint a location, 'radius' defines a radius in [deg] around this point.
            Only catalog objects within this area are considered for subsequent filtering.
            Setting ranges with (lt, gt, lte, gte) for ra, dec values is not supported.
        key: A list of string values, corresponding to the column names that should be present in the return value.
    Returns:
        A List[Dict] of matching catalog objects, containing key-value pairs for the keys set as 'key' parameter.
        E.g. with key = ["x", "bulge_flux", "ab"]
            then returns:
            [
                {
                    "x": 995.27,
                    "bulge_flux": "3.2",
                    "ab": 1.2,
                },
            ]
    """
```

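A minimal usage sketch (the package-level import is an assumption; the values are taken from the examples above):

```python
from csst_fs import query_catalog

objects = query_catalog(
    catalog_name="csst-msc-l1-mbi-catmix",
    filter={"ra": 40.3, "dec": 21.9, "radius": 0.2},
    key=["x", "bulge_flux", "ab"],
)
# e.g. [{'x': 995.27, 'bulge_flux': '3.2', 'ab': 1.2}]
```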
# 6. Query a Catalog Corresponding to Metadata Entries
First queries the metadata catalog, then queries the catalog based on the metadata results.

## Function `query_catalog_with_metadata`
```python
def query_catalog_with_metadata(
    metadata: Dict[str, Any],
    catalog: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """
    Queries the metadata catalog according to the provided filter criteria and HDU value.
    Subsequently queries the catalog entries corresponding to the metadata results and
    the given additional filters.
    Returns the catalog columns specified in the 'key' list.

    Args:
        metadata: {
            filter: filter dict described below.
            hdu: The hdu the filter & key arguments refer to. Default is 0. E.g. 0, 1.
        },
        catalog: {
            catalog_name: Name of the catalog (e.g. csst-msc-l1-mbi-catmix)
            filter: filter dict described below.
                The following keys MUST be set:
                {
                    "ra": 40.3,
                    "dec": 21.9,
                    "radius": 0.2,
                }
                Setting ranges with (lt, gt, lte, gte) for ra, dec values is not supported.
            key: A list of string values, corresponding to the column names that should be present in the return value.
        }

        Example:
        from csst_fs import *
        query_catalog_with_metadata(
            catalog={
                "catalogName": "csst-msc-l1-mbi-catmix",
                "key": ["data_uuid", "obsid", "ra"],
                "filter": {"ra": 130.97, "dec": -20.5, "radius": 0.09, "x": {"lt": 30}},
            },
            metadata={
                "filter": {"priority": {"gte": 2}, "obs_id": 66},
                "hdu": 0,
            }
        )

    Returns:
        A List[Dict] of matching catalog objects.
    """
```

## Filter Syntax
All filters are combined with logical AND (every clause must match).
1) String equality
```python
filter = {
    "ra": 40.3,
    "dec": 21.9,
    "radius": 0.2,
    "msc_photid": "00101000703350610200001812",
    "detector": "06",
}
```

2) Numeric equality and ranges
Supported inequality operators are:
- lt/gt: less/greater than
- lte/gte: less/greater than or equal
```python
filter = {
    "ra": 40.3,
    "dec": 21.9,
    "radius": 0.2,
    "msc_photid": "00101000703350610200001812",
    "x": {
        "gte": 996,
        "lte": 1000,
    },
    "ratio_disk": -9999,
}
```


# 7. Trigger a Pipeline Run

Trigger execution of a Level-2 data processing pipeline for a specified batch.
This function submits a pipeline run request to the backend API, retries transient failures up to three times, and returns the pipeline run_id when successful.

The run_id can be used to [query the current state](#8-query-a-pipeline-run-state) of the corresponding run.

## Function: `run_pipeline`
```python
def run_pipeline(batch: PipelineBatch) -> Dict[str, Any]:
    """
    Submits a pipeline run for execution with the provided batch.

    Retries up to 3 times on transient/network errors.

    Args:
        batch: A dictionary describing the batch to run.
            {
                "dag_group": str,
                "dag": str,
                "batch_id": str,
            }

    Returns:
        dict: A dictionary containing the run_id of the newly submitted pipeline run.
        Example:
        {
            "run_id": "3"
        }

    Raises:
        RuntimeError: If the pipeline API request fails or returns an invalid response
                      after all retry attempts.
    """

# Example:
from csst_fs import run_pipeline
batch = {
    "dag_group": "dag_group_name",
    "dag": "dag_name",
    "batch_id": "batch_1"
}
result = run_pipeline(batch)
# result: {'run_id': '4'}
```


# 8. Query a Pipeline Run State
Query the state of a pipeline run given an id (obtained from [run_pipeline](#7-trigger-a-pipeline-run)).

## Function: `query_run_state`
```python
def query_run_state(
    run_id: str
) -> Dict[str, Any]:
    """
    Query the processing state of a pipeline run given an id.

    Args:
        run_id: Run id of the pipeline run.
    Returns:
        Dictionary of the following format, including information about the current state of the corresponding run.
        Possible values are "running" and "completed"
        E.g.
            {
                "state": "running",
            }
    """
    
# Example
from csst_fs import query_run_state
result = query_run_state("4")
# result: {'state': 'completed'}
```