"""DFS query helpers: query-parameter templates and a facade over DFS1/DFS2 clients."""

import os
import socket
from urllib.parse import urlparse

import csst_fs
from astropy import table
from csst_dfs_client import plan, level0, level1, catalog

from ._csst import csst
from .dag_utils import override_common_keys

# -----------------
# INPUT PARAMETERS
# -----------------

# Default query parameters for DFS1 plan search (None = unconstrained).
DFS1_PLAN_PARAMS = {
    "dataset": None,
    "instrument": None,
    "obs_type": None,
    "obs_group": None,
    "obs_id": None,
    "proposal_id": None,
}

# Default query parameters for DFS1 level-0 data search.
DFS1_LEVEL0_PARAMS = {
    "dataset": None,
    "instrument": None,
    "obs_type": None,
    "obs_group": None,
    "obs_id": None,
    "detector": None,
    "filter": None,
    "prc_status": None,
    "qc_status": None,
    # "data_model"
}

# Default query parameters for DFS1 level-1 data search.
DFS1_LEVEL1_PARAMS = {
    "dataset": None,
    "instrument": None,
    "obs_type": None,
    "obs_group": None,
    "obs_id": None,
    "detector": None,
    "prc_status": None,
    "qc_status": None,
    # special keys for data products
    "data_model": None,
    "batch_id": "default_batch",
    # "build": None,
    # "pmapname": None,
}

# PROC_PARAMS = {
#     "priority": 1,
#     "batch_id": "default_batch",
#     "pmapname": "pmapname",
#     "final_prc_status": -2,
#     "demo": False,
#     # should be capable to extend
# }

# -----------------
# OUTPUT PARAMETERS
# -----------------

# plan basis keys
DFS1_PLAN_BASIS_KEYS = (
    "dataset",
    "instrument",
    "obs_type",
    "obs_group",
    "obs_id",
    "n_file",
    "_id",
)

# data basis keys
DFS1_LEVEL0_BASIS_KEYS = (
    "dataset",
    "instrument",
    "obs_type",
    "obs_group",
    "obs_id",
    "detector",
    "file_name",
    "_id",
    "prc_status",
    "qc_status",
)

DFS1_LEVEL1_BASIS_KEYS = (
    "dataset",
    "instrument",
    "obs_type",
    "obs_group",
    "obs_id",
    "detector",
    "file_name",
    "_id",
    "prc_status",
    "qc_status",
    "data_model",
    "batch_id",
    "build",
    "pmapname",
)

# DFS2 META
DFS2_META_BASIS_KEYS = (
    "dataset",
    "instrument",
    "obs_type",
    "obs_group",
    "obs_id",
    "detector",
    "filter",
    "pmapname",
    "ref_cat",
    "custom_id",
    "batch_id",
    "dag_group",
    "dag_group_run",
    "dag",
    "dag_run",
    "priority",
    "data_list",
    "extra_kwargs",
    "create_time",
    "rerun",
    "data_model",
    "data_uuid",
    "qc_status",
    "docker_image",
    "build",
    "object",
    "proposal_id",
    "ra",
    "dec",
    "obs_date",
    "prc_date",
)

# Default query parameters for DFS2 metadata search (None = unconstrained).
DFS2_META_PARAMS = {k: None for k in DFS2_META_BASIS_KEYS}


def assert_env_exists(env_var: str):
    """Assert that an environment variable exists.

    Parameters
    ----------
    env_var : str
        Name of the environment variable to check.

    Raises
    ------
    AssertionError
        If the environment variable is not set.
    """
    assert env_var in os.environ, f"Environment variable {env_var} is not set."


def check_url_accessibility(url: str, timeout=3, raise_error: bool = True):
    """Check if a URL is accessible.

    Parameters
    ----------
    url : str
        URL to check.
    timeout : int, optional
        Timeout in seconds, by default 3.
    raise_error : bool, optional
        Whether to raise an error if the URL is not accessible, by default True.

    Raises
    ------
    AssertionError
        If the URL is not accessible and raise_error is True.

    Returns
    -------
    bool
        True if the URL is accessible, False otherwise.
    """
    if not url.startswith("http"):
        url = f"http://{url}"
    parsed_url = urlparse(url)
    ip, port = parsed_url.hostname, parsed_url.port
    print(ip, port)
    if port is None:
        port = 80
    # Create the TCP socket *before* entering the try block so that the
    # finally clause can never hit a NameError on an unbound `sock`
    # (the original created it inside try but closed it in finally).
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)  # timeout in seconds
    try:
        # connect_ex returns 0 on success instead of raising
        result = sock.connect_ex((ip, port))
    finally:
        sock.close()  # make sure the connection is closed
    if result == 0:
        return True
    if raise_error:
        raise AssertionError(f"URL {url} is not accessible.")
    return False


# join_type for data x plan
PLAN_JOIN_TYPE = "inner"
"""
References:
- https://docs.astropy.org/en/stable/api/astropy.table.join.html
- https://docs.astropy.org/en/stable/table/operations.html#join

Typical types:
- inner join: Only matching rows from both tables
- left join: All rows from left table, matching rows from right table
- right join: All rows from right table, matching rows from left table
- outer join: All rows from both tables
- cartesian join: Every combination of rows from both tables
"""


# def extract_basis_table(dlist: list[dict], basis_keys: tuple) -> table.Table:
#     """Extract basis key-value pairs from a list of dictionaries."""
#     return table.Table([{k: d.get(k, "") for k in basis_keys} for d in dlist])


# def split_data_basis(data_basis: table.Table, n_split: int = 1) -> list[table.Table]:
#     """Split data basis into n_split parts via obs_id"""
#     assert (
#         np.unique(data_basis["dataset"]).size == 1
#     ), "Only one dataset is allowed for splitting."
#     # sort
#     data_basis.sort(keys=["dataset", "obs_id"])
#     # get unique obsid
#     u_obsid, i_obsid, c_obsid = np.unique(
#         data_basis["obs_id"].data, return_index=True, return_counts=True
#     )
#     # set chunk size
#     chunk_size = int(np.fix(len(u_obsid) / n_split))
#     # initialize chunks
#     chunks = []
#     for i_split in range(n_split):
#         if i_split < n_split - 1:
#             chunks.append(
#                 data_basis[
#                     i_obsid[i_split * chunk_size] : i_obsid[(i_split + 1) * chunk_size]
#                 ]
#             )
#         else:
#             chunks.append(data_basis[i_obsid[i_split * chunk_size] :])
#     # np.unique(table.vstack(chunks)["_id"])
#     # np.unique(table.vstack(chunks)["obs_id"])
#     return chunks


class DFS:
    """Facade over the DFS1 (plan/level0/level1/catalog) and DFS2 (file metadata) services."""

    # plan table
    # dfs1_plan_find = plan.find
    dfs1_level0_find = level0.find
    dfs1_level1_find = level1.find
    # dfs1_dag_find = dag.find
    dfs1_catalog = catalog
    # file search
    dfs2_product_find = csst_fs.query_metadata

    def __init__(self, raise_error: bool = True):
        """Validate required environment variables and check DFS endpoints.

        Parameters
        ----------
        raise_error : bool, optional
            Whether to raise if a DFS endpoint is unreachable, by default True.
        """
        assert_env_exists("CSST_DFS_GATEWAY")
        assert_env_exists("CSST_DFS_TOKEN")
        assert_env_exists("CSST_BACKEND_API_URL")
        # CSST_DFS_GATEWAY=10.200.60.246:28000
        # CSST_BACKEND_API_URL=http://10.200.60.199:9010

        # check DFS accessibility
        # DFS1
        check_url_accessibility(os.environ["CSST_DFS_GATEWAY"], raise_error=raise_error)
        # DFS2
        check_url_accessibility(
            os.environ["CSST_BACKEND_API_URL"], raise_error=raise_error
        )

    @staticmethod
    def dfs1_find_plan(**kwargs) -> table.Table:
        """Find plan data from DFS1.

        Each returned record is annotated with ``n_file``, the expected number
        of files for that observation.

        Raises
        ------
        KeyError
            If a record lacks a key needed to compute ``n_file``.
        """
        # query
        prompt = "DFS1.plan"
        qr_kwargs = override_common_keys(DFS1_PLAN_PARAMS, kwargs)
        qr = plan.find(**qr_kwargs)
        assert qr.success, qr
        print(f">>> [{prompt}] query kwargs: {qr_kwargs}")
        print(f">>> [{prompt}] {len(qr.data)} records found.")
        # plan basis / obsid basis
        for record in qr.data:
            try:
                this_instrument = record["instrument"]
                if this_instrument == "HSTDM":
                    n_exposure = len(record["params"]["exposure_start"])
                    # detector SIS12 yields two files per exposure
                    if record["params"]["detector"] == "SIS12":
                        this_n_file = n_exposure * 2
                    else:
                        this_n_file = n_exposure
                else:
                    # count effective detectors of this instrument
                    this_n_file = len(csst[this_instrument].effective_detector_names)
            except KeyError as e:
                # report the key that is actually missing (the legacy message
                # referred to `n_epec_frame`, a key this code never reads)
                print(f"Key {e} is not found in {record}")
                raise KeyError(f"Key {e} is not found in {record}") from e
            record["n_file"] = this_n_file
        return table.Table(qr.data)

    @staticmethod
    def dfs1_find_plan_basis(**kwargs) -> table.Table:
        """Extract plan basis from plan data."""
        plan_data = DFS.dfs1_find_plan(**kwargs)
        plan_basis = plan_data[DFS1_PLAN_BASIS_KEYS]
        return plan_basis

    @staticmethod
    def dfs1_find_level0(**kwargs) -> table.Table:
        """Find level-0 data from DFS1."""
        # query
        prompt = "DFS1.level0"
        qr_kwargs = override_common_keys(DFS1_LEVEL0_PARAMS, kwargs)
        qr = level0.find(**qr_kwargs)
        assert qr.success, qr
        print(f">>> [{prompt}] query kwargs: {qr_kwargs}")
        print(f">>> [{prompt}] {len(qr.data)} records found.")
        return table.Table(qr.data)

    @staticmethod
    def dfs1_find_level0_basis(**kwargs) -> table.Table:
        """Extract level-0 basis columns from level-0 data."""
        level0_data = DFS.dfs1_find_level0(**kwargs)
        level0_basis = level0_data[DFS1_LEVEL0_BASIS_KEYS]
        return level0_basis

    @staticmethod
    def dfs1_find_level1(**kwargs) -> table.Table:
        """Find level-1 data from DFS1."""
        # query
        prompt = "DFS1.level1"
        qr_kwargs = override_common_keys(DFS1_LEVEL1_PARAMS, kwargs)
        qr = level1.find(**qr_kwargs)
        assert qr.success, qr
        print(f">>> [{prompt}] query kwargs: {qr_kwargs}")
        print(f">>> [{prompt}] {len(qr.data)} records found.")
        return table.Table(qr.data)

    @staticmethod
    def dfs1_find_level1_basis(**kwargs) -> table.Table:
        """Extract level-1 basis columns from level-1 data."""
        level1_data = DFS.dfs1_find_level1(**kwargs)
        level1_basis = level1_data[DFS1_LEVEL1_BASIS_KEYS]
        return level1_basis

    # TODO: DFS2 META query
    @staticmethod
    def dfs2_find_meta(**kwargs) -> table.Table:
        """Find meta data from DFS2."""
        # query
        prompt = "DFS2.meta"
        qr_kwargs = override_common_keys(DFS2_META_PARAMS, kwargs)
        qr = csst_fs.query_metadata(**qr_kwargs)
        assert qr.success, qr
        print(f">>> [{prompt}] query kwargs: {qr_kwargs}")
        print(f">>> [{prompt}] {len(qr.data)} records found.")
        return table.Table(qr.data)