This repository provides the following functionalities:
1. [Read or Download a File From S3 Storage](#1-read-or-download-a-file-from-s3-storage)
2. [Commit For File Processing](#2-commit-for-file-processing)
3. [Query a List Of L1/L2 Fits-Files By Metadata Values](#3-query-a-list-of-l1l2-fits-files-by-metadata-values)
4. [Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state)
5. [Query a Star Catalog](#5-query-a-star-catalog)

# 1. Read or Download a File from S3 storage
Two distinct ways of reading from S3 storage are supported:
1) [Download to a local file](#download-from-s3-to-local-storage)
2) [Use open() to get a file object](#open-for-read)

## Configuration
**astropy must be upgraded to 5.3.**  
**The legacy usage is compatible with both local NAS and cloud S3: any read path that starts with the `s3://` protocol is recognized automatically.**  

To read from S3, the S3 credentials, endpoint and related settings have to be passed in. Two options are available for this.

S3 credentials and options are set to working default values. They can be overridden if needed, e.g.  
S3_KEY=dummy_key  
S3_SECRET=dummy_secret  
S3_ENDPOINT_URL=http://oss-cn-hangzhou-zjy-d01-a.ops.cloud.zhejianglab.com  
S3_BUCKET=data-and-computing

## Download from S3 to local storage
```python
def get_path(remote_path: str, local_path: str):
    """
    Download a file/folder from S3 to local storage.

    Args:
        remote_path: s3 key
        local_path: Local path to download to.
    """

def info_path(remote_path: str):
    """
    Get information about an S3 file.

    Args:
        remote_path: s3 key
    """

# Example:
from csst_fs import s3_fs
# single file
s3_fs.get_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits', 'v01.fits')
# folder
s3_fs.get_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0', './', recursive=True)
# get file or folder info
s3_fs.info_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits')
```
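When the same file is needed repeatedly, it can be worth skipping the download if a local copy already exists. The following is a minimal sketch and not part of `csst_fs`: `fetch_if_missing` is a hypothetical helper, and the download function is passed in as an argument so that `s3_fs.get_path` can be supplied by the caller.

```python
import os
from typing import Callable


def fetch_if_missing(
    remote_path: str,
    local_path: str,
    download: Callable[[str, str], object],
) -> bool:
    """Download remote_path to local_path unless local_path already exists.

    `download` is any callable taking (remote_path, local_path),
    for example s3_fs.get_path. Returns True if a download was triggered.
    """
    if os.path.exists(local_path):
        # A local copy is already there; nothing to do.
        return False
    download(remote_path, local_path)
    return True
```

With `csst_fs` installed, a call could look like `fetch_if_missing(remote_key, 'v01.fits', s3_fs.get_path)`, where `remote_key` is the S3 key of the file. The check only looks at whether the local path exists, not at whether the local copy is complete or up to date.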

## Open for read
```python
def open_path(remote_path: str, mode: str = 'r'):
    """
    Get a read-only file object for a file on S3. Use mode='rb' for binary files.

    Args:
        remote_path: s3 key
        mode: File mode. Use 'rb' for binary files. Default: 'r'.

    Returns:
        File object of the s3 file.
    """

# Example:
from csst_fs import s3_fs
# open single file (s3 or local)
with s3_fs.open_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits', mode='rb') as file:
    file.read()
```
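Because `open_path` returns a file object, it can be handed to any code that expects one. As an illustration, the sketch below reads only the first 2880-byte block of a file and checks that it begins with the `SIMPLE` keyword that a standard FITS file starts with. `looks_like_fits` is a hypothetical helper, not part of `csst_fs`; the opener is passed in as an argument so that either `s3_fs.open_path` or the built-in `open` can be used.

```python
from typing import BinaryIO, Callable

FITS_BLOCK_SIZE = 2880  # FITS files are organized in 2880-byte blocks


def looks_like_fits(path: str, opener: Callable[..., BinaryIO]) -> bool:
    """Return True if the file at `path` starts like a FITS primary header."""
    # Open in binary mode, as recommended above for binary files.
    with opener(path, mode="rb") as file:
        first_block = file.read(FITS_BLOCK_SIZE)
    # The first header card of a standard FITS file is the SIMPLE keyword.
    return first_block.startswith(b"SIMPLE  =")
```

Reading a single block avoids pulling the whole file into memory just to check its type.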

# 2. Commit For File Processing
Submit a file's content and file name to the ingestion API for further processing.

The function returns a successful response as soon as the file content has been stored and queued for further processing. If the ingestion API or the data upload still fails after retries, a `RuntimeError` is raised.
A successful response contains a `task_id` referring to the queued processing task. It can be used in [4. Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state) to query the processing task's current state.

## Configuration
The helper sends HTTP requests to an external API. The `CSST_BACKEND_API_URL` environment variable should be set accordingly, e.g. `CSST_BACKEND_API_URL=http://10.200.60.199:9010`.

## Function: `start_ingestion_task`

```python
def start_ingestion_task(file_content: str, file_name: str) -> dict:
    """
    Submit a file's content and file name to the ingestion API.

    Args:
        file_content (str): The file's content as a string representation.
        file_name (str): The file name for storing the file after ingestion.
    Returns:
        dict: A dict containing a task_id, referring to the queued processing task's id.
        E.g.
        {
            "task_id": "5",
        }
    Raises:
        RuntimeError: If the ingestion API or data upload fails after retries.
    """
```
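A typical call reads a file from disk and passes its content together with its base name. The sketch below assumes a text file that can be decoded as UTF-8. `submit_text_file` is a hypothetical wrapper, and since this README does not state the import path of `start_ingestion_task`, the function is passed in as the `submit` argument.

```python
import os
from typing import Callable


def submit_text_file(path: str, submit: Callable[[str, str], dict]) -> str:
    """Read a UTF-8 text file and queue it for ingestion.

    `submit` is expected to behave like start_ingestion_task: it takes
    (file_content, file_name) and returns a dict with a "task_id" entry.
    """
    with open(path, encoding="utf-8") as handle:
        file_content = handle.read()
    # Only the base name is sent; the directory part stays local.
    response = submit(file_content, os.path.basename(path))
    return response["task_id"]
```

The returned `task_id` can then be kept for a later state query. A `RuntimeError` raised by the ingestion function is deliberately not caught here, so the caller decides how to react to a failed submission.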


# 3. Query a List Of L1/L2 Fits-Files By Metadata Values
Query for file info by metadata values.

## Configuration
The helper sends HTTP requests to an external API. The `CSST_BACKEND_API_URL` environment variable should be set accordingly.

## Function: `query_metadata`
```python
from typing import Any, Dict, List


def query_metadata(
    filter: Dict[str, Any],
    key: List[str],
    hdu: int = 0
) -> List[Dict[str, Any]]:
    """
    Query for file info by metadata values.

    Args:
        filter: The filter dict described below.
        key: A list of string values, corresponding to metadata keys that should be included in the output.
        hdu: The hdu the filter & key arguments refer to. Default is 0. E.g. 0, 1.
    Returns:
        A List[Dict] of matching documents containing a file_path value and the keys set as 'key' parameter under 'metadata'.
        E.g. with key = ["CABEND", "qc_status"]
            then returns:
            [
                {
                    "urn": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/10109300100413/CSST_MSC_MS_SCI_20231022050242_20231022050512_10109300100413_14_L1_V01.fits",
                    "metadata": {
                        "CABEND": "59785.82529",
                        "qc_status": "0.0"
                    },
                    "removed": false,
                    "created": 1756284502817,
                    "parentPath": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/10109300100413/",
                    "name": "CSST_MSC_MS_SCI_20231022050242_20231022050512_10109300100413_14_L1_V01.fits",
                    "lastModified": 1756284502817,
                    "grandParentPath": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/",
                    "platform": "s3",
                    "tags": [
                        "L1"
                    ]
                }
            ]
    """
```
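The returned list can be processed like any list of dicts. As a small sketch, the hypothetical helper below collects the `urn` of every document whose `qc_status` metadata value equals a given value. It relies on the result shape from the docstring example above, where metadata values are returned as strings; the sample URNs are made up.

```python
from typing import Any, Dict, List


def urns_with_qc_status(
    documents: List[Dict[str, Any]], qc_status: str
) -> List[str]:
    """Return the urn of each document whose metadata qc_status matches."""
    return [
        doc["urn"]
        for doc in documents
        # .get() tolerates documents without the requested metadata key
        if doc.get("metadata", {}).get("qc_status") == qc_status
    ]


# Made-up, shortened stand-in for a query_metadata(...) result
sample = [
    {"urn": "s3://bucket/a.fits", "metadata": {"CABEND": "59785.82529", "qc_status": "0.0"}},
    {"urn": "s3://bucket/b.fits", "metadata": {"CABEND": "59785.9", "qc_status": "1.0"}},
]
print(urns_with_qc_status(sample, "0.0"))  # ['s3://bucket/a.fits']
```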
## Filter Syntax
All filters are combined with logical AND (every clause must match).
1) String equality
```python
filter = {
    "dataset": "csst-msc-c11-1000sqdeg-wide-test-v2",
    "obs_type": "WIDE",
}
```

2) Numeric equality and ranges  
Supported inequality operators are:  
`lt`/`gt`: less/greater than  
`lte`/`gte`: less/greater than or equal
```python
filter = {
    "dataset": "csst-msc-c11-1000sqdeg-wide-test-v2",
    "ra": {
        "gte": 250,
        "lte": 260
    },
    "qc_status": 0,
}
```

3) Timestamp equality and ranges
```python
filter = {
    "created_date": "2015-08-04T11:00:00",
    "obs_date": {
        "gt": "2015-06-01T10:00:00",
        "lt": "2015-07-01T10:00:00",
    },
}
```
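To make these semantics concrete, the following sketch evaluates such a filter dict against a single record in plain Python. It only illustrates the rules described above and makes no claim about how the backend implements them; `matches` is a hypothetical helper. Timestamps are compared as strings here, which orders correctly only as long as they all share the ISO format used in the example.

```python
import operator
from typing import Any, Dict

# Supported inequality operators and their Python equivalents
OPERATORS = {
    "lt": operator.lt,
    "gt": operator.gt,
    "lte": operator.le,
    "gte": operator.ge,
}


def matches(record: Dict[str, Any], filter: Dict[str, Any]) -> bool:
    """Return True if `record` satisfies every clause of `filter` (logical AND)."""
    for field, condition in filter.items():
        if field not in record:
            return False
        value = record[field]
        if isinstance(condition, dict):
            # Range clause: every listed operator must hold.
            for name, bound in condition.items():
                if not OPERATORS[name](value, bound):
                    return False
        elif value != condition:
            # Plain value: equality clause.
            return False
    return True


record = {"dataset": "csst-msc-c11-1000sqdeg-wide-test-v2", "ra": 255.0, "qc_status": 0}
print(matches(record, {"ra": {"gte": 250, "lte": 260}, "qc_status": 0}))  # True
print(matches(record, {"ra": {"gt": 255}}))  # False
```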

# 4. Query a L2 Processing Tasks State
Query the processing state of a processing task given an L2 task id.

## Configuration
The helper sends HTTP requests to an external API. The `CSST_BACKEND_API_URL` environment variable should be set accordingly.

## Function: `query_task_state`
```python
from typing import Any, Dict


def query_task_state(
    task_id: str
) -> Dict[str, Any]:
    """
    Query the processing state of a processing task given an L2 task id.

    Args:
        task_id: Task id of the L2 processing task
    Returns:
        Dictionary of the following format, including information about the current state of the corresponding processing task.
        The following strings are valid state values: tbd
        E.g.
            {
                "state": "submission_pending",
            }
    """
```
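Since processing happens asynchronously, callers will often want to poll until a task reaches a state they are waiting for. The sketch below shows one way to do that. `wait_for_state` is a hypothetical helper; because the list of valid state values is still to be defined, the states to wait for are supplied by the caller, and the query function (for example `query_task_state`) is passed in as an argument.

```python
import time
from typing import Any, Callable, Dict, Iterable


def wait_for_state(
    query: Callable[[str], Dict[str, Any]],
    task_id: str,
    target_states: Iterable[str],
    interval_s: float = 5.0,
    max_attempts: int = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll `query(task_id)` until its "state" is one of `target_states`.

    Returns the state that was reached, or raises TimeoutError after
    `max_attempts` unsuccessful polls.
    """
    targets = set(target_states)
    for attempt in range(max_attempts):
        state = query(task_id)["state"]
        if state in targets:
            return state
        if attempt < max_attempts - 1:
            sleep(interval_s)  # wait before asking again
    raise TimeoutError(f"task {task_id} did not reach any of {sorted(targets)}")
```

The polling interval and the attempt limit are arbitrary defaults chosen for the sketch and should be adapted to the expected processing time.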

# 5. Query a Star Catalog
Query a star catalog by column values given a ra, dec and radius preselection.

## Configuration
The helper sends HTTP requests to an external API. The `CSST_BACKEND_API_URL` environment variable should be set accordingly.

## Function: `query_star_catalog`
```python
from typing import Any, Dict, List


def query_star_catalog(
    catalog_name: str,
    filter: Dict[str, Any],
    key: List[str],
) -> List[Dict[str, Any]]:
    """
    Query a star catalog by column values given a ra, dec and radius preselection.

    Args:
        catalog_name: Name of the star catalog (e.g. csst-msc-l1-mbi-catmix)
        filter: The filter dict described below.
            The following keys MUST be set:
            {
                "ra": 40.3,
                "dec": 21.9,
                "radius": 0.2,
            }
            Ra, dec values pinpoint a location, 'radius' defines a radius in [deg] around this point.
            Only star catalog objects within this area are considered for subsequent filtering.
            Setting ranges with (lt, gt, lte, gte) for ra, dec values is not supported.
        key: A list of string values, corresponding to the column names that should be present in the return value.
    Returns:
        A List[Dict] of matching star catalog objects, containing key-value pairs for the keys set as 'key' parameter.
        E.g. with key = ["x", "bulge_flux", "ab"]
            then returns:
            [
                {
                    "x": 995.27,
                    "bulge_flux": "3.2",
                    "ab": 1.2,
                },
            ]
    """
```
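The `ra`, `dec` and `radius` keys describe a cone search: an object is preselected if its angular distance from the given point is at most `radius` degrees. For reference, the angular separation between two sky positions can be computed with the haversine formula, as in the sketch below. This only illustrates the geometry; how the backend performs the preselection is not documented here, and both functions are hypothetical helpers.

```python
import math


def angular_separation_deg(ra1: float, dec1: float, ra2: float, dec2: float) -> float:
    """Angular separation in degrees between two (ra, dec) positions given in degrees."""
    ra1, dec1, ra2, dec2 = map(math.radians, (ra1, dec1, ra2, dec2))
    # Haversine formula, numerically stable for small separations
    a = (
        math.sin((dec2 - dec1) / 2) ** 2
        + math.cos(dec1) * math.cos(dec2) * math.sin((ra2 - ra1) / 2) ** 2
    )
    return math.degrees(2 * math.asin(min(1.0, math.sqrt(a))))


def in_cone(ra: float, dec: float, filter: dict) -> bool:
    """True if (ra, dec) lies within filter["radius"] degrees of the filter center."""
    return angular_separation_deg(ra, dec, filter["ra"], filter["dec"]) <= filter["radius"]
```

For the example filter above, a point at `ra=40.3, dec=22.0` lies 0.1 degrees from the center and would fall inside the 0.2 degree cone.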
## Filter Syntax
All filters are combined with logical AND (every clause must match).
1) String equality
```python
filter = {
    "ra": 40.3,
    "dec": 21.9,
    "radius": 0.2,
    "msc_photid": "00101000703350610200001812",
    "detector": "06",
}
```

2) Numeric equality and ranges  
Supported inequality operators are:  
`lt`/`gt`: less/greater than  
`lte`/`gte`: less/greater than or equal
```python
filter = {
    "ra": 40.3,
    "dec": 21.9,
    "radius": 0.2,
    "msc_photid": "00101000703350610200001812",
    "x": {
        "gte": 996,
        "lte": 1000,
    },
    "ratio_disk": -9999,
}
```
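
Because `ra`, `dec` and `radius` are mandatory for star catalog queries and are given as plain numbers rather than ranges, it can help to check a filter before sending it. The hypothetical helper below sketches such a client-side check. Treating `radius` like `ra` and `dec` (number only) is an assumption based on the examples; whether and how the backend reports invalid filters is not covered by this README.

```python
from typing import Any, Dict

REQUIRED_KEYS = ("ra", "dec", "radius")


def check_star_catalog_filter(filter: Dict[str, Any]) -> None:
    """Raise ValueError if the mandatory cone-search keys are missing or not numeric."""
    for key in REQUIRED_KEYS:
        if key not in filter:
            raise ValueError(f"filter is missing required key '{key}'")
        value = filter[key]
        # Range dicts such as {"gte": 40} are not supported for these keys.
        # bool is excluded explicitly because it is a subclass of int.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise ValueError(f"'{key}' must be a plain number, got {value!r}")
```

Other columns, such as `x` in the example above, are not checked and may still use range clauses.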