# -*- coding: utf-8 -*- from abc import abstractmethod, ABCMeta from csst_ifs_common.common import CsstIFSInit from csst_ifs_common.ifs_status import CsstIFSStatus from csst_ifs_common.ifs_result import CsstIFSResult from csst_ifs_common.interface import BasePipelineInterface csst_ifs = CsstIFSInit() class PipIFSL1RSS(BasePipelineInterface): def __init__(self): self.result = CsstIFSStatus self.logger = csst_ifs.logger self.status = CsstIFSResult def run(self): """API, ... Parameters ---------- Returns ------- """ from csst_ifs_rss.rss import run_all self.exit_code = "200" self.additionally_algorithm_01() sci_fits_list = list() sci_list = csst_ifs.config.get_ifs_config("path_l0_rss") for path_sci in sci_list: print(path_sci) fits_output = run_all(path_sci) for i in [2, 6, 7]: sci_fits_list.append(fits_output[i]) csst_ifs.config.set_ifs_config("path_l1_rss", sci_fits_list) def para_set(self): """API, ... Parameters ---------- Returns ------- """ csst_ifs.config_ifs_rss_env() csst_ifs.config.set_ifs_config( "path_RSSlog", "/nfsdata/share/pipeline-unittest/csst_ifs/data/dfs_dummy/L1/IFS_simData_2023-07-25/rss/path_RSSlog.txt") csst_ifs.config.set_ifs_config( "dir_rss_proc", "/nfsdata/share/pipeline-unittest/csst_ifs/data/dfs_dummy/L1/IFS_simData_2023-07-25/rss/") path_list = csst_ifs.get_all_file_path( "/nfsdata/share/pipeline-unittest/csst_ifs/data/dfs_dummy/L0/IFS_simData_2023-07-25/sky_data_case_1") csst_ifs.config.set_ifs_config("path_l0_rss", path_list) def result_check(self): """API, ... Parameters ---------- Returns ------- """ something_wrong = True if something_wrong: status = 2 assert status in [CsstIFSStatus.PERFECT, CsstIFSStatus.WARNING] # 可以或计划被外部调用的方法 def additionally_algorithm_01(self): pass # 内部方法,"_"前缀用于添加隐藏属性 def _additionally_algorithm_02(self): pass