Commit c6c642a7 authored by Matthias Weidenthaler's avatar Matthias Weidenthaler
Browse files

Add metadata based star catalog query functionality

parent df84a03f
...@@ -4,6 +4,9 @@ This repository provides the following functionalities: ...@@ -4,6 +4,9 @@ This repository provides the following functionalities:
3. [Query a List Of L1/L2 Fits-Files By Metadata Values](#3-query-a-list-of-l1l2-fits-files-by-metadata-values) 3. [Query a List Of L1/L2 Fits-Files By Metadata Values](#3-query-a-list-of-l1l2-fits-files-by-metadata-values)
4. [Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state) 4. [Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state)
5. [Query a Star Catalog](#5-query-a-star-catalog) 5. [Query a Star Catalog](#5-query-a-star-catalog)
6. [Query a Star Catalog Corresponding to Metadata Entries](#6-query-a-star-catalog-corresponding-to-metadata-entries)
7. [Trigger a Pipeline Run](#7-trigger-a-pipeline-run)
8. [Query a Pipeline Run State](#8-query-a-pipeline-run-state)
# 1. Read or Download a File from S3 storage # 1. Read or Download a File from S3 storage
Supported are two distinct ways of reading from s3 storage. Supported are two distinct ways of reading from s3 storage.
...@@ -241,6 +244,59 @@ def query_star_catalog( ...@@ -241,6 +244,59 @@ def query_star_catalog(
] ]
""" """
``` ```
# 6. Query a Star Catalog Corresponding to Metadata Entries
First queries the metadata catalog, based on that subsequently queries the star catalog.
## Function `query_star_catalog_with_metadata`
```python
def query_star_catalog_with_metadata(
metadata: Dict[str, Any],
star_catalog: Dict[str, Any],
) -> List[Dict[str, Any]]:
"""
Queries the metadata catalog according to the provided filter criteria and HDU value.
Subsequently queries the star catalog entries corresponding to the metadata results and
the given additional filters.
Returns the catalog columns specified in the 'key' list.
Args:
metadata: {
filter: filter dict described below.
hdu: The hdu the filter & key arguments refer to. Default is 0. E.g. 0, 1.
},
star_catalog: {
catalog_name: Name of the star catalog (e.g. csst-msc-l1-mbi-catmix)
filter: filter dict described below.
The following keys MUST be set:
{
"ra": 40.3,
"dec": 21.9,
"radius": 0.2,
}
Setting ranges with (lt, gt, lte, gte) for ra, dec values is not supported.
key: A list of string values, corresponding to the column names that should be present in the return value.
}
Example:
from csst_fs import *
query_star_catalog_with_metadata(
star_catalog={
"catalogName": "csst-msc-l1-mbi-catmix",
"key": ["data_uuid", "obsid", "ra"],
"filter": {"ra": 130.97, "dec": -20.5, "radius": 0.09, "x": {"lt": 30}},
},
metadata={
"filter": {"priority": {"gte": 2}, "obs_id": 66},
"hdu": 0,
}
)
Returns:
A List[Dict] of matching star catalog objects.
"""
```
## Filter Syntax ## Filter Syntax
All filters are combined with logical AND (every clause must match). All filters are combined with logical AND (every clause must match).
1) String equality 1) String equality
...@@ -273,12 +329,12 @@ filter = { ...@@ -273,12 +329,12 @@ filter = {
``` ```
# 6. Trigger a Pipeline Run # 7. Trigger a Pipeline Run
Trigger execution of a Level-2 data processing pipeline for a specified batch. Trigger execution of a Level-2 data processing pipeline for a specified batch.
This function submits a pipeline run request to the backend API, retries transient failures up to three times, and returns the pipeline run_id when successful. This function submits a pipeline run request to the backend API, retries transient failures up to three times, and returns the pipeline run_id when successful.
The run_id can be used to [query the current state](#7-query-a-pipeline-run-state) of the corresponding run. The run_id can be used to [query the current state](#8-query-a-pipeline-run-state) of the corresponding run.
## Function: `run_pipeline` ## Function: `run_pipeline`
```python ```python
...@@ -320,8 +376,8 @@ result = run_pipeline(batch) ...@@ -320,8 +376,8 @@ result = run_pipeline(batch)
``` ```
# 7. Query a Pipeline Run State # 8. Query a Pipeline Run State
Query the state of a pipeline run given an id (obtained from [run_pipeline](#6-trigger-a-pipeline-run)) Query the state of a pipeline run given an id (obtained from [run_pipeline](#7-trigger-a-pipeline-run))
## Function: `query_run_state` ## Function: `query_run_state`
```python ```python
......
...@@ -5,6 +5,6 @@ from .table import fsspec_table ...@@ -5,6 +5,6 @@ from .table import fsspec_table
from . import s3_fs from . import s3_fs
from . import fs from . import fs
from .catalog.metadata import query_metadata from .catalog.metadata import query_metadata
from .catalog.star import query_star_catalog from .catalog.star import query_star_catalog, query_star_catalog_with_metadata
from .ingestion.level2 import start_ingestion_task, query_task_state from .ingestion.level2 import start_ingestion_task, query_task_state
from .pipeline.pipeline import run_pipeline, PipelineBatch, query_run_state from .pipeline.pipeline import run_pipeline, PipelineBatch, query_run_state
\ No newline at end of file
import os import os
import requests import requests
from typing import Dict, Any, List from typing import Dict, Any, List
from pydantic import BaseModel, Field, validator
from ..s3_config import load_backend_settings from ..s3_config import load_backend_settings
class StarCatalogQuery(BaseModel):
catalogName: str
key: List[str]
filter: Dict[str, Any]
@validator("key")
def key_not_empty(cls, v):
if not v:
raise ValueError("key list cannot be empty")
return v
class MetadataQuery(BaseModel):
filter: Dict[str, Any]
hdu: int = Field(default=0)
class CombinedCatalogMetadataRequest(BaseModel):
starCatalog: StarCatalogQuery
metadata: MetadataQuery
def query_star_catalog_with_metadata(
metadata: Dict[str, Any],
star_catalog: Dict[str, Any],
) -> List[Dict[str, Any]]:
"""
Queries the metadata catalog according to the provided filter criteria and HDU value.
Subsequently queries the star catalog entries corresponding to the metadata results and
the given additional filters.
Returns the catalog columns specified in the 'key' list.
Args:
metadata: {
filter: filter dict described in the README filter syntax section.
hdu: The hdu the filter & key arguments refer to. Default is 0. E.g. 0, 1.
},
star_catalog: {
catalog_name: Name of the star catalog (e.g. csst-msc-l1-mbi-catmix)
filter: filter dict described in the README filter syntax section.
The following keys MUST be set:
{
"ra": 40.3,
"dec": 21.9,
"radius": 0.2,
}
Setting ranges with (lt, gt, lte, gte) for ra, dec values is not supported.
key: A list of string values, corresponding to the column names that should be present in the return value.
}
Example:
from csst_fs import *
query_starcatalog_with_metadata(
... star_catalog={
... "catalogName": "csst-msc-l1-mbi-catmix",
... "key": ["data_uuid", "obsid", "ra"],
... "filter": {"ra": 130.97, "dec": -20.5, "radius": 0.09, "x": {"lt": 30}},
... },
... metadata={
... "filter": {"priority": {"gte": 2}, "obs_id": 66},
... "hdu": 0,
... }
... )
Returns:
A List[Dict] of matching star catalog objects.
"""
api_url = load_backend_settings()['backend_url']
if not api_url:
raise RuntimeError("CSST backend api url is not set")
endpoint = f"{api_url}/starcatalog/query/metadata"
star_catalog_model = StarCatalogQuery(**star_catalog)
metadata_model = MetadataQuery(**metadata)
payload = CombinedCatalogMetadataRequest(
starCatalog=star_catalog_model,
metadata=metadata_model,
).dict()
response = requests.post(endpoint, json=payload, timeout=30)
response.raise_for_status()
data = response.json()
if not data.get("success") or "result" not in data:
raise RuntimeError(f"Unexpected API response: {data}")
return data["result"]
def query_star_catalog( def query_star_catalog(
catalog_name: str, catalog_name: str,
filter: Dict[str, Any], filter: Dict[str, Any],
......
...@@ -8,7 +8,8 @@ setup( ...@@ -8,7 +8,8 @@ setup(
'astropy>=5.3', 'astropy>=5.3',
'fsspec>=2024.5.0', 'fsspec>=2024.5.0',
's3fs>=2024.5.0', 's3fs>=2024.5.0',
'requests' 'requests',
'pydantic'
], ],
python_requires='>=3.9', python_requires='>=3.9',
description='csst pipeline handle file in local file system and remote s3 file system', description='csst pipeline handle file in local file system and remote s3 file system',
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment