Commit 1d1cb6d7 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

major updates

parent e696cd97
import os
from .top_level_interface import *
__version__ = "0.0.1"
PACKAGE_PATH = os.path.dirname(__file__)
a, b, c
1,2,3
4,5,6
\ No newline at end of file
import numpy as np
from astropy import table
from . import PACKAGE_PATH
# the main algorithm
def flip_image(img: np.ndarray):
""" flip an input image
This function uses an awesome algorithm to flip an input image.
Parameters
----------
img:
the input image
Returns
-------
the flipped image
"""
try:
assert img.ndim == 2
except AssertionError:
raise AssertionError("The input image is {}D not 2D!".format(img.ndim))
return img[::-1, ::-1]
# a demo on how to package data
def print_data():
""" print out data """
fp_data = PACKAGE_PATH + "/data/data.csv"
data = table.Table.read(fp_data)
print(data)
return
from .flip_image import flip_image, print_data
__all__ = ["flip_image", "print_data"]
import joblib
from astropy.io import fits
from csst.core.processor import CsstProcessor
from csst.msc.data_manager import CsstMscDataManager
from csst_proto import flip_image
class CsstProcPhotonAbsorption(CsstProcessor):
class CsstProcFlipImage(CsstProcessor):
""" This processor absorbs photons """
def __init__(self, dm: CsstMscDataManager, n_jobs: int = 1):
super(CsstProcPhotonAbsorption, self).__init__()
super(CsstProcFlipImage, self).__init__()
self.dm = dm
self.n_jobs = n_jobs
def run(self, debug=False):
def run(self, debug: bool = False):
""" run this processor
Parameters
----------
debug
debug:
if True, use debug mode
Returns
-------
"""
joblib.Parallel(n_jobs=self.n_jobs)(
joblib.delayed(CsstProcPhotonAbsorption.run_one_chip)(this_ccd_id)
img_flipped_list = joblib.Parallel(n_jobs=self.n_jobs)(
joblib.delayed(CsstProcFlipImage.run_one_chip)(this_ccd_id)
for this_ccd_id in self.dm.target_ccd_ids)
return
return img_flipped_list
def prepare(self, **kwargs):
""" prepare the environment """
......@@ -38,6 +40,14 @@ class CsstProcPhotonAbsorption(CsstProcessor):
return
@staticmethod
def run_one_chip(ccd_id):
""" run for one chip """
return
def run_one_chip(dm, ccd_id):
""" run for one chip
this function
"""
# input file path
fp_input = dm.l1_ccd(ccd_id, post="img.fits")
img_input = fits.getdata(fp_input)
# flip image
img_flipped = flip_image(img_input)
return img_flipped
import numpy as np
# this function need to be finished
# a function to be finished
def cos_to_be_finished(x):
# TODO: to be finished
return
......@@ -28,5 +28,3 @@ def cos(x: float = 0.):
"""
return np.cos(x)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment