Commit 0dd6f9d4 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

implement Pipeline.message.write() and Pipeline.timestamp.record()

parent 43be4795
Pipeline #2731 passed with stage
in 0 seconds
...@@ -266,18 +266,21 @@ class Message: ...@@ -266,18 +266,21 @@ class Message:
def __repr__(self): def __repr__(self):
return f"< Message [{self.file_path}] >" return f"< Message [{self.file_path}] >"
def write(self, dlist): def write(self, dlist: list[dict]):
"""Write messages to file."""
with open(self.file_path, "w+") as f: with open(self.file_path, "w+") as f:
for d in dlist: for d in dlist:
f.write(self.dict2msg(d) + "\n") f.write(self.dict2msg(d) + "\n")
@staticmethod @staticmethod
def dict2msg(d): def dict2msg(d: dict):
"""Convert `dict` to JSON format string."""
m = json.dumps(d).replace(" ", "") m = json.dumps(d).replace(" ", "")
return m return m
@staticmethod @staticmethod
def msg2dict(m): def msg2dict(m: str):
"""Convert JSON format string to `dict`."""
d = json.loads(m) d = json.loads(m)
return d return d
...@@ -322,8 +325,8 @@ class Timestamp: ...@@ -322,8 +325,8 @@ class Timestamp:
with open(self.file_path, "w") as file: with open(self.file_path, "w") as file:
file.truncate(0) file.truncate(0)
def touch(self): def record(self):
"""Write a time stamp.""" """Record a time stamp."""
with open(self.file_path, "a+") as f: with open(self.file_path, "a+") as f:
f.write(f"{Time.now().isot}+00:00\n") f.write(f"{Time.now().isot}+00:00\n")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment