"csst_dag/git@csst-tb.bao.ac.cn:csst-cicd/csst-dag.git" did not exist on "4fd243b6a3d332982e1fff8c70912d5aa216496b"
dfs.py 3 KB
Newer Older
1
2
3
import redis
import toml
import os
BO ZHANG's avatar
BO ZHANG committed
4
5
from csst_dfs_client import plan, level0
from astropy.table import Table
6
7
8
9
10

CONFIG = toml.load(os.path.join(os.path.dirname(__file__), "config", "config.toml"))


class DFS:
BO ZHANG's avatar
BO ZHANG committed
11
12
13
14
15
16
17
    def __init__(self, location=None):
        # try each location
        print("Test all locations:")
        status_list = []
        for loc in CONFIG.keys():
            # print("Setting DFS config:")
            for k, v in CONFIG[loc]["dfs"].items():
18
                os.environ.setdefault(k, str(v))
BO ZHANG's avatar
BO ZHANG committed
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
            # print("Setting redis config:")
            redis = Redis(location=loc)
            this_dfs_status = plan.find(instrument="").success
            try:
                this_redis_status = redis.ping()
            except BaseException:
                this_redis_status = False
            this_status = dict(
                location=loc,
                status=this_dfs_status and this_redis_status,
                dfs=this_dfs_status,
                redis=this_redis_status,
            )
            status_list.append(this_status)
        status_table = Table(status_list)
        print(status_table)
35

BO ZHANG's avatar
BO ZHANG committed
36
37
38
39
40
41
42
43
44
45
46
47
48
        if status_table["status"].sum() == 0:
            raise ValueError("No DFS location is available")
        elif status_table["status"].sum() > 1:
            print("Multiple DFS locations are available, please specify one")
            raise ValueError("Multiple DFS locations are available")
        else:
            self.location = status_table["location"][status_table["status"]][0]
            self.config = CONFIG[self.location]
            print(f"Using DFS location: {location}")
            for k, v in CONFIG[loc]["dfs"].items():
                os.environ.setdefault(k, str(v))
            # print("Setting redis config:")
            self.redis = Redis(location=loc)
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91


class Redis(redis.Redis):
    def __init__(self, location="naoc"):
        if location not in CONFIG.keys():
            print("Available redis locations: ", list(CONFIG.keys()))
            raise ValueError(f"Unknown redis location: {location}")
        super().__init__(
            host=CONFIG[location]["redis"]["host"],
            port=CONFIG[location]["redis"]["port"],
            db=CONFIG[location]["redis"]["db"],
            password=CONFIG[location]["redis"]["password"],
        )
        self.qname = password = CONFIG[location]["redis"]["qname"]
        self.config = CONFIG[location]["redis"]

        print("Setting redis config:")
        for k, v in self.config.items():
            print(f" - {k}: {v}")

    def push(self, msg):
        self.lpush(self.qname, msg)

    def pop(self):
        return self.rpop(self.qname)

    def get_all(self):
        return self.lrange(self.qname, 0, -1)


# msgs = r.lrange(name, 0, -1)
# for chipid in range(6, 26):
#     this_msg = gen_msg(
#         dag_id="csst-msc-l1-mbi", obsid="11009101682009", chipid=f"{chipid:02d}"
#     )
#     print(this_msg)
#     r.lpush(name, this_msg)
#
# msgs = r.lrange(name, 0, -1)
# print(msgs)
#
# msgs_later = r.lrange(name, 0, -1)
# print(msgs_later)