Commits (5)
......@@ -93,6 +93,8 @@ def submit_file_for_ingestion(file_content: str, file_name: str) -> dict:
{
"task_id": "5",
}
Raises:
RuntimeError: If the ingestion API or data upload fails after retries.
"""
```
......
......@@ -3,4 +3,7 @@ from .fits import fsspec_header
from .fits import fsspec_HDUList
from .table import fsspec_table
from . import s3_fs
from . import fs
\ No newline at end of file
from . import fs
from .commit import level2
from .star_catalog import search
from .metadata import search
\ No newline at end of file
import os
import base64
import requests
import time
from typing import Dict, Any
from csst_fs import s3_fs
def submit_file_for_ingestion(file_content: bytes, file_name: str) -> Dict[str, str]:
    """
    Submit a file for ingestion by:
    1. Requesting an OSS upload path from the ingestion API.
    2. Uploading the file to the returned OSS path (retried with exponential backoff).
    3. Reporting the finished upload back to the ingestion API.

    Args:
        file_content (bytes): The file's content.
        file_name (str): The file name for storing the file after ingestion.

    Returns:
        dict: A dict containing a task_id referring to the queued processing task's id.
        Example:
            {
                "task_id": "5",
            }

    Raises:
        RuntimeError: If the ingestion API or OSS upload fails after retries.
    """
    api_url = os.getenv("INGESTION_API_URL")
    if not api_url:
        raise RuntimeError("INGESTION_API_URL environment variable is not set")

    # Step 1: Request an OSS upload path
    request_upload_endpoint = f"{api_url}/datasync/level2/upload"
    payload = {"fileName": file_name}
    try:
        response = requests.post(request_upload_endpoint, json=payload, timeout=30)
        response.raise_for_status()
        data = response.json()
    except requests.RequestException as e:
        raise RuntimeError(f"Failed to request OSS upload path: {e}") from e
    if not data.get("success") or "result" not in data:
        raise RuntimeError(f"Unexpected response while requesting upload path: {data}")
    oss_upload_bucket = data["result"].get("bucket")
    oss_upload_key = data["result"].get("key")
    if not oss_upload_bucket or not oss_upload_key:
        raise RuntimeError(f"No OSS upload bucket/key returned from API: {data}")

    # Step 2: Upload file to OSS path (retry with exponential backoff)
    max_retries = 3
    backoff_factor = 2
    for attempt in range(1, max_retries + 1):
        try:
            with s3_fs.open(f"s3://{oss_upload_bucket}/{oss_upload_key}", "wb") as f:
                f.write(file_content)
            # BUGFIX: stop retrying once the upload succeeded; previously the
            # loop fell through and re-uploaded the file on every attempt.
            break
        # BUGFIX: original `except e:` referenced an undefined name and would
        # itself raise NameError. Catch Exception broadly here because the
        # s3_fs backend's exact exception types are not visible from this file.
        except Exception as e:
            if attempt == max_retries:
                raise RuntimeError(f"OSS upload failed after {max_retries} attempts: {e}") from e
            wait_time = backoff_factor ** (attempt - 1)
            print(f"OSS upload attempt {attempt} failed, retrying in {wait_time}s...")
            time.sleep(wait_time)

    # Step 3: Report upload completion
    try:
        report_upload_endpoint = f"{api_url}/datasync/level2/queue"
        report_payload = {"bucket": oss_upload_bucket, "key": oss_upload_key, "fileName": file_name}
        report_response = requests.post(report_upload_endpoint, json=report_payload, timeout=30)
        report_response.raise_for_status()
        report_data = report_response.json()
    except requests.RequestException as e:
        raise RuntimeError(f"Failed to report completed upload: {e}") from e
    if report_data.get("success") and "result" in report_data and "task_id" in report_data["result"]:
        return {"task_id": report_data["result"]["task_id"]}
    raise RuntimeError(f"Unexpected response while reporting upload: {report_data}")
def query_processing_task_state(task_id: str) -> Dict[str, Any]:
    """
    Look up the current processing state of an L2 task.

    Args:
        task_id: Task id of the L2 processing task.

    Returns:
        A dict of the format:
            {
                "state": "submission_pending",
            }

    Raises:
        ValueError: If no task id is supplied.
        RuntimeError: If the API URL is unconfigured or the query fails.
    """
    if not task_id:
        raise ValueError("task_id must be provided")

    api_url = os.getenv("QUERY_TASK_STATE_API_URL")
    if not api_url:
        raise RuntimeError("QUERY_TASK_STATE_API_URL environment variable is not set")

    try:
        resp = requests.get(f"{api_url}/datasync/level2/{task_id}", timeout=30)
        resp.raise_for_status()
        body = resp.json()
    except requests.RequestException as e:
        raise RuntimeError(f"Failed to query task state: {e}")

    if not body.get("success"):
        raise RuntimeError(f"Unexpected API response: {body}")

    result = body.get("result")
    if not result:
        # No record for this task id.
        return {"state": "not_found"}
    # Fall back to "unknown" when the record carries no syncState value.
    return {"state": result.get("syncState") or "unknown"}
import os
import requests
from typing import Dict, Any, List
def query_metadata(
    filter: Dict[str, Any],
    key: List[str],
) -> List[Dict[str, Any]]:
    """
    Query for file info by metadata values.

    Args:
        filter: The filter dict selecting documents by metadata values.
        key: Metadata keys that should be included in the output.

    Returns:
        A List[Dict] of matching documents, each containing a file_path value
        and the requested keys under 'metadata'.
        E.g. with key = ["dataset", "instrument", "obs_group", "obs_id"]:
            [
                {
                    "file_path": "CSST_L0/MSC/SCI/60310/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_03_L0_V01.fits",
                    "metadata": {
                        "dataset":"csst-msc-c11-1000sqdeg-wide-test-v2",
                        "instrument":"MSC",
                        "obs_group":"W1",
                        "obs_id":"10200000000"
                    },
                },
            ]

    Raises:
        ValueError: If the filter is empty.
        RuntimeError: If the API URL is unconfigured or the query fails.
    """
    if not filter:
        raise ValueError("Filter cannot be empty")

    api_url = os.getenv("METADATA_SEARCH_API_URL")
    if not api_url:
        raise RuntimeError("METADATA_SEARCH_API_URL environment variable is not set")

    try:
        resp = requests.post(
            f"{api_url}/datasync/metadata/query",
            json={"filter": filter, "key": key},
            timeout=30,
        )
        resp.raise_for_status()
    except requests.RequestException as e:
        raise RuntimeError(f"Metadata query failed: {e}")

    body = resp.json()
    if body.get("success") and "result" in body:
        return body["result"]
    raise RuntimeError(f"Unexpected API response: {body}")
import os
import requests
from typing import Dict, Any, List
def query_star_catalog(
    catalog_name: str,
    filter: Dict[str, Any],
    key: List[str],
) -> List[Dict[str, Any]]:
    """
    Query a star catalog by column values given a ra, dec and radius preselection.

    Args:
        catalog_name: Name of the star catalog (e.g. msc_l1_mbi_catmix).
        filter: Filter dict. The following keys MUST be set as scalars:
            {
                "ra": 40.3,
                "dec": 21.9,
                "radius": 0.2,
            }
            Range operators (lt, gt, lte, gte) are not supported for
            ra/dec/radius values.
        key: Column names that should be present in the return value.

    Returns:
        A List[Dict] of matching star catalog objects.

    Raises:
        ValueError: If a required scalar filter key is missing/ranged or key is empty.
        RuntimeError: If the API URL is unconfigured or the query fails.
    """
    # ra/dec/radius must each be present and scalar (a dict would mean a range query).
    for name in ("ra", "dec", "radius"):
        if name not in filter:
            raise ValueError(f"Filter must contain scalar '{name}'")
        if isinstance(filter[name], dict):
            raise ValueError(f"Filter must contain scalar '{name}'")
    if not key:
        raise ValueError("Key list cannot be empty")

    api_url = os.getenv("STAR_CATALOG_SEARCH_API_URL")
    if not api_url:
        raise RuntimeError("STAR_CATALOG_SEARCH_API_URL environment variable is not set")

    request_body = {
        "catalogName": catalog_name,
        "filter": filter,
        "key": key,
    }
    try:
        resp = requests.post(f"{api_url}/starcatalog/query", json=request_body, timeout=30)
        resp.raise_for_status()
    except requests.RequestException as e:
        raise RuntimeError(f"Star catalog query failed: {e}")

    body = resp.json()
    if body.get("success") and "result" in body:
        return body["result"]
    raise RuntimeError(f"Unexpected API response: {body}")
......@@ -7,7 +7,8 @@ setup(
install_requires=[
'astropy>=5.3',
'fsspec>=2024.5.0',
's3fs>=2024.5.0'
's3fs>=2024.5.0',
'requests'
],
python_requires='>=3.9',
description='csst pipeline handle file in local file system and remote s3 file system',
......