import redis import toml import os from astropy.table import Table import socket CONFIG = toml.load(os.path.join(os.path.dirname(__file__), "config.toml")) def check_port(ip, port, timeout=3): """ # # 示例:检查 192.168.1.1 的 80 端口是否开放 # print(check_port("192.168.1.1", 80)) # True/False """ try: # 创建 Socket 连接(TCP) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) # 设置超时时间(秒) # 尝试连接 result = sock.connect_ex((ip, port)) # 返回状态 if result == 0: # 0 表示成功 return True else: return False except Exception as e: print(f"Error: {e}") return False finally: sock.close() # 确保关闭连接 class DFS: def __init__(self, location=None): # try each location print("Test all locations...", end="") status_list = [] for loc in CONFIG.keys(): dfs_ip = CONFIG[loc]["dfs"]["CSST_DFS_GATEWAY"].split(":")[0] dfs_port = int(CONFIG[loc]["dfs"]["CSST_DFS_GATEWAY"].split(":")[1]) redis_ip = CONFIG[loc]["redis"]["host"] redis_port = CONFIG[loc]["redis"]["port"] this_dfs_status = check_port(dfs_ip, dfs_port, timeout=1) this_redis_status = check_port(redis_ip, redis_port, timeout=1) this_status = dict( location=loc, status=this_dfs_status and this_redis_status, dfs=this_dfs_status, redis=this_redis_status, ) status_list.append(this_status) # print(this_status) print("Done!\n") status_table = Table(status_list) print(status_table) print("\n") if status_table["status"].sum() == 0: print("No DFS location is available") elif status_table["status"].sum() > 1: print("Multiple DFS locations are available, please specify one") elif location is None: # set DFS automatically if status_table["status"].sum() == 1: print("One DFS locations are available, good") location = status_table["location"][status_table["status"]][0] elif status_table["status"].sum() == 0: print("No DFS location is available, using csu") location = "csu" else: raise ValueError("Multiple DFS locations are available") self.location = location self.config = CONFIG[self.location] for k, v in CONFIG[self.location]["dfs"].items(): os.environ.setdefault(k, str(v)) # print("Setting redis config:") self.redis = Redis(location=self.location) class Redis(redis.Redis): def __init__(self, location="naoc", **kwargs): if location not in CONFIG.keys(): print("Available redis locations: ", list(CONFIG.keys())) raise ValueError(f"Unknown redis location: {location}") super().__init__( host=CONFIG[location]["redis"]["host"], port=CONFIG[location]["redis"]["port"], db=CONFIG[location]["redis"]["db"], password=CONFIG[location]["redis"]["password"], **kwargs, ) self.qname = password = CONFIG[location]["redis"]["qname"] self.config = CONFIG[location]["redis"] # print("Setting redis config:") # for k, v in self.config.items(): # print(f" - {k}: {v}") def push(self, msg): self.lpush(self.qname, msg) def pop(self): return self.rpop(self.qname) def get_all(self): return self.lrange(self.qname, 0, -1) # msgs = r.lrange(name, 0, -1) # for chipid in range(6, 26): # this_msg = gen_msg( # dag_id="csst-msc-l1-mbi", obsid="11009101682009", chipid=f"{chipid:02d}" # ) # print(this_msg) # r.lpush(name, this_msg) # # msgs = r.lrange(name, 0, -1) # print(msgs) # # msgs_later = r.lrange(name, 0, -1) # print(msgs_later)