This repository provides the following functionalities:
1. [Read or Download a File From S3 Storage](#1-read-or-download-a-file-from-s3-storage)
2. [Commit For File Processing](#2-commit-for-file-processing)
3. [Query a List Of L1/L2 Fits-Files By Metadata Values](#3-query-a-list-of-l1l2-fits-files-by-metadata-values)
4. [Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state)
5. [Query a Star Catalog](#5-query-a-star-catalog)
6. [Query a Star Catalog Corresponding to Metadata Entries](#6-query-a-star-catalog-corresponding-to-metadata-entries)
7. [Trigger a Pipeline Run](#7-trigger-a-pipeline-run)
8. [Query a Pipeline Run State](#8-query-a-pipeline-run-state)

# 1. Read or Download a File from S3 storage

Two distinct ways of reading from S3 storage are supported:
1) [Download to a local file](#download-from-s3-to-local-storage)
2) [Use open() to get a file object](#open-for-read)

## Configuration

**astropy must be upgraded to 5.3.**

**The legacy usage is compatible with both local NAS and cloud S3: any read path that starts with the `s3://` protocol is recognized automatically.**

## Download from S3 to Local Storage

```python
def get_path(remote_path: str, local_path: str):
    """
    Download a file/folder from s3 to local storage.

    Args:
        remote_path: s3 key
        local_path: Local path to download to.
    """

def info_path(remote_path: str):
    """
    Get information about a s3 file.

    Args:
        remote_path: s3 key
    """

# Example:
from csst_fs import s3_fs
# single file
s3_fs.get_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits', 'v01.fits')
# folder
s3_fs.get_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0', './', recursive=True)
# get file or folder info
s3_fs.info_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits')
```

## Open for read

```python
def open_path(remote_path: str, mode: str = 'r'):
    """
    Get a readonly file object from a file on s3. Use mode = 'rb' for binary files.
    Args:
        remote_path: s3 key
        mode: str = 'r' For binary files: 'rb', default: 'r'

    Returns:
        File object of the s3 file.
    """

# Example:
from csst_fs import s3_fs
# open single file (s3 or local)
with s3_fs.open_path('projects/csst-pipeline/csst_mbi_sample_dataset/L0/10100000000/MS/CSST_MSC_MS_SCIE_20290225043953_20290225044223_10100000000_01_L0_V01.fits', mode='rb') as file:
    file.read()
```

# 2. Commit For File Processing

Submit a file's content and file name to the ingestion API for further processing.
The function returns a successful response as soon as the file content has been stored and queued for further processing. If committing still fails after its retries, it raises a `RuntimeError`.
A successful response contains a `task_id` referring to the queued processing task. This can be used in [4. Query a L2 Processing Tasks State](#4-query-a-l2-processing-tasks-state) to query the processing task's current state.

## Function: `start_ingestion_task`

```python
def start_ingestion_task(files: List[dict]) -> Dict[str, Any]:
    """
    Submit a list of file contents and file names for ingestion.

    Args:
        [
            {
                file_name (str): The file name for storing the file after ingestion.
                file_content (bytes): The file's content
            },
            {
                ...
            }
        ]

    Returns:
        dict: A dict containing a task_id referring to the queued processing task as well as a field failed, listing the file names for which ingestion failed.
        Example:
        {
            "task_id": "5",
            "failed": List[str] List of file names for which ingestion failed.
        }

    Raises:
        RuntimeError: If committing failed after retries.
    """
```

# 3. Query a List Of L1/L2 Fits-Files By Metadata Values

Query for file info by metadata values.

## Function: `query_metadata`

```python
def query_metadata(
    filter: Dict[str, Any],
    key: List[str],
    hdu: int = 0
) -> List[Dict[str, Any]]:
    """
    Query for file info by metadata values.

    Args:
        filter: The filter dict described below.
        key: A list of string values, corresponding to metadata keys that should be included in the output.
        hdu: The hdu the filter & key arguments refer to. Default is 0. E.g. 0, 1.

    Returns:
        A List[Dict] of matching documents containing a file_path value and the keys set as 'key' parameter under 'metadata'.
        E.g. with key = ["CABEND", "qc_status"]
        then returns:
        [
            {
                "urn": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/10109300100413/CSST_MSC_MS_SCI_20231022050242_20231022050512_10109300100413_14_L1_V01.fits",
                "metadata": {
                    "CABEND": "59785.82529",
                    "qc_status": "0.0"
                },
                "removed": false,
                "created": 1756284502817,
                "parentPath": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/10109300100413/",
                "name": "CSST_MSC_MS_SCI_20231022050242_20231022050512_10109300100413_14_L1_V01.fits",
                "lastModified": 1756284502817,
                "grandParentPath": "s3://csst/testing/L1/MSC/msc-v093-r1/kKwmIwzv/SCI/",
                "platform": "s3",
                "tags": [
                    "L1"
                ]
            }
        ]
    """
```

## Filter Syntax

All filters are combined with logical AND (every clause must match).

1) String equality

```python
filter = {
    "dataset": "csst-msc-c11-1000sqdeg-wide-test-v2",
    "obs_type": "WIDE",
}
```

2) Numeric equality and ranges

Supported inequality operators are:
- `lt` / `gt`: less than / greater than
- `lte` / `gte`: less than or equal / greater than or equal

```python
filter = {
    "dataset": "csst-msc-c11-1000sqdeg-wide-test-v2",
    "ra": {
        "gte": 250,
        "lte": 260
    },
    "qc_status": 0,
}
```

3) List of values

The queried value must match one of the values in the list. Both string and number values are allowed.

```python
filter = {
    "NAXIS": [0, 1]
}
```

# 4. Query a L2 Processing Tasks State

Query the processing state of a processing task given an L2 task id.

## Function: `query_task_state`

```python
def query_task_state(
    task_id: str
) -> Dict[str, Any]:
    """
    Query the processing state of a processing task given a L2 task id.

    Args:
        task_id: Task id of the L2 processing task

    Returns:
        Dictionary of the following format, including information about the current state of the corresponding processing task.
        The following strings are valid state values: tbd
        E.g.
        {
            "state": "submission_pending",
        }
    """
```

# 5. Query a Star Catalog

Query a star catalog by column values given a ra, dec and radius preselection.

## Function: `query_star_catalog`

```python
def query_star_catalog(
    catalog_name: str,
    filter: Dict[str, Any],
    key: List[str],
) -> List[Dict[str, Any]]:
    """
    Query a star catalog by column values given a ra, dec and radius preselection.

    Args:
        catalog_name: Name of the star catalog (e.g. csst-msc-l1-mbi-catmix)
        filter: The filter dict described below.
            The following keys MUST be set:
            {
                "ra": 40.3,
                "dec": 21.9,
                "radius": 0.2,
            }
            Ra, dec values pinpoint a location, 'radius' defines a radius in [deg] around this point.
            Only star catalog objects within this area are considered for subsequent filtering.
            Setting ranges with (lt, gt, lte, gte) for ra, dec values is not supported.
        key: A list of string values, corresponding to the column names that should be present in the return value.

    Returns:
        A List[Dict] of matching star catalog objects, containing key-value pairs for the keys set as 'key' parameter.
        E.g. with key = ["x", "bulge_flux", "ab"]
        then returns:
        [
            {
                "x": 995.27,
                "bulge_flux": "3.2",
                "ab": 1.2,
            },
        ]
    """
```

# 6. Query a Star Catalog Corresponding to Metadata Entries

Queries the metadata catalog first, then queries the star catalog based on the metadata results.

## Function: `query_star_catalog_with_metadata`

```python
def query_star_catalog_with_metadata(
    metadata: Dict[str, Any],
    star_catalog: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """
    Queries the metadata catalog according to the provided filter criteria and HDU value.
    Subsequently queries the star catalog entries corresponding to the metadata results and the given additional filters.
    Returns the catalog columns specified in the 'key' list.

    Args:
        metadata: {
            filter: filter dict described below.
            hdu: The hdu the filter & key arguments refer to. Default is 0. E.g. 0, 1.
        },
        star_catalog: {
            catalog_name: Name of the star catalog (e.g.
                csst-msc-l1-mbi-catmix)
            filter: filter dict described below.
                The following keys MUST be set:
                {
                    "ra": 40.3,
                    "dec": 21.9,
                    "radius": 0.2,
                }
                Setting ranges with (lt, gt, lte, gte) for ra, dec values is not supported.
            key: A list of string values, corresponding to the column names that should be present in the return value.
        }

    Example:
        from csst_fs import *
        query_star_catalog_with_metadata(
            star_catalog={
                "catalogName": "csst-msc-l1-mbi-catmix",
                "key": ["data_uuid", "obsid", "ra"],
                "filter": {"ra": 130.97, "dec": -20.5, "radius": 0.09, "x": {"lt": 30}},
            },
            metadata={
                "filter": {"priority": {"gte": 2}, "obs_id": 66},
                "hdu": 0,
            }
        )

    Returns:
        A List[Dict] of matching star catalog objects.
    """
```

## Filter Syntax

All filters are combined with logical AND (every clause must match).

1) String equality

```python
filter = {
    "ra": 40.3,
    "dec": 21.9,
    "radius": 0.2,
    "msc_photid": "00101000703350610200001812",
    "detector": "06",
}
```

2) Numeric equality and ranges

Supported inequality operators are:
- `lt` / `gt`: less than / greater than
- `lte` / `gte`: less than or equal / greater than or equal

```python
filter = {
    "ra": 40.3,
    "dec": 21.9,
    "radius": 0.2,
    "msc_photid": "00101000703350610200001812",
    "x": {
        "gte": 996,
        "lte": 1000,
    },
    "ratio_disk": -9999,
}
```

# 7. Trigger a Pipeline Run

Trigger execution of a Level-2 data processing pipeline for a specified batch.
This function submits a pipeline run request to the backend API, retries transient failures up to three times, and returns the pipeline `run_id` when successful.

The `run_id` can be used to [query the current state](#8-query-a-pipeline-run-state) of the corresponding run.

## Function: `run_pipeline`

```python
def run_pipeline(batch: PipelineBatch) -> Dict[str, Any]:
    """
    Submits a pipeline run for execution with the provided batch.

    Retries up to 3 times on transient/network errors.

    Args:
        batch: A dictionary describing the batch to run.
        {
            "dag_group": str,
            "dag": str,
            "batch_id": str,
        }

    Returns:
        dict: A dictionary containing the run_id of the newly submitted pipeline run.
        Example:
        {
            "run_id": "3"
        }

    Raises:
        RuntimeError: If the pipeline API request fails or returns an invalid response after all retry attempts.
    """

# Example:
from csst_fs import run_pipeline
batch = {
    "dag_group": "dag_group_name",
    "dag": "dag_name",
    "batch_id": "batch_1"
}
result = run_pipeline(batch)
# result: {'run_id': '4'}
```

# 8. Query a Pipeline Run State

Query the state of a pipeline run given an id (obtained from [run_pipeline](#7-trigger-a-pipeline-run)).

## Function: `query_run_state`

```python
def query_run_state(
    run_id: str
) -> Dict[str, Any]:
    """
    Query the processing state of a pipeline run given an id.

    Args:
        run_id: Run id of the pipeline run.

    Returns:
        Dictionary of the following format, including information about the current state of the corresponding run.
        Possible values are "running" and "completed"
        E.g.
        {
            "state": "running",
        }
    """

# Example
from csst_fs import query_run_state
result = query_run_state("4")
# result: {'state': 'completed'}
```
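
The trigger-then-query flow from sections 7 and 8 can be combined into a small polling helper. The following is a minimal sketch, not part of `csst_fs`: the name `wait_for_run` and its parameters `query_state`, `poll_interval`, `timeout` and `sleep` are hypothetical and chosen for illustration. It relies only on the two documented state values, `"running"` and `"completed"`; any other value is treated as "not finished yet", which is an assumption.

```python
import time
from typing import Any, Callable, Dict


def wait_for_run(
    run_id: str,
    query_state: Callable[[str], Dict[str, Any]],
    poll_interval: float = 5.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll query_state(run_id) until it reports "completed" (hypothetical helper).

    query_state is expected to behave like csst_fs.query_run_state, i.e. to
    return a dict with a "state" key. Raises TimeoutError if the run has not
    completed once `timeout` seconds have elapsed.
    """
    deadline = time.monotonic() + timeout
    while True:
        result = query_state(run_id)
        if result.get("state") == "completed":
            return result
        # Assumption: every state other than "completed" means "keep waiting".
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run {run_id} not completed after {timeout} s")
        sleep(poll_interval)


# Usage with the real client (needs csst_fs and a reachable backend):
# from csst_fs import run_pipeline, query_run_state
# run_id = run_pipeline({"dag_group": "dag_group_name", "dag": "dag_name", "batch_id": "batch_1"})["run_id"]
# final_state = wait_for_run(run_id, query_run_state)
```

Passing the query function in as an argument keeps the helper independent of the backend, so it can be exercised with a stub before it is pointed at `query_run_state`.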