dfs.py 3.79 KB
Newer Older
1
2
3
import redis
import toml
import os
BO ZHANG's avatar
BO ZHANG committed
4
5
from csst_dfs_client import plan, level0
from astropy.table import Table
BO ZHANG's avatar
BO ZHANG committed
6
import socket
7
8
9
10

# Per-location settings, read once at import time from ``config/config.toml``
# in the directory of this module. Top-level keys are location names; the
# code below reads a ``dfs`` table and a ``redis`` table from each of them.
_CONFIG_PATH = os.path.join(os.path.dirname(__file__), "config", "config.toml")
CONFIG = toml.load(_CONFIG_PATH)


BO ZHANG's avatar
BO ZHANG committed
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def check_port(ip, port, timeout=3):
    """Report whether a TCP connection to ``ip:port`` can be established.

    Parameters
    ----------
    ip : str
        Host name or IPv4 address to probe.
    port : int
        TCP port to probe.
    timeout : float, optional
        Socket timeout in seconds (default 3).

    Returns
    -------
    bool
        True if the connection attempt succeeded. False if it failed or if
        any exception was raised along the way (the error is printed).

    Examples
    --------
    Check whether port 80 on 192.168.1.1 is open::

        print(check_port("192.168.1.1", 80))  # True/False
    """
    try:
        # The socket is created inside the ``try`` and closed by ``with``, so
        # a failing socket() call is reported as False instead of hitting an
        # unbound ``sock`` during cleanup.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            # connect_ex returns an error code rather than raising for
            # connection failures; 0 means the connection succeeded.
            return sock.connect_ex((ip, port)) == 0
    except Exception as e:
        # Best-effort probe: any failure simply means "not reachable".
        print(f"Error: {e}")
        return False


34
class DFS:
    """Select a reachable DFS location and set up its environment and Redis.

    On construction every location in ``CONFIG`` is probed: the DFS gateway
    and the Redis server of each location are tested with ``check_port`` and
    the results are printed as a table. One location is then selected, its
    ``dfs`` settings are exported as environment variables (existing
    variables are left untouched), and a ``Redis`` client for that location
    is created.

    Parameters
    ----------
    location : str, optional
        Name of the location to use (a top-level key of ``CONFIG``). When
        omitted, exactly one location must be reachable.

    Attributes
    ----------
    location : str
        Name of the selected location.
    config : dict
        The ``CONFIG`` entry of the selected location.
    redis : Redis
        Redis client bound to the selected location.

    Raises
    ------
    ValueError
        If ``location`` is unknown or not reachable, if no location is
        reachable, or if several locations are reachable and ``location``
        was not given.
    """

    def __init__(self, location=None):
        if location is not None and location not in CONFIG:
            print("Available DFS locations: ", list(CONFIG.keys()))
            raise ValueError(f"Unknown DFS location: {location}")

        # Probe every configured location and show the outcome.
        print("Test all locations:", end="")
        status_list = []
        for loc in CONFIG:
            print(f"{loc}...", end="")
            status_list.append(self._probe(loc))
        print("Done!\n")

        status_table = Table(status_list)
        print(status_table)

        available = [s["location"] for s in status_list if s["status"]]
        if location is not None:
            # An explicit choice wins, but it must actually be reachable.
            if location not in available:
                raise ValueError(f"DFS location is not available: {location}")
            selected = location
        elif not available:
            raise ValueError("No DFS location is available")
        elif len(available) > 1:
            print("Multiple DFS locations are available, please specify one")
            raise ValueError("Multiple DFS locations are available")
        else:
            selected = available[0]

        self.location = selected
        self.config = CONFIG[selected]
        print(f"Using DFS location: {selected}")
        # Use the *selected* location's settings (not whichever location the
        # probing loop happened to visit last). setdefault keeps any value
        # that is already present in the environment.
        for k, v in self.config["dfs"].items():
            os.environ.setdefault(k, str(v))
        self.redis = Redis(location=selected)

    @staticmethod
    def _probe(loc):
        """Return the reachability record of the DFS gateway and Redis of ``loc``."""
        gateway = CONFIG[loc]["dfs"]["CSST_DFS_GATEWAY"].split(":")
        dfs_ip, dfs_port = gateway[0], int(gateway[1])
        redis_ip = CONFIG[loc]["redis"]["host"]
        redis_port = CONFIG[loc]["redis"]["port"]
        dfs_ok = check_port(dfs_ip, dfs_port, timeout=1)
        redis_ok = check_port(redis_ip, redis_port, timeout=1)
        return dict(
            location=loc,
            status=dfs_ok and redis_ok,
            dfs=dfs_ok,
            redis=redis_ok,
        )
72
73
74


class Redis(redis.Redis):
    """Redis client configured from ``CONFIG`` with a default message queue.

    Connection settings (``host``, ``port``, ``db``, ``password``) and the
    queue name (``qname``) are taken from ``CONFIG[location]["redis"]``.

    Parameters
    ----------
    location : str, optional
        Name of the location whose Redis settings are used (default
        ``"naoc"``).
    **kwargs
        Extra keyword arguments forwarded to ``redis.Redis``.

    Attributes
    ----------
    qname : str
        Name of the Redis list used by ``push``, ``pop`` and ``get_all``.
    config : dict
        The ``redis`` table of the selected location.

    Raises
    ------
    ValueError
        If ``location`` is not a key of ``CONFIG``.
    """

    def __init__(self, location="naoc", **kwargs):
        if location not in CONFIG:
            print("Available redis locations: ", list(CONFIG.keys()))
            raise ValueError(f"Unknown redis location: {location}")
        redis_config = CONFIG[location]["redis"]
        super().__init__(
            host=redis_config["host"],
            port=redis_config["port"],
            db=redis_config["db"],
            password=redis_config["password"],
            **kwargs,
        )
        self.qname = redis_config["qname"]
        self.config = redis_config

    def push(self, msg):
        """Prepend ``msg`` to the queue list (LPUSH)."""
        self.lpush(self.qname, msg)

    def pop(self):
        """Remove and return the element at the tail of the queue list (RPOP)."""
        return self.rpop(self.qname)

    def get_all(self):
        """Return every element currently in the queue list without removing any."""
        return self.lrange(self.qname, 0, -1)


# msgs = r.lrange(name, 0, -1)
# for chipid in range(6, 26):
#     this_msg = gen_msg(
#         dag_id="csst-msc-l1-mbi", obsid="11009101682009", chipid=f"{chipid:02d}"
#     )
#     print(this_msg)
#     r.lpush(name, this_msg)
#
# msgs = r.lrange(name, 0, -1)
# print(msgs)
#
# msgs_later = r.lrange(name, 0, -1)
# print(msgs_later)