Commit 23c06579 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

init Pipeline

parent 68e469b0
import json
import os.path
import subprocess
from astropy import time
from .logger import get_logger
class Pipeline:
def __init__(
self,
dir_in="/pipeline/input",
dir_out="/pipeline/output",
dir_aux="/pipeline/aux",
dfs_root="/dfsroot",
crds_root="/crdsroot",
crds_cache="/pipeline/crds_cache",
clean=True,
n_jobs_cpu=18,
n_jobs_gpu=9,
device="CPU",
):
self.dir_in = dir_in
self.dir_out = dir_out
self.dir_aux = dir_aux
self.dfs_root = dfs_root
self.crds_root = crds_root
self.crds_cache = crds_cache
self.clean = clean
self.n_jobs_cpu = n_jobs_cpu
self.n_jobs_gpu = n_jobs_gpu
self.device = device
self.pipeline_logger = get_logger(name="pipeline", filename=os.path.join(self.dir_out, "pipeline.log"))
self.module_logger = get_logger(name="module", filename=os.path.join(self.dir_out, "module.log"))
# change working directory
print(f"Change directory to {self.dir_out}")
os.chdir(self.dir_out)
# clean input/output directory
if self.clean:
self.clean_directory(self.dir_in)
self.clean_directory(self.dir_out)
# Frequently used files
self.msg_file = Message(os.path.join(self.dir_out, "msg.txt"))
self.time_stamp = TimeStamp(os.path.join(self.dir_out, "time_stamp.txt"))
self.exit_code = ExitCode(os.path.join(self.dir_out, "exit_code"))
@staticmethod
def clean_directory(d):
print(f"Clean output directory '{d}'...")
r = subprocess.run(f"rm -rf {d}/*", shell=True, capture_output=True)
print("> ", r)
r.check_returncode()
def now(self):
return time.Time.now().isot
class Message:
def __init__(self, file_path=""):
self.file_path = file_path
def __repr__(self):
return f"< Message [{self.file_path}] >"
def write(self, dlist):
with open(self.file_path, "w+") as f:
for d in dlist:
f.write(self.dict2msg(d) + "\n")
@staticmethod
def dict2msg(d):
m = json.dumps(d).replace(" ", "")
return m
@staticmethod
def msg2dict(m):
d = json.loads(m)
return d
class ExitCode:
def __init__(self, file_path=""):
self.file_path = file_path
def __repr__(self):
return f"< ExitCode [{self.file_path}] >"
def truncate(self):
with open(self.file_path, 'w') as file:
file.truncate(0)
def write(self, code=0):
with open(self.file_path, "w+") as f:
f.write(str(code))
print(f"Exit with code {code} (written to '{self.file_path}')")
class TimeStamp:
def __init__(self, file_path=""):
self.file_path = file_path
def __repr__(self):
return f"< TimeStamp [{self.file_path}] >"
def truncate(self):
with open(self.file_path, 'w') as file:
file.truncate(0)
def punch_in(self):
with open(self.file_path, "a+") as f:
f.write(f"{time.Time.now().isot}+00:00\n")
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment