Commits (2)
...@@ -71,16 +71,17 @@ with s3_fs.open('gaia/data') as file: ...@@ -71,16 +71,17 @@ with s3_fs.open('gaia/data') as file:
# 2. Commit For File Processing # 2. Commit For File Processing
Submit a file's content and file name to the ingestion API for further processing.
The function will return a successful response as soon as the file content is successfully stored and queued for further processing. Otherwise, the function will handle errors appropriately. The function will return a successful response as soon as the file content is successfully stored and queued for further processing. Otherwise, the function will handle errors appropriately.
A successful response contains a task_id referring to the queued processing task. This can be used in [4. Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state) for querying a processing task's current state. A successful response contains a task_id referring to the queued processing task. This can be used in [4. Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state) for querying a processing task's current state.
## Configuration ## Configuration
The helper will send HTTP requests to an external API. INGESTION_API_URL env variable should be set accordingly. The helper will send HTTP requests to an external API. CSST_BACKEND_API_URL env variable should be set accordingly.
## Function: `submit_file_for_ingestion` ## Function: `start_ingestion_task`
```python ```python
def submit_file_for_ingestion(file_content: str, file_name: str) -> dict: def start_ingestion_task(file_content: str, file_name: str) -> dict:
""" """
Submit a file's content and file name to the ingestion API. Submit a file's content and file name to the ingestion API.
...@@ -103,11 +104,11 @@ def submit_file_for_ingestion(file_content: str, file_name: str) -> dict: ...@@ -103,11 +104,11 @@ def submit_file_for_ingestion(file_content: str, file_name: str) -> dict:
Query for file info by metadata values. Query for file info by metadata values.
## Configuration ## Configuration
The helper will send HTTP requests to an external API. SEARCH_API_URL env variable should be set accordingly. The helper will send HTTP requests to an external API. CSST_BACKEND_API_URL env variable should be set accordingly.
## Function: `search_with_basic_filters` ## Function: `query_metadata`
```python ```python
def search_with_basic_filters( def query_metadata(
filter: Dict[str, Any], filter: Dict[str, Any],
key: List[str], key: List[str],
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
...@@ -119,18 +120,26 @@ def search_with_basic_filters( ...@@ -119,18 +120,26 @@ def search_with_basic_filters(
key: A list of string values, corresponding to metadata keys that should be included in the output. key: A list of string values, corresponding to metadata keys that should be included in the output.
Returns: Returns:
A List[Dict] of matching documents containing a file_path value and the keys set as 'key' parameter under 'metadata'. A List[Dict] of matching documents containing a file_path value and the keys set as 'key' parameter under 'metadata'.
E.g. with key = ["dataset", "instrument", "obs_group", "obs_id"] E.g. with key = ["CABEND", "qc_status"]
then returns: then returns:
[ [
{ {
"file_path": "CSST_L0/MSC/SCI/60310/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_03_L0_V01.fits", "urn": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/10109300100413/CSST_MSC_MS_SCI_20231022050242_20231022050512_10109300100413_14_L1_V01.fits",
"metadata": { "metadata": {
"dataset":"csst-msc-c11-1000sqdeg-wide-test-v2", "CABEND": "59785.82529",
"instrument":"MSC", "qc_status": "0.0"
"obs_group":"W1",
"obs_id":"10200000000"
}, },
}, "removed": false,
"created": 1756284502817,
"parentPath": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/10109300100413/",
"name": "CSST_MSC_MS_SCI_20231022050242_20231022050512_10109300100413_14_L1_V01.fits",
"lastModified": 1756284502817,
"grandParentPath": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/",
"platform": "s3",
"tags": [
"L1"
]
}
] ]
""" """
``` ```
...@@ -174,11 +183,11 @@ filter = { ...@@ -174,11 +183,11 @@ filter = {
Query the processing state of a processing task given an L2 task id. Query the processing state of a processing task given an L2 task id.
## Configuration ## Configuration
The helper will send HTTP requests to an external API. QUERY_TASK_STATE_API_URL env variable should be set accordingly. The helper will send HTTP requests to an external API. CSST_BACKEND_API_URL env variable should be set accordingly.
## Function: `query_processing_task_state` ## Function: `query_task_state`
```python ```python
def query_processing_task_state( def query_task_state(
task_id: str task_id: str
) -> Dict[str, Any] ) -> Dict[str, Any]
""" """
...@@ -200,9 +209,9 @@ def query_processing_task_state( ...@@ -200,9 +209,9 @@ def query_processing_task_state(
Query a star catalog by column values given a ra, dec and radius preselection. Query a star catalog by column values given a ra, dec and radius preselection.
## Configuration ## Configuration
The helper will send HTTP requests to an external API. STAR_CATALOG_SEARCH_API_URL env variable should be set accordingly. The helper will send HTTP requests to an external API. CSST_BACKEND_API_URL env variable should be set accordingly.
## Function: `search_with_basic_filters` ## Function: `query_star_catalog`
```python ```python
def query_star_catalog( def query_star_catalog(
catalog_name: str, catalog_name: str,
......
...@@ -4,6 +4,6 @@ from .fits import fsspec_HDUList ...@@ -4,6 +4,6 @@ from .fits import fsspec_HDUList
from .table import fsspec_table from .table import fsspec_table
from . import s3_fs from . import s3_fs
from . import fs from . import fs
from .commit import level2 from .catalog.metadata import query_metadata
from .star_catalog import search from .catalog.star import query_star_catalog
from .metadata import search from .ingestion.level2 import start_ingestion_task, query_task_state
\ No newline at end of file \ No newline at end of file
...@@ -32,9 +32,9 @@ def query_metadata( ...@@ -32,9 +32,9 @@ def query_metadata(
if not filter: if not filter:
raise ValueError("Filter cannot be empty") raise ValueError("Filter cannot be empty")
api_url = os.getenv("METADATA_SEARCH_API_URL") api_url = os.getenv("CSST_BACKEND_API_URL")
if not api_url: if not api_url:
raise RuntimeError("METADATA_SEARCH_API_URL environment variable is not set") raise RuntimeError("CSST_BACKEND_API_URL environment variable is not set")
endpoint = f"{api_url}/datasync/metadata/query" endpoint = f"{api_url}/datasync/metadata/query"
......
...@@ -33,9 +33,9 @@ def query_star_catalog( ...@@ -33,9 +33,9 @@ def query_star_catalog(
if not key: if not key:
raise ValueError("Key list cannot be empty") raise ValueError("Key list cannot be empty")
api_url = os.getenv("STAR_CATALOG_SEARCH_API_URL") api_url = os.getenv("CSST_BACKEND_API_URL")
if not api_url: if not api_url:
raise RuntimeError("STAR_CATALOG_SEARCH_API_URL environment variable is not set") raise RuntimeError("CSST_BACKEND_API_URL environment variable is not set")
endpoint = f"{api_url}/starcatalog/query" endpoint = f"{api_url}/starcatalog/query"
......
...@@ -5,7 +5,7 @@ import time ...@@ -5,7 +5,7 @@ import time
from typing import Dict, Any from typing import Dict, Any
from csst_fs import s3_fs from csst_fs import s3_fs
def submit_file_for_ingestion(file_content: bytes, file_name: str) -> Dict[str, str]: def start_ingestion_task(file_content: bytes, file_name: str) -> Dict[str, str]:
""" """
Submit a file for ingestion by: Submit a file for ingestion by:
1. Requesting an OSS upload path from the ingestion API. 1. Requesting an OSS upload path from the ingestion API.
...@@ -26,9 +26,9 @@ def submit_file_for_ingestion(file_content: bytes, file_name: str) -> Dict[str, ...@@ -26,9 +26,9 @@ def submit_file_for_ingestion(file_content: bytes, file_name: str) -> Dict[str,
Raises: Raises:
RuntimeError: If the ingestion API or OSS upload fails after retries. RuntimeError: If the ingestion API or OSS upload fails after retries.
""" """
api_url = os.getenv("INGESTION_API_URL") api_url = os.getenv("CSST_BACKEND_API_URL")
if not api_url: if not api_url:
raise RuntimeError("INGESTION_API_URL environment variable is not set") raise RuntimeError("CSST_BACKEND_API_URL environment variable is not set")
# Step 1: Request an OSS upload path # Step 1: Request an OSS upload path
request_upload_endpoint = f"{api_url}/datasync/level2/upload" request_upload_endpoint = f"{api_url}/datasync/level2/upload"
...@@ -82,7 +82,7 @@ def submit_file_for_ingestion(file_content: bytes, file_name: str) -> Dict[str, ...@@ -82,7 +82,7 @@ def submit_file_for_ingestion(file_content: bytes, file_name: str) -> Dict[str,
raise RuntimeError(f"Unexpected response while reporting upload: {report_data}") raise RuntimeError(f"Unexpected response while reporting upload: {report_data}")
def query_processing_task_state(task_id: str) -> Dict[str, Any]: def query_task_state(task_id: str) -> Dict[str, Any]:
""" """
Query the processing state of a processing task given a L2 task id. Query the processing state of a processing task given a L2 task id.
...@@ -98,9 +98,9 @@ def query_processing_task_state(task_id: str) -> Dict[str, Any]: ...@@ -98,9 +98,9 @@ def query_processing_task_state(task_id: str) -> Dict[str, Any]:
if not task_id: if not task_id:
raise ValueError("task_id must be provided") raise ValueError("task_id must be provided")
api_url = os.getenv("QUERY_TASK_STATE_API_URL") api_url = os.getenv("CSST_BACKEND_API_URL")
if not api_url: if not api_url:
raise RuntimeError("QUERY_TASK_STATE_API_URL environment variable is not set") raise RuntimeError("CSST_BACKEND_API_URL environment variable is not set")
endpoint = f"{api_url}/datasync/level2/{task_id}" endpoint = f"{api_url}/datasync/level2/{task_id}"
......