Commits (2)
......@@ -71,16 +71,17 @@ with s3_fs.open('gaia/data') as file:
# 2. Commit For File Processing
Submit a file's content and file name to the ingestion API for further processing.
The function will return a successful response as soon as the file content is successfully stored and queued for further processing. Otherwise, the function will handle errors appropriately.
A successful response contains a task_id referring to the queued processing task. This can be used in [4. Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state) for querying a processing task's current state.
## Configuration
The helper will send HTTP requests to an external API. INGESTION_API_URL env variable should be set accordingly.
The helper will send HTTP requests to an external API. CSST_BACKEND_API_URL env variable should be set accordingly.
## Function: `submit_file_for_ingestion`
## Function: `start_ingestion_task`
```python
def submit_file_for_ingestion(file_content: str, file_name: str) -> dict:
def start_ingestion_task(file_content: str, file_name: str) -> dict:
"""
Submit a file's content and file name to the ingestion API.
......@@ -103,11 +104,11 @@ def submit_file_for_ingestion(file_content: str, file_name: str) -> dict:
Query for file info by metadata values.
## Configuration
The helper will send HTTP requests to an external API. SEARCH_API_URL env variable should be set accordingly.
The helper will send HTTP requests to an external API. CSST_BACKEND_API_URL env variable should be set accordingly.
## Function: `search_with_basic_filters`
## Function: `query_metadata`
```python
def search_with_basic_filters(
def query_metadata(
filter: Dict[str, Any],
key: List[str],
) -> List[Dict[str, Any]]:
......@@ -119,18 +120,26 @@ def search_with_basic_filters(
key: A list of string values, corresponding to metadata keys that should be included in the output.
Returns:
A List[Dict] of matching documents containing a file_path value and the keys set as 'key' parameter under 'metadata'.
E.g. with key = ["dataset", "instrument", "obs_group", "obs_id"]
E.g. with key = ["CABEND", "qc_status"]
then returns:
[
{
"file_path": "CSST_L0/MSC/SCI/60310/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_03_L0_V01.fits",
"urn": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/10109300100413/CSST_MSC_MS_SCI_20231022050242_20231022050512_10109300100413_14_L1_V01.fits",
"metadata": {
"dataset":"csst-msc-c11-1000sqdeg-wide-test-v2",
"instrument":"MSC",
"obs_group":"W1",
"obs_id":"10200000000"
"CABEND": "59785.82529",
"qc_status": "0.0"
},
},
"removed": false,
"created": 1756284502817,
"parentPath": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/10109300100413/",
"name": "CSST_MSC_MS_SCI_20231022050242_20231022050512_10109300100413_14_L1_V01.fits",
"lastModified": 1756284502817,
"grandParentPath": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/",
"platform": "s3",
"tags": [
"L1"
]
}
]
"""
```
......@@ -174,11 +183,11 @@ filter = {
Query the processing state of a processing task given an L2 task ID.
## Configuration
The helper will send HTTP requests to an external API. QUERY_TASK_STATE_API_URL env variable should be set accordingly.
The helper will send HTTP requests to an external API. CSST_BACKEND_API_URL env variable should be set accordingly.
## Function: `query_processing_task_state`
## Function: `query_task_state`
```python
def query_processing_task_state(
def query_task_state(
task_id: str
) -> Dict[str, Any]
"""
......@@ -200,9 +209,9 @@ def query_processing_task_state(
Query a star catalog by column values given a ra, dec and radius preselection.
## Configuration
The helper will send HTTP requests to an external API. STAR_CATALOG_SEARCH_API_URL env variable should be set accordingly.
The helper will send HTTP requests to an external API. CSST_BACKEND_API_URL env variable should be set accordingly.
## Function: `search_with_basic_filters`
## Function: `query_star_catalog`
```python
def query_star_catalog(
catalog_name: str,
......
......@@ -4,6 +4,6 @@ from .fits import fsspec_HDUList
from .table import fsspec_table
from . import s3_fs
from . import fs
from .commit import level2
from .star_catalog import search
from .metadata import search
\ No newline at end of file
from .catalog.metadata import query_metadata
from .catalog.star import query_star_catalog
from .ingestion.level2 import start_ingestion_task, query_task_state
\ No newline at end of file
......@@ -32,9 +32,9 @@ def query_metadata(
if not filter:
raise ValueError("Filter cannot be empty")
api_url = os.getenv("METADATA_SEARCH_API_URL")
api_url = os.getenv("CSST_BACKEND_API_URL")
if not api_url:
raise RuntimeError("METADATA_SEARCH_API_URL environment variable is not set")
raise RuntimeError("CSST_BACKEND_API_URL environment variable is not set")
endpoint = f"{api_url}/datasync/metadata/query"
......
......@@ -33,9 +33,9 @@ def query_star_catalog(
if not key:
raise ValueError("Key list cannot be empty")
api_url = os.getenv("STAR_CATALOG_SEARCH_API_URL")
api_url = os.getenv("CSST_BACKEND_API_URL")
if not api_url:
raise RuntimeError("STAR_CATALOG_SEARCH_API_URL environment variable is not set")
raise RuntimeError("CSST_BACKEND_API_URL environment variable is not set")
endpoint = f"{api_url}/starcatalog/query"
......
......@@ -5,7 +5,7 @@ import time
from typing import Dict, Any
from csst_fs import s3_fs
def submit_file_for_ingestion(file_content: bytes, file_name: str) -> Dict[str, str]:
def start_ingestion_task(file_content: bytes, file_name: str) -> Dict[str, str]:
"""
Submit a file for ingestion by:
1. Requesting an OSS upload path from the ingestion API.
......@@ -26,9 +26,9 @@ def submit_file_for_ingestion(file_content: bytes, file_name: str) -> Dict[str,
Raises:
RuntimeError: If the ingestion API or OSS upload fails after retries.
"""
api_url = os.getenv("INGESTION_API_URL")
api_url = os.getenv("CSST_BACKEND_API_URL")
if not api_url:
raise RuntimeError("INGESTION_API_URL environment variable is not set")
raise RuntimeError("CSST_BACKEND_API_URL environment variable is not set")
# Step 1: Request an OSS upload path
request_upload_endpoint = f"{api_url}/datasync/level2/upload"
......@@ -82,7 +82,7 @@ def submit_file_for_ingestion(file_content: bytes, file_name: str) -> Dict[str,
raise RuntimeError(f"Unexpected response while reporting upload: {report_data}")
def query_processing_task_state(task_id: str) -> Dict[str, Any]:
def query_task_state(task_id: str) -> Dict[str, Any]:
"""
Query the processing state of a processing task given a L2 task id.
......@@ -98,9 +98,9 @@ def query_processing_task_state(task_id: str) -> Dict[str, Any]:
if not task_id:
raise ValueError("task_id must be provided")
api_url = os.getenv("QUERY_TASK_STATE_API_URL")
api_url = os.getenv("CSST_BACKEND_API_URL")
if not api_url:
raise RuntimeError("QUERY_TASK_STATE_API_URL environment variable is not set")
raise RuntimeError("CSST_BACKEND_API_URL environment variable is not set")
endpoint = f"{api_url}/datasync/level2/{task_id}"
......