diff --git a/measurement_pipeline/L1_pipeline/csst_msc_instrument/__init__.py b/measurement_pipeline/L1_pipeline/csst_msc_instrument/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..77cfb644e906a1cdb3e36c4603cf5b82bbf24d6d --- /dev/null +++ b/measurement_pipeline/L1_pipeline/csst_msc_instrument/__init__.py @@ -0,0 +1,3 @@ +import os +from .csst_msc_mbi_instrument import core_msc_l1_mbi_instrument +__all__ = ["core_msc_l1_mbi_instrument"] \ No newline at end of file diff --git a/measurement_pipeline/L1_pipeline/csst_msc_instrument/config.py b/measurement_pipeline/L1_pipeline/csst_msc_instrument/config.py new file mode 100644 index 0000000000000000000000000000000000000000..fbf9650e3d86029c6d4dce2e9e49bc114ea9dc09 --- /dev/null +++ b/measurement_pipeline/L1_pipeline/csst_msc_instrument/config.py @@ -0,0 +1,820 @@ +""" +Identifier: csst-l1/msc/csst_msc_instrument/csst_msc_instrument/config.py +Name: config.py +Description: Configuration classes. +Author: Li Shao (shaoli@nao.cas.cn) +Created: 2023-11-16 +Modified-History: + 2023-11-16, Li Shao, created + 2023-11-23, Li Shao, change the way to get CCDArrayConfig + 2023-12-21, Li Shao, add CameraConfig + 2024-02-06, Li Shao, move to csst_msc_instrument + 2024-03-05, Li Shao, update flag and CCDArrayConfig +""" + +import numpy as np +from astropy.table import Table + +__all__ = ["FlagConfig", "CameraConfig", "CCDArrayConfig", + "TestCCD", "SimCCD", "GM1201APrototype", "get_array_config"] + + +class FlagConfig: + """ + Defines the universal bit flag value. + + It can return the name and the value of each flag definition, giving a safe way to handle flags. + + Attributes + ---------- + dtype : type + The data type of the flag. + data : tuple[list] + Data including the bit values, names and comments. + + Methods + ------- + get_value(name) + Get the flag value. + get_comment(name) + Get the comment string of the flag. + get_name(bit) + Get the name of the flag of the given bit. + make_mask(names) + Make a bitmask from a name list of masked bits. + + Examples + -------- + flagconf = CCDFlagConfig() + name = flagconf.get_name(0) # "no_data" + value = flagconf('no_data') # 1 + value = flagconf.get_value('no_data') # 1 + comment = flagconf.get_comment('dead_pixel') # "permanently damaged pixel" + mask = flagconf.make_mask('no_data', 'dead_pixel') # 3 = 0b11 + """ + + def __init__(self): + self._version = 'sim_202312' + self._comment = '' + + self.dtype = np.uint32 + + self.data = ([0, 'no_data', 'no data'], + [1, 'dead_pixel', 'permanently damaged pixel'], + [2, 'no_gain', 'no gain value is available'], + [3, 'poor_gain', 'gain value is unreliable'], + [4, 'no_bias', 'no bias value is available'], + [5, 'poor_bias', 'bias value is unreliable'], + [6, 'noisy_readout', 'readout noise is exceptionally large'], + [7, 'no_dark', 'no dark current value is available'], + [8, 'poor_dark', 'dark current value is unreliable'], + [9, 'hot_pixel', 'pixel with exceptionally large dark current'], + [10, 'warm_pixel', 'pixel with moderately large dark current'], + [11, 'no_flat', 'no flat value is available'], + [12, 'poor_flat', 'flat value is unreliable'], + [13, 'dark_pixel', 'pixel with relatively low QE'], + [14, 'poor_nonlinearity', 'pixel with large nonlinearity'], + [15, 'saturated', 'saturated pixel'], + [16, 'saturated_crosstalk', 'pixel affected by crosstalk of saturated pixel'], + [17, 'sink_pixel', 'pixel with exceptionaly high trap density'], + [18, 'poor_cti_corr', 'pixel affected by poor CTI correction'], + [19, 'poor_shutter_shade', 'pixel affected by poor shutter shade correction'], + [20, 'cosmic_ray', 'identified cosmic ray'], + [21, 'poor_cosmic_ray', 'pixel affected by poor cosmic ray correction'], + [22, 'poor_fringe', 'pixel affected by poor fringe correction'], + [23, 'ghost', 'optics ghost'], + [24, 'satellite_trail', 'trail caused by passby satellite'], + [25, 'stray_light', 'other serious stray light feature'], + [26, 'open_pixel', 'pixel with poor electron bounding capability'], + [27, 'adj_open_pixel', 'neighboring pixel of any open pixel'], + [28, 'undefined', 'not defined'], + [29, 'phot_detection', 'source detection region (0 order source if on spectroscopic image)'], + [30, 'spec_detection', 'spectra detection region'], + [31, 'spec_contamination', 'pixel contaminated by other higher/lower order spectra'], + ) + + def __call__(self, + name: str, + ) -> np.unsignedinteger: + return self.get_value(name) + + def get_value(self, + name: str, + ) -> np.unsignedinteger: + """ + Return the value of the flag. + + The input flag name should be in lower case. The value will be returned in format of self.dtype. + If no name is matched, then it will raise an exception. + + Parameters + ---------- + name : str + The name of the flag. + + Returns + ------- + numpy.unsignedinteger + The value of the flag. + """ + + value = None + for entry in self.data: + if entry[1] == name: + value = self.dtype(2 ** entry[0]) + if value is None: + raise KeyError('Invalid flag name: {}.'.format(name)) + + return value + + def get_comment(self, + name: str, + ) -> str: + """ + Return the comment string of the flag. + + The input flag name should be in lower case. + If no name is matched, then it will raise an exception. + + Parameters + ---------- + name : str + The name of the flag. + + Returns + ------- + str + The value of the flag. + """ + + comment = None + for entry in self.data: + if entry[1] == name: + comment = entry[2] + if comment is None: + raise KeyError('Invalid flag name: {}.'.format(name)) + + return comment + + def get_name(self, + bit: int, + ) -> str | None: + """ + Return the flag name of the given bit. + + The function searches all defined bit. If the bit is not defined, then None will be returned. + + Parameters + ---------- + bit : int + The bit value. + + Returns + ------- + str or None + The name of the flag. If the bit is undefined, return None. + """ + + name = None + for entry in self.data: + if entry[0] == bit: + name = entry[1] + + return name + + def print(self) -> None: + """ + Print the bitmask definition table. + """ + + print('Bitmask definition:') + for i in range(np.iinfo(self.dtype).bits): + name = self.get_name(i) + if name is not None: + value = self(name) + comment = self.get_comment(name) + print(" {:2d}\t{:15d}\t\t{:20s}\t\t{}".format(i, value, name, comment)) + + def make_mask(self, + *names: str, + ) -> np.unsignedinteger: + """ + Make a bitmask from a name list of masked bits. + + All bits correponding to the given names will be flagged. Used for combination of several flags. + + Parameters + ---------- + *names : str + Names of bit masks to be masked. Variable length. + + Returns + ------- + numpy.unsignedinteger + The mask (in unsigned integer format). + """ + + outmask = self.dtype(0) + for name in set(names): + value = self.get_value(name) + outmask += value + + return outmask + + +class CameraConfig: + """ + Define camera configurations: chip layout, LED, etc. + + It inclues the filter, the pixel array configuration, LED, etc. + + Attributes + ---------- + chip : astropy.table.Table + The table containing the chip information. + led : astropy.table.Table + The table containing the LED information. + """ + + def __init__(self): + self._version = 'sim_202312' + self._comment = '' + + self.chip = Table() + self.chip['index'] = np.arange(30, dtype=int) + self.chip['chipid'] = ['01', '02', '03', '04', '05', + '06', '07', '08', '09', '10', + '11', '12', '13', '14', '15', + '16', '17', '18', '19', '20', + '21', '22', '23', '24', '25', + '26', '27', '28', '29', '30'] + self.chip['row'] = [0, 0, 0, 0, 0, + 1, 1, 1, 1, 1, + 2, 2, 2, 2, 2, + 3, 3, 3, 3, 3, + 4, 4, 4, 4, 4, + 5, 5, 5, 5, 5] + self.chip['column'] = [0, 1, 2, 3, 4, + 0, 1, 2, 3, 4, + 0, 1, 2, 3, 4, + 0, 1, 2, 3, 4, + 0, 1, 2, 3, 4, + 0, 1, 2, 3, 4] + self.chip['filter'] = ['GI', 'GV', 'GU', 'GU', 'GV', + 'y', 'i', 'g', 'r', 'GI', + 'z', 'NUV', 'NUV', 'u', 'y', + 'y', 'u', 'NUV', 'NUV', 'z', + 'GI', 'r', 'g', 'i', 'y', + 'GV', 'GU', 'GU', 'GV', 'GI'] + self.chip['chiplab'] = ['GI-1', 'GV-1', 'GU-1', 'GU-2', 'GV-2', + 'y-1', 'i-1', 'g-1', 'r-1', 'GI-2', + 'z-1', 'NUV-1', 'NUV-2', 'u-1', 'y-2', + 'y-3', 'u-2', 'NUV-3', 'NUV-4', 'z-2', + 'GI-3', 'r-2', 'g-2', 'i-2', 'y-4', + 'GV-3', 'GU-3', 'GU-4', 'GV-4', 'GI-4'] + self.chip['array_config'] = ['CCDArrayConfig', ] * 30 + + self.led = Table() + self.led['index'] = np.arange(28) + self.led['dev'] = np.append(['main'] * 14, ['back'] * 14) + self.led['id'] = [i + 1 for i in range(14)] * 2 + self.led['name'] = ['LED{:02d}'.format(i + 1) for i in range(14)] * 2 + self.led['wave'] = np.array([275, 310, 430, 505, 545, 590, 670, + 760, 880, 940, 1050, 1550, 340, 365] * 2).astype(float) + self.led['filter_phot'] = ['NUV', 'NUV', 'g', 'g', 'g', 'r', 'r', + 'i', 'z', 'y', 'J', 'H', 'u', 'u'] * 2 + self.led['filter_spec'] = ['GU', 'GU', 'GV', 'GV', 'GV', 'GV', 'GI', + 'GI', 'GI', 'GI', 'J', 'H', 'GU', 'GU'] * 2 + + +class CCDArrayConfig: + """ + Defines pixel array configuration. + + Basic pixel array configuration class. A parent class for all other array configurations. + Default values are from e2v CCD290-99. + + Parameters + ---------- + image_format : str + Data format: + "raw": readout oriented raw format (default); + "cube": data cube, all channels are stacked on z direction; + "sky": sky oriented mosaic format. + nx : int, optional + The number of pixels in each channel in x (serial readout) direction. + nx_active : int + The number of active pixels in each channel in x (serial readout) direction. + nx_prescan_physical : int or list or tuple or numpy.ndarray, optional + The number of physical prescan pixels in each channel in x (serial readout) direction. + nx_prescan : int or list or tuple or numpy.ndarray, optional + The number of prescan pixels in each channel in x (serial readout) direction. + ny : int, optional + The number of pixels in each channel in y (parallel readout) direction. + ny_active : int + The number of active pixels in each channel in y (serial readout) direction. + ny_prescan_physical : int or list or tuple or numpy.ndarray, optional + The number of physical prescan pixels in each channel in y (parallel readout) direction. + ny_prescan : int or list or tuple or numpy.ndarray, optional + The number of prescan pixels in each channel in y (parallel readout) direction. + version : str, optional + The version tag. + comment : str, optional + Comment to this version. + + Attributes + ---------- + nchan : int + The number of channels. + nx_overscan : numpy.ndarray + A list of numbers of overscan pixels in each channel in x (serial readout) direction. + ny_overscan : numpy.ndarray + A list of numbers of overscan pixels in each channel in y (parallel readout) direction. + + Methods + ------- + has_valid_overscan(minwidth, axis='x') + To judge if there are enough overscan region for overscan correction. + get_shape(full=True) + Return the shape of array (numpy default definition). + get_boundary(channel, rtype='active', full=True) + Calculate boundary indices for active, prescan, overscan, channel and physical pixel regions. + get_cutout(channel, image, rtype='active', flip=True) + Get image cutout of specified region (active, prescan, overscan or channel). + set_cutout(channel, image, cutout, rtype='active', flip=True) + Paste the cutout into specific region of the image. + + Examples + -------- + conf0 = CCDArrayConfig("raw") + conf1 = CCDArrayConfig("sky") + conf0.has_valid_overscan(100, axis='y') # False + conf1.get_shape(full=False) # 1152, 4616 + conf0.get_range(1, rtype='channel') # 0, 1250, 0, 4700 + conf1.get_range(1, rtype='active') # 0, 1152, 0, 4616 + cutout = conf0.get_cutout(4, image, rtype='x_overscan') # serial overscan region: numpy array (4616, 71) + conf0.set_cutout(4, image2, cutout, rtype='x_overscan') # paste the cutout back to some other image + """ + + def __init__(self, + image_format: str, + nx: int = 1250, + nx_active: int = 1152, + nx_prescan_physical: int | list | tuple | np.ndarray = 27, + nx_prescan: int | list | tuple | np.ndarray = 27, + ny: int = 4700, + ny_active: int = 4616, + ny_prescan_physical: int | list | tuple | np.ndarray = 0, + ny_prescan: int | list | tuple | np.ndarray = 0, + version: str = '', + comment: str = '', + ): + + self.image_format = image_format + if self.image_format not in ['raw', 'cube', 'sky']: + raise ValueError('Invalid image type: "{}". Must be "raw", "cube" or "sky".'.format(self.image_format)) + self.version = version + self.comment = comment + + # physical configuration + self.nchan = 16 + self.nx_active = nx_active + self.ny_active = ny_active + + # prescan + if type(nx_prescan_physical) is int: + self.nx_prescan_physical = np.full(self.nchan, nx_prescan_physical) + else: + self.nx_prescan_physical = np.asarray(nx_prescan_physical).astype(int) + if type(ny_prescan_physical) is int: + self.ny_prescan_physical = np.full(self.nchan, ny_prescan_physical) + else: + self.ny_prescan_physical = np.asarray(ny_prescan_physical).astype(int) + + # single channel configuration + # always follow the readout sequence: prescan -> active -> overscan + if self.image_format == 'sky': + self.nx, self.ny = self.nx_active, self.ny_active # no prescan or overscan + self.nx_prescan = np.full(self.nchan, 0, dtype=int) + self.ny_prescan = np.full(self.nchan, 0, dtype=int) + else: + self.nx, self.ny = nx, ny # include prescan and overscan + if type(nx_prescan) is int: + self.nx_prescan = np.full(self.nchan, nx_prescan) + else: + self.nx_prescan = np.asarray(nx_prescan).astype(int) + if type(ny_prescan) is int: + self.ny_prescan = np.full(self.nchan, ny_prescan) + else: + self.ny_prescan = np.asarray(ny_prescan).astype(int) + self.nx_overscan = self.nx - self.nx_prescan - self.nx_active + self.ny_overscan = self.ny - self.ny_prescan - self.ny_active + self.shape = (self.ny, self.nx) + + # image configuration + # xflip and yflip decide if the channel cutout should be flipped when making full image + if self.image_format == 'raw': + self.full_shape = (self.ny, self.nx * 16) + self.col = np.arange(16, dtype=int) + self.row = np.zeros(16, dtype=int) + self.xflip = np.zeros(16, dtype=bool) + self.yflip = np.zeros(16, dtype=bool) + elif self.image_format == 'cube': + self.full_shape = (16, self.ny, self.nx) + self.col = np.zeros(16, dtype=int) + self.row = np.zeros(16, dtype=int) + self.xflip = np.zeros(16, dtype=bool) + self.yflip = np.zeros(16, dtype=bool) + else: + self.full_shape = (self.ny * 2, self.nx * 8) + self.col = np.asarray([0, 1, 2, 3, 4, 5, 6, 7, 7, 6, 5, 4, 3, 2, 1, 0]) + self.row = np.asarray([0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1]) + self.xflip = np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0]).astype(bool) + self.yflip = np.asarray([0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1]).astype(bool) + + def print(self): + """ + Print configuration. + """ + print('CLASS: {}'.format(self.__class__.__name__)) + for key in self.__dict__: + if type(self.__dict__[key]) is str: + print(' {} = "{}"'.format(key, self.__dict__[key])) + else: + print(' {} = {}'.format(key, self.__dict__[key])) + + def has_valid_overscan(self, + minwidth: int, + axis: str = 'x', + ) -> bool: + """ + Judge if there are enough overscan region for overscan correction. + + Judge if there are enough overscan region for overscan correction. + The function can be used for exception raising when overscan is used. + + Parameters + ---------- + minwidth : int + Minimum width of valid overscan region. + axis : str, optional + Overscan direction, "x" (default) or "y". + + Returns + ------- + bool + If True, the configuration contains valid overscan region. + """ + + if axis == 'x': + index = self.nx_overscan < minwidth + else: + index = self.ny_overscan < minwidth + if index.sum() > 0: + flag = False + else: + flag = True + + return flag + + def get_shape(self, + full: bool = True, + ) -> tuple: + """ + Return the shape of array (numpy default definition). + + This function returns the image shape, for both full image and channel image. + + Parameters + ---------- + full : bool, optional + If True, return the shape of full image; otherwise return the shape of single channel. + + Returns + ------- + tuple + The shape of image, in format of tuple (ny, nx) or (nz, ny, nx). + """ + + if full: + return self.full_shape + else: + return self.shape + + def get_boundary(self, + channel: int, + rtype: str = 'active', + full: bool = True, + ) -> tuple[int, int, int, int]: + """ + Calculate boundary indices for active, prescan, overscan, channel and physical pixel regions. + + This function can return corner indices for different kind of regions. It could be used for image cutout. + The indices are in python format (start from 0). + + Parameters + ---------- + channel : int + Channel number (start from 1). + rtype : str, optional + Region type: + "active": area exposed to external light (default); + "channel": the whole channel; + "x_prescan": serial prescan region (including artificial and physical pixels); + "y_prescan": parallel prescan region; + "x_overscan": serial overscan region; + "y_overscan": parallel overscan region; + "physical": region with physical pixels, including physical part of prescan and active region; + "cti": region affected by charge transfer inefficiency, i.e. all pixels except artificial prescan. + full : bool, optional + If True (default), return the coordinates on the full image; + otherwise return the coordinates on single channel cutout. + + Returns + ------- + x1 : int + X start coordinate of the desired region (python indexing). + x2 : int + X end coordinate of the desired region (python indexing). + y1 : int + Y start coordinates of the desired region (python indexing). + y2 : int + Y end coordinates of the desired region (python indexing). + """ + + xcoords = [0, self.nx_prescan[channel - 1], self.nx_prescan[channel - 1] + self.nx_active, self.nx] + ycoords = [0, self.ny_prescan[channel - 1], self.ny_prescan[channel - 1] + self.ny_active, self.ny] + + # one single channel cutout: readout at (0, 0) + if rtype == 'active': + x1, x2 = xcoords[1], xcoords[2] + y1, y2 = ycoords[1], ycoords[2] + elif rtype == 'channel': + x1, x2 = xcoords[0], xcoords[3] + y1, y2 = ycoords[0], ycoords[3] + elif rtype in ['x_prescan', 'x_overscan']: + y1, y2 = ycoords[1], ycoords[2] + if rtype == 'x_prescan': + x1, x2 = xcoords[0], xcoords[1] + else: + x1, x2 = xcoords[2], xcoords[3] + elif rtype in ['y_prescan', 'y_overscan']: + x1, x2 = xcoords[1], xcoords[2] + if rtype == 'y_prescan': + y1, y2 = ycoords[0], ycoords[1] + else: + y1, y2 = ycoords[2], ycoords[3] + elif rtype == 'physical': + x1, x2 = xcoords[1] - self.nx_prescan_physical[channel - 1], xcoords[2] + y1, y2 = ycoords[1] - self.ny_prescan_physical[channel - 1], ycoords[2] + elif rtype == 'cti': + x1, x2 = xcoords[1] - self.nx_prescan_physical[channel - 1], xcoords[3] + y1, y2 = ycoords[1] - self.ny_prescan_physical[channel - 1], ycoords[3] + else: + raise ValueError('Invalid region type. Please use one of them: ' + + '"active", "x_prescan", "y_prescan", "x_overscan", "y_overscan", ' + + '"channel", "physical", "cti"') + + # if on full mosaic image, flip and shift the coordinates + if full: + if self.xflip[channel - 1]: + x1, x2 = self.nx - x2, self.nx - x1 + if self.yflip[channel - 1]: + y1, y2 = self.ny - y2, self.ny - y1 + x1 = self.col[channel - 1] * self.nx + x1 + x2 = self.col[channel - 1] * self.nx + x2 + y1 = self.row[channel - 1] * self.ny + y1 + y2 = self.row[channel - 1] * self.ny + y2 + + return x1, x2, y1, y2 + + def get_cutout(self, + channel: int, + image: np.ndarray, + rtype: str = 'active', + flip: bool = True, + ) -> np.ndarray: + """ + Get image cutout of specified region (active, prescan, overscan or channel). + + A 2D image cutout will be extracted by requirement. + The input image should be either full image or single channel image. + + Parameters + ---------- + channel : int + Channel number (start from 1). + image : numpy.ndarray + Input image. + rtype : str, optional + Region type. See get_boundary() for more details. + flip : bool, optional + If True (default), return the flipped image cutout according to xflip/yflip parameter; + otherwise return the cutout with the same orientation as input image. + It only works if the input image is full image. + + Returns + ------- + numpy.ndarray + 2D image cutout of desired region. + """ + + if np.array_equal(image.shape, self.get_shape(full=True)): + full = True + elif np.array_equal(image.shape, self.get_shape(full=False)): + full = False + else: + raise ValueError('Invalid input image shape: {}.'.format(image.shape)) + + x1, x2, y1, y2 = self.get_boundary(channel, rtype=rtype, full=full) + if x1 == x2 or y1 == y2: + raise IndexError('No such region exists. Please check your configuration attributes.') + + if len(image.shape) == 3: + cutout = image[channel - 1, y1:y2, x1:x2] + else: + cutout = image[y1:y2, x1:x2] + + if full and flip: + if self.xflip[channel - 1]: + cutout = cutout[:, ::-1] + if self.yflip[channel - 1]: + cutout = cutout[::-1, :] + + return cutout + + def set_cutout(self, + channel: int, + image: np.ndarray, + cutout: np.ndarray, + rtype: str = 'active', + flip: bool = True, + ) -> None: + """ + Paste a 2D image cutout to a specified region of image. + + The specific region of the image will be overwritten by the cutout. Caution should be taken here. + + Parameters + ---------- + channel : int + Channel number (start from 1). + image : numpy.ndarray + Image to be updated. + cutout : numpy.ndarray + Input cutout. It must be a 2D array. + rtype : str, optional + Region type. See get_boundary() for more details. + flip : bool, optional + If True (default), return the flipped image cutout according to xflip/yflip parameter; + otherwise return the cutout with the same orientation as input image. + It only works if the input image is full image. + """ + + if np.array_equal(image.shape, self.get_shape(full=True)): + full = True + elif np.array_equal(image.shape, self.get_shape(full=False)): + full = False + else: + raise ValueError('Invalid input image shape: {}.'.format(image.shape)) + + x1, x2, y1, y2 = self.get_boundary(channel, rtype=rtype, full=full) + if x1 == x2 or y1 == y2: + raise IndexError('No such region exists. Please check your configuration attributes.') + if not np.array_equal(cutout.shape, (y2 - y1, x2 - x1)): + raise ValueError('Invalid input cutout shape: {}. It should be {}'.format(cutout.shape, (y2 - y1, x2 - x1))) + + cutout_paste = cutout.copy() + if full and flip: + if self.xflip[channel - 1]: + cutout_paste = cutout_paste[:, ::-1] + if self.yflip[channel - 1]: + cutout_paste = cutout_paste[::-1, :] + + if len(image.shape) == 3: + image[channel - 1, y1:y2, x1:x2] = cutout_paste + else: + image[y1:y2, x1:x2] = cutout_paste + + +class TestCCD(CCDArrayConfig): + """ + Defines pixel array configuration for testing. + + Pixel array configuration class for testing. A subclass of CCDArrayConfig. + + Parameters + ---------- + image_format : str + Data format. See CCDArrayConfig for more details. + """ + + def __init__(self, + image_format: str, + ): + super().__init__(image_format, + nx=200, + ny=300, + nx_active=50, + ny_active=150, + nx_prescan_physical=np.full(4, 27, dtype=int), + nx_prescan=np.full(4, 27, dtype=int), + ny_prescan_physical=np.full(4, 4, dtype=int), + ny_prescan=np.full(4, 4, dtype=int), + version='test_202403', + comment='for function testing only') + self.nchan = 4 + if self.image_format == 'raw': + self.full_shape = (self.ny, self.nx * 4) + self.col = np.arange(4, dtype=int) + self.row = np.zeros(4, dtype=int) + self.xflip = np.zeros(4, dtype=bool) + self.yflip = np.zeros(4, dtype=bool) + elif self.image_format == 'cube': + self.full_shape = (4, self.ny, self.nx) + self.col = np.zeros(4, dtype=int) + self.row = np.zeros(4, dtype=int) + self.xflip = np.zeros(4, dtype=bool) + self.yflip = np.zeros(4, dtype=bool) + else: + self.full_shape = (self.ny * 2, self.nx * 2) + self.col = np.asarray([0, 1, 1, 0]) + self.row = np.asarray([0, 0, 1, 1]) + self.xflip = np.asarray([0, 1, 1, 0]).astype(bool) + self.yflip = np.asarray([0, 0, 1, 1]).astype(bool) + + +class SimCCD(CCDArrayConfig): + """ + Defines pixel array configuration for CSST simulation. + + Pixel array configuration class for CSST simulation. A subclass of CCDArrayConfig. + + Parameters + ---------- + image_format : str + Data format. See CCDArrayConfig for more details. + """ + + def __init__(self, + image_format: str, + ): + super().__init__(image_format, + version='sim_202312', + comment='readout similar to e2v CCD290-99 NAOC laboratory test') + + +class GM1201APrototype(CCDArrayConfig): + """ + Defines pixel array configuration for CSST simulation. + + Pixel array configuration class for CSST simulation. A subclass of CCDArrayConfig. + + Parameters + ---------- + image_format : str + Data format. See CCDArrayConfig for more details. + """ + + def __init__(self, + image_format: str, + ): + x_prescan_physical = np.full(16, 27, dtype=int) + x_prescan_physical[[0, 7, 8, 15]] = 29 + x_prescan = np.full(16, 29, dtype=int) + super().__init__(image_format, + nx_prescan_physical=x_prescan_physical, + nx_prescan=x_prescan, + ny_prescan_physical=4, + ny_prescan=4, + version='gm1201a_202208', + comment='GM1201A user manual V2.7 and NAOC laboratory test') + + +def get_array_config(name: str, + ) -> type[CCDArrayConfig]: + """ + Get the pixel array configuration class by name. + + Return the type of class by giving its class name. Need to be called to make a new instance. + + Parameters + ---------- + name : str + Name of the class. + + Returns + ------- + generic + Class of the specific name. + + Examples + -------- + thisclass = get_array_config('CCDArrayConfig') # CCDArrayConfig + conf = thisclass() # CCDArrayConfig() + """ + + if name not in __all__ or name in ['FlagConfig', 'CameraConfig', 'get_array_config']: + raise ValueError('Invalid class name.') + + return eval(name) diff --git a/measurement_pipeline/L1_pipeline/csst_msc_instrument/csst_msc_mbi_instrument.py b/measurement_pipeline/L1_pipeline/csst_msc_instrument/csst_msc_mbi_instrument.py new file mode 100644 index 0000000000000000000000000000000000000000..3c99ca90b980ffe1737d78ecb6eb94d06ceba825 --- /dev/null +++ b/measurement_pipeline/L1_pipeline/csst_msc_instrument/csst_msc_mbi_instrument.py @@ -0,0 +1,471 @@ +""" +Identifier: csst_msc_mbi_instrument/csst_msc_mbi_instrument.py +Name: csst_msc_mbi_instrument.py +Description: Make the correction for the intrument effect of CSST MS-MBI +Author: Zhou Xie, created +Created: 2023-11-27 +Modified-History: + 2023-11-27, Zhou Xie, created +""" + +from datetime import datetime +from typing import Optional + +import csst_common +import numpy as np +from itertools import product +from astropy.io import fits +from astropy.stats import sigma_clipped_stats +# from csst_common import CsstResult, CsstStatus +# from csst_msc_instrument import ( +# remove_cr_deepcr, +# subtract_bias, +# subtract_dark, +# FlagConfig, +# ) +from csst_msc_instrument.image import ( + subtract_bias, + subtract_dark, + FlagConfig, +) + + +SATURATE = 50000 + + +def core_msc_l1_mbi_instrument( + image_path: str = "/path/to/image", + bias_path: str = "/path/to/bias", + dark_path: str = "/path/to/dark", + flat_path: str = "/path/to/flat", + shutter_path: str = "/path/to/shutter", + image_output_path: str = "/path/to/image_output", + weight_output_path: str = "/path/to/weight_output", + flag_output_path: str = "/path/to/flag_output", + deepcr_model_path: Optional[str] = None, + config_ccd_info: Optional[str] = None, + config_bad_pixel: Optional[str] = None, + device: Optional[str] = "CPU", +) #-> CsstResult: + """ + Make the instrument correction for one chip of CSST data. + + This function corrects the instrument effect, such as bias, dark, flat, shutter, cosmicrays etc. + + Parameters + ---------- + image_path : str + The sci file path. + bias_path : str + The bias file path. + dark_path : str + The dark file path. + flat_path : str + The flat file path. + shutter_path : str + The shutter file path. + image_output_path : str + The image output file path. + weight_output_path : str + The weight output file path. + flag_output_path : str + The flag output file path. + deepcr_model_path : str, optional + The deepcr model file path. + config_ccd_info : str, optional + The config for ccd file path. + config_bad_pixel : str, optional + The config for bad pixel file path. + device : str + The deepcr device input 'CPU' or 'GPU', by default 'CPU'. + + Returns + ------- + CsstResult + Result containing `status` and `files`. + """ + status_header = _set_status_header() + + # CCD image processing + with fits.open(image_path) as raw: + hdu = fits.HDUList( + [ + fits.PrimaryHDU(header=raw[0].header.copy()), + fits.ImageHDU(header=raw[1].header.copy(), data=raw[1].data.copy()), + ] + ) + # 创建wht[1].data hdulist + wht = fits.HDUList( + [ + fits.PrimaryHDU(header=raw[0].header.copy()), + fits.ImageHDU( + header=raw[1].header.copy(), + data=np.zeros_like(hdu[1].data, dtype=np.float32), + ), + ] + ) + # 创建flg[1].data hdulist + flg = fits.HDUList( + [ + fits.PrimaryHDU(header=raw[0].header.copy()), + fits.ImageHDU( + header=raw[1].header.copy(), + data=np.zeros_like(hdu[1].data, dtype=np.uint32), + ), + ] + ) + + img_err = np.zeros_like(hdu[1].data) + # # shutter + # shutter = fits.getdata(shutter_path, 1) + bias, bias_error = _read_ccds(bias_path) + dark, dark_error = _read_ccds(dark_path) + flat, flat_error = _read_ccds(flat_path) + + hdu[1].data, img_err = subtract_bias( + hdu[1].data, + img_err, + bias, + bias_error, + ) + status_header.set("S_BIAS", 0) + + hdu[1].data, img_err = subtract_dark( + hdu[1].data, img_err, dark, dark_error, hdu[0].header["EXPTIME"] + ) + status_header.set("S_DARK", 0) + + hdu[1].data, img_err = _divide_flat( + hdu[1].data, + img_err, + flat, + flat_error, + ) + status_header.set("S_FLAT", 0) + # 非线性改正 + hdu[1].data = _fix_nonlinear(hdu[1].data) + status_header.set("S_NLIN", 0) + # 坏像素检测 + flg[1].data = _check_badpix(flg[1].data, flat) + # 热像素和暖像素检测 + flg[1].data = _check_hot_and_warm_pix( + flg[1].data, dark, hdu[0].header["EXPTIME"], hdu[1].header["RON01"] + ) + # 过饱和检测 + saturate = SATURATE + flg[1].data = _check_over_saturation(flg[1].data, hdu[1].data, saturate) + saturate = saturate * hdu[1].header["GAIN01"] / hdu[0].header["EXPTIME"] + status_header.set("SATURATE", saturate) + # 时间归一 + hdu[1].data = hdu[1].data / hdu[0].header["EXPTIME"] + # 单位转换 + hdu[1].data = _multiply_gain_map(hdu[1].data, hdu[1].header) + # hdu[1].header.set('BUNIT', 'electron/s') + status_header.set("BUNIT", "electron/s") + # 检测宇宙线 + # hdu[1].data, flg[1].data, cr_count = remove_cr_deepcr( + # hdu[1].data, + # flg[1].data, + # device, + # hdu[1].header["CHIPID"], + # ) + # status_header.set("S_CRS", 0) + # status_header.set("CRCOUNT", cr_count) + + # 记录BKG与RMS + _, bkg, rms = sigma_clipped_stats(data=hdu[1].data, mask=(flg[1].data == 16)) + status_header.set("SKY_BKG", bkg) + status_header.set("SKY_RMS", rms) + # 检测卫星拖尾 + flg[1].data = _check_satellitetrail(flg[1].data, hdu[1].data) + status_header.set("S_SAT", 0) + # 生成wht[1].data + wht[1].data = _create_weight( + hdu[1].data, flg[1].data, hdu[1].header["RON01"], hdu[0].header["EXPTIME"] + ) + # 数据类型 + hdu[1].data = hdu[1].data.astype(np.float32) + + # # 快门改正 + # hdu[1].data = _fix_shutter(hdu[1].data, shutter, hdu[0].header["EXPTIME"]) + # status_header.set("S_SHUT", 0) + + if all( + [ + status_header["S_BIAS"] == 0, + status_header["S_DARK"] == 0, + status_header["S_FLAT"] == 0, + # status_header["S_CRS"] == 0, + status_header["S_NLIN"] == 0, + # status_header["S_CTE"] == 0, # 这个功能还未添加 + status_header["S_SAT"] == 0, + # status_header["S_SHUT"] == 0, + status_header["SKY_BKG"] != -9999, + status_header["SKY_RMS"] != -9999, + status_header["SATURATE"] != -9999, + # status_header["CRCOUNT"] != -9999, + ] + ): + status_header.set("S_INST", 0) + + # hdu[1].header.extend(status_header, bottom=True) + # 修正wht和flg数据类型 + wht[1].header = hdu[1].header.copy() + wht[1].data = wht[1].data.astype(np.float32) + wht[1].header.remove("BUNIT") + + flg[1].header = hdu[1].header.copy() + flg[1].data = flg[1].data.astype(np.uint32) + flg[1].header.set("BUNIT", "dimensionless unit") + + hdu.writeto(image_output_path, overwrite=True) + wht.writeto(weight_output_path, overwrite=True) + flg.writeto(flag_output_path, overwrite=True) + + # # construct CsstResult + # result = CsstResult( + # status=CsstStatus(status_header["S_INST"]), # 0/1/2\ + # header=status_header, + # files=[ + # image_output_path, + # weight_output_path, + # flag_output_path, + # ], + # ) + # return result + + +def _set_status_header(): + header_path = __file__.replace( + "csst_msc_mbi_instrument.py", "instrument_cor_demo.head" + ) + demo_header = fits.getheader(header_path, ignore_missing_simple=True) + + status_header = fits.Header() # template Header + status_header.append(("COMMENT", "=" * 66), bottom=True) + status_header.append(("COMMENT", "INSTRUMENT CORRECTION INFORMATION"), bottom=True) + status_header.append(("COMMENT", "=" * 66), bottom=True) + + for card in demo_header.cards: + status_header.append(card, bottom=True) + + return status_header + + +def _read_ccds( + ref_path: str, +) -> tuple[np.array, np.array]: + ref = fits.getdata(ref_path) + return ref, np.zeros_like(ref) + + +def _divide_flat( + image: np.ndarray, + image_error: np.ndarray, + flat: np.ndarray, + flat_error: np.ndarray, +) -> tuple[np.ndarray, np.ndarray]: + def divide(a, b): + return np.divide(a, b, out=np.zeros_like(a, float), where=(b != 0)) + + result = divide(image, flat) + result_error = ( + np.abs(result) + * ((divide(image_error, image)) ** 2 + (divide(flat_error, flat)) ** 2) ** 0.5 + ) + return result, result_error + + +def _fix_nonlinear(image: np.ndarray, beta1: float = 5e-7) -> np.ndarray: + # 计算非线性系数 + beta = 5e-7 + f1 = [1] + fnlin0 = 1 + beta + fnlin = [fnlin0] + for i in range(1000, 65000, 1000): + f1.append(i) + fnlin.append(1 + beta * i) + # 插值函数 + from scipy.interpolate import PchipInterpolator as PCHIP + + # PCHIP stands for Piecewise Cubic Hermite Interpolating Polynomial + interp_func = PCHIP(f1, fnlin) + # 非线性系数矩阵 + imgnlin = interp_func(image) + # 图像改正 + image = imgnlin * image + return image + + +def _check_badpix( + flag: np.ndarray, + flat: np.ndarray, +) -> np.ndarray: + med = np.median(flat) + _flag = (flat < 0.5 * med) | (1.5 * med < flat) + + flagconf = FlagConfig() + _flag = _flag.astype(flagconf.dtype) + + value = flagconf.get_value("dead_pixel") + flag = flag | (_flag * value) + return flag + + +def _check_hot_and_warm_pix( + flag: np.ndarray, + dark: np.ndarray, + exptime: float, + rdnoise: float, +) -> np.ndarray: + + flagconf = FlagConfig() + _dark = dark * exptime + _dark[_dark < 0] = 0 + _flag = 1 * rdnoise**2 <= _dark # 不确定是否包含 暂定包含 + _flag = _flag.astype(flagconf.dtype) + value = flagconf.get_value("hot_pixel") + flag = flag | (_flag * value) + + _flag = (0.5 * rdnoise**2 < _dark) & (_dark < 1 * rdnoise**2) + _flag = _flag.astype(flagconf.dtype) + value = flagconf.get_value("warm_pixel") + flag = flag | (_flag * value) + return flag + + +def _check_over_saturation( + flag: np.ndarray, + image: np.ndarray, + saturate: int = 65535, + iterations: int = 0, +) -> np.ndarray: + _flag = image >= saturate + _flag = _dilation(_flag, iterations=iterations) + flagconf = FlagConfig() + _flag = _flag.astype(flagconf.dtype) + value = flagconf.get_value("saturated") + flag = np.bitwise_or(flag, _flag * value) + return flag + + +def _dilation(array_orig: np.ndarray, iterations: int = 1): + """ + Make a dilation for array + + This function makes a dilation for the 2D mask array (saturated pixels) + + Parameters + ---------- + array_orig : np.ndarray + The mask array without dilation. + structure : int + The number of dilations performed on the structure with itself. + + Returns + ------- + array_out : np.ndarray + The mask array after dilation. + """ + from scipy import ndimage + + struct1 = ndimage.generate_binary_structure(2, 1) # give a saturate structure + struct_ext = ndimage.iterate_structure(struct1, iterations) # make a dilation + if iterations == 0: + array_out = array_orig + else: + array_out = ndimage.binary_dilation( + array_orig, structure=struct_ext, iterations=1 + ).astype(array_orig.dtype) + + return array_out + + +def _check_satellitetrail( + flag: np.ndarray, + image: np.ndarray, +) -> np.ndarray: + from skimage import exposure, transform + from skimage.draw import line + from skimage.feature import canny + + # 调整图像 + p1, p2 = np.percentile(image, (0.1, 99.9)) + image = exposure.rescale_intensity(image, in_range=(p1, p2)) + # 边界识别,转化为二值图像 + immax = np.max(image) + # canny边界 + edge = canny( + image=image, sigma=2.0, low_threshold=immax * 0.1, high_threshold=immax * 0.5 + ) + + # 概率霍夫变换 + angle = np.radians(np.arange(2, 178, 0.5, dtype=float)) + # 直线端点 + result = transform.probabilistic_hough_line( + image=edge, threshold=210, line_length=400, line_gap=75, theta=angle + ) + result = np.asarray(result) + # 绘制mask + _flag = np.zeros(image.shape) + # 遍历每一条线段 + for (x1, y1), (x2, y2) in result: + xx, yy = line(x1, y1, x2, y2) + _flag[yy, xx] = 1 + flagconf = FlagConfig() + _flag = _flag.astype(flagconf.dtype) + value = flagconf.get_value("satellite_trail") + flag = np.bitwise_or(flag, _flag * value) + return flag + + +def _create_weight( + img: np.ndarray, + flg: np.ndarray, + rdnoise: float, + exptime: float, +) -> np.ndarray: + data = img.copy() + data[img < 0] = 0 + var_raw = data * exptime + rdnoise**2 + var_bias = 0.0 + weight = 1.0 / (var_raw + var_bias) * exptime**2 + weight[flg > 0] = 0 + wht = weight + return wht + + +def _fix_shutter( + img: np.ndarray, shutter: np.ndarray, exptime: float = 150.0 +) -> np.ndarray: + img_cor = np.float32(img * exptime * 1000.0 / (exptime * 1000.0 + shutter)) + return img_cor + + +def _multiply_gain_map( + image: np.ndarray, + header: fits.header, +) -> np.ndarray: + gain_map = np.zeros_like(image) + h = header["NAXIS1"] // 8 + w = header["NAXIS2"] // 2 + for y, x in product(range(8), (0, 1)): + y = y if x == 0 else 7 - y + gain = header["GAIN" + str(x * 8 + y + 1).zfill(2)] + gain_map[slice(w * x, w * (x + 1)), slice(h * y, h * (y + 1))] = gain + # gain_map[w * x : w * (x + 1), h * y : h * (y + 1)] = gain + return image * gain_map + + +# if __name__ == "__main__": +# result: CsstResult = core_msc_l1_mbi_instrument( +# image_path=r"D:\Zhou\Desktop\data\data_09.fits", +# bias_path=r"D:\Zhou\Desktop\data\bias_09.fits", +# dark_path=r"D:\Zhou\Desktop\data\dark_09.fits", +# flat_path=r"D:\Zhou\Desktop\data\flat_09.fits", +# shutter_path=r"D:\Zhou\Desktop\data\csst_msc_ms_shutter_09_000001.fits", +# image_output_path=r"D:\Zhou\Desktop\data\result\image_test_output.fits", +# weight_output_path=r"D:\Zhou\Desktop\data\result\weight_test_output.fits", +# flag_output_path=r"D:\Zhou\Desktop\data\result\flag_test_output.fits", +# ) +# print(result.status == 0) diff --git a/measurement_pipeline/L1_pipeline/csst_msc_instrument/format.py b/measurement_pipeline/L1_pipeline/csst_msc_instrument/format.py new file mode 100644 index 0000000000000000000000000000000000000000..88b518bb3ab219bf553fae8a1a2db2596feece8a --- /dev/null +++ b/measurement_pipeline/L1_pipeline/csst_msc_instrument/format.py @@ -0,0 +1,85 @@ +""" +Identifier: csst-l1/msc/csst_msc_instrument/csst_msc_instrument/format.py +Name: format.py +Description: Data format conversion. +Author: Li Shao (shaoli@nao.cas.cn) +Created: 2023-11-16 +Modified-History: + 2023-11-16, Li Shao, created + 2023-11-22, Li Shao, split from common_detector + 2023-11-23, Li Shao, change the way to get CCDArrayConfig + 2023-12-21, Li Shao, add function convert_format_one_cube + 2024-02-06, Li Shao, move to csst_msc_instrument + 2024-03-05, Li Shao, merge all three functions into one +""" + +import numpy as np + +from .config import get_array_config + +__all__ = ["convert_format", ] + + +def convert_format(image: np.ndarray, + output_format: str, + array_config_name: str = 'CCDArrayConfig', + ) -> np.ndarray: + """ + Convert image format. + + The function will automatically detect input format and convert to the output format. + The output format includes: "raw", "cube", "sky". + + Parameters + ---------- + image : numpy.ndarray + Input 2-D image. + output_format : str + The output format name: "raw", "cube" or "sky". + array_config_name : str, optional + The name of the pixel array configuration class. Default is "CCDArrayConfig". + + Returns + ------- + numpy.ndarray + Converted image. If convert from "sky" to other format, non-active region will be filled with 0. + """ + + config = get_array_config(array_config_name) + conf_raw = config('raw') + conf_cube = config('cube') + conf_sky = config('sky') + shape_input = image.shape + if np.array_equal(shape_input, conf_raw.get_shape(full=True)): + iconf = conf_raw + elif np.array_equal(shape_input, conf_cube.get_shape(full=True)): + iconf = conf_cube + elif np.array_equal(shape_input, conf_sky.get_shape(full=True)): + iconf = conf_sky + else: + raise ValueError('Input data shape is inconsistent with array configuration.') + + if output_format == 'raw': + oconf = conf_raw + elif output_format == 'cube': + oconf = conf_cube + elif output_format == 'sky': + oconf = conf_sky + else: + raise ValueError('Invalid output type: "{}". Must be "raw", "cube" or "sky".'.format(output_format)) + + if iconf == oconf: + print('Warning: input and output is the same. No format conversion is performed.') + return image + + if iconf.image_format == 'sky' or output_format == 'sky': + rtype = 'active' + else: + rtype = 'channel' + oshape = oconf.get_shape(full=True) + output = np.zeros(oshape, dtype=image.dtype) + for i in range(iconf.nchan): + cutout = iconf.get_cutout(i+1, image, rtype=rtype) + oconf.set_cutout(i+1, output, cutout, rtype=rtype) + + return output diff --git a/measurement_pipeline/L1_pipeline/csst_msc_instrument/image/__init__.py b/measurement_pipeline/L1_pipeline/csst_msc_instrument/image/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..3dd853fee9d1f387bc6635046548c17cbbcce38d --- /dev/null +++ b/measurement_pipeline/L1_pipeline/csst_msc_instrument/image/__init__.py @@ -0,0 +1,17 @@ +""" +Identifier: csst-l1/msc/csst_msc_instrument/csst_msc_instrument/image/__init__.py +Name: __init__.py +Description: Instrument effect module (basic operations). +Author: Li Shao (shaoli@nao.cas.cn) +Created: 2023-11-27 +Modified-History: + 2023-11-27, Li Shao, created + 2024-02-06, Li Shao, move to csst_msc_instrument +""" + +from .basic import subtract_bias, subtract_dark +from .crosstalk import remove_crosstalk +from .gain import apply_gain +from .overscan import correct_overscan + +__all__ = ['correct_overscan', 'apply_gain', 'subtract_bias', 'remove_crosstalk', 'subtract_dark', ] \ No newline at end of file diff --git a/measurement_pipeline/L1_pipeline/csst_msc_instrument/image/basic.py b/measurement_pipeline/L1_pipeline/csst_msc_instrument/image/basic.py new file mode 100644 index 0000000000000000000000000000000000000000..f5cedcfa71b60331e282454f4cbc317867feffcd --- /dev/null +++ b/measurement_pipeline/L1_pipeline/csst_msc_instrument/image/basic.py @@ -0,0 +1,88 @@ +""" +Identifier: csst-l1/msc/csst_msc_instrument/csst_msc_instrument/image/basic.py +Name: basic.py +Description: Basic instrument effect correction. +Author: Li Shao (shaoli@nao.cas.cn) +Created: 2023-11-16 +Modified-History: + 2023-11-16, Li Shao, created + 2023-11-22, Li Shao, split from common_detector + 2024-02-06, Li Shao, move to csst_msc_instrument +""" + +import numpy as np + +__all__ = ["subtract_bias", "subtract_dark", ] + + +def subtract_bias(image: np.ndarray, + image_err: np.ndarray, + bias: np.ndarray, + bias_err: np.ndarray, + ) -> tuple[np.ndarray, np.ndarray]: + """ + Subtract bias. + + Subtract bias. + + Parameters + ---------- + image : numpy.ndarray + The input image to be corrected. + image_err : numpy.ndarray + The uncertainty of input image. + bias : numpy.ndarray + The input bias to be subtracted. + bias_err : numpy.ndarray + The uncertainty of input bias. + + Returns + ------- + output : np.ndarray + Output corrected image. + output_err : np.ndarray + Output uncertainty map. + """ + + output = image - bias + output_err = np.sqrt(image_err ** 2 + bias_err ** 2) + + return output, output_err + + +def subtract_dark(image: np.ndarray, + image_err: np.ndarray, + dark: np.ndarray, + dark_err: np.ndarray, + tdark_image: float | int = 1.0, + ) -> tuple[np.ndarray, np.ndarray]: + """ + Subtract dark current. + + Subtract dark current. + + Parameters + ---------- + image : numpy.ndarray + The input image to be corrected. + image_err : numpy.ndarray + The uncertainty of input image. + dark : numpy.ndarray + The input dark current image to be subtracted. + dark_err : numpy.ndarray + The uncertainty of input dark current. + tdark_image : float or int, optional + The effective dark current cumulation time of input image. Default value is 1.0. + + Returns + ------- + output : np.ndarray + Output corrected image. + output_err : np.ndarray + Output uncertainty map. + """ + + output = image - dark * tdark_image + output_err = np.sqrt(image_err ** 2 + (dark_err * tdark_image) ** 2) + + return output, output_err diff --git a/measurement_pipeline/L1_pipeline/csst_msc_instrument/image/crosstalk.py b/measurement_pipeline/L1_pipeline/csst_msc_instrument/image/crosstalk.py new file mode 100644 index 0000000000000000000000000000000000000000..183735361bcd2f8792d5b459c2cad240fb7026df --- /dev/null +++ b/measurement_pipeline/L1_pipeline/csst_msc_instrument/image/crosstalk.py @@ -0,0 +1,55 @@ +""" +Identifier: csst-l1/msc/csst_msc_instrument/csst_msc_instrument/image/crosstalk.py +Name: crosstalk.py +Description: Crosstalk correction. +Author: Tianmeng Zhang (zhangtm@nao.cas.cn) +Created: 2023-11-16 +Modified-History: + 2023-11-16, Tianmeng Zhang, created + 2023-11-22, Li Shao, split from common_detector + 2024-02-06, Li Shao, move to csst_msc_instrument +""" + +import numpy as np + +__all__ = ["remove_crosstalk", ] + + +def remove_crosstalk(data: np.ndarray, + crosstalk_coe: np.ndarray, + hdu_num: int = 16, + switch: bool = False, + ) -> np.ndarray: + """ + Function to remove the crosstalk between different channel. + + Function to remove the crosstalk between different channel. + + Parameters + ---------- + data : numpy.ndarray + The input data array which is needed to remove the crosstalk from each hdu, 3D-array. + crosstalk_coe : numpy.ndarray + The coefficients of crosstalk, in size [hdu_num, hdu_num]. + hdu_num : int, optional + The number of extension for each detector. + switch : bool, optional + If True, do the crosstalk correction. If False, return original data. Default is False. + + Returns + ------- + numpy.ndarray + The data array after crosstalk correction, 3D-array. + """ + + data_cor = np.copy(data) # backup original data array + + if switch: + for i in range(hdu_num): + for j in range(hdu_num): + data_cor[i, :, :] = data[i, :, :] + data[j, :, :] * crosstalk_coe[i, j] + else: + data_cor = data + print('No crosstalk correction, return the original array') + + return data_cor diff --git a/measurement_pipeline/L1_pipeline/csst_msc_instrument/image/gain.py b/measurement_pipeline/L1_pipeline/csst_msc_instrument/image/gain.py new file mode 100644 index 0000000000000000000000000000000000000000..1b00b65e142401957159c9bbbeab5fb239e3dfbe --- /dev/null +++ b/measurement_pipeline/L1_pipeline/csst_msc_instrument/image/gain.py @@ -0,0 +1,75 @@ +""" +Identifier: csst-l1/msc/csst_msc_instrument/csst_msc_instrument/image/gain.py +Name: gain.py +Description: Gain map related function. +Author: Li Shao (shaoli@nao.cas.cn) +Created: 2023-12-22 +Modified-History: + 2023-12-22, Li Shao, created + 2024-02-06, Li Shao, move to csst_msc_instrument +""" + +import numpy as np + +from ..config import get_array_config + +__all__ = ['apply_gain', 'make_gainmap_from_channel_value', ] + + +def apply_gain(image: np.ndarray, + gainmap: np.ndarray, + ) -> np.ndarray: + """ + Apply gain map. + + The gain map is in unit of electrons/ADU. The output image will be in unit of electrons or electrons/s. + + Parameters + ---------- + image : numpy.ndarray + Input image, in unit of ADU or ADU/s. + gainmap : numpy.ndarray + Gain map, in unit of electrons/ADU. + + Returns + ------- + numpy.ndarray + Output image, in unit of electrons or electrons/s. + """ + + return image * gainmap + + +def make_gainmap_from_channel_value(gain_values: list[float] | tuple[float] | np.ndarray, + array_config_name: str = 'CCDArrayConfig', + image_format: str = 'raw', + ) -> np.ndarray: + """ + Make gain map from channel gain values. + + Parameters + ---------- + gain_values : list[float] or tuple[float] or numpy.ndarray + Gain value of each channel. + array_config_name : str, optional + The name of the pixel array configuration class. Default is "CCDArrayConfig". + image_format : str, optional + The output image format: "raw", "cube" or "sky". See config.CCDArrayConfig(). + + Returns + ------- + numpy.ndarray + Output gain map. + """ + + aconf = get_array_config(array_config_name)(image_format) + if len(gain_values) != aconf.nchan: + raise ValueError('The length of input gain values does not match with number of channels.') + + full_shape = aconf.get_shape(full=True) + output = np.zeros(full_shape, dtype=np.float32) + for i in range(aconf.nchan): + cutout = aconf.get_cutout(i+1, output, rtype='channel') + gain_values[i] + aconf.set_cutout(i+1, output, cutout, rtype='channel') + + return output \ No newline at end of file diff --git a/measurement_pipeline/L1_pipeline/csst_msc_instrument/image/overscan.py b/measurement_pipeline/L1_pipeline/csst_msc_instrument/image/overscan.py new file mode 100644 index 0000000000000000000000000000000000000000..5ffa4500a3fe5c2b20248b20319cbb42807d206e --- /dev/null +++ b/measurement_pipeline/L1_pipeline/csst_msc_instrument/image/overscan.py @@ -0,0 +1,278 @@ +""" +Identifier: csst-l1/msc/csst_msc_instrument/csst_msc_instrument/image/overscan.py +Name: overscan.py +Description: Overscan correction and data format conversion. +Author: Li Shao (shaoli@nao.cas.cn) +Created: 2023-11-16 +Modified-History: + 2023-11-16, Li Shao, created + 2023-11-22, Li Shao, split from common_detector + 2024-02-06, Li Shao, move to csst_msc_instrument +""" + +from typing import Callable +import numpy as np +from scipy.ndimage import median_filter +from scipy.signal import savgol_filter +from scipy.special import erf +from scipy.stats import trim_mean, t, sigmaclip +from astropy.stats import mad_std, sigma_clip + +from ..config import get_array_config + +__all__ = ["average_overscan", "smooth_overscan", "correct_overscan", ] + + +def average_overscan(image: np.ndarray, + axis: int | None = None, + cen_method: str | Callable = 'trim_mean', + std_method: str | Callable = 'mad_std', + clip_flag: bool = False, + clip_sigma: float = 3.5, + clip_cenfunc: str | Callable = 'median', + clip_stdfunc: str | Callable = 'std', + ) -> tuple[float | np.ndarray, float | np.ndarray]: + """ + Average overscan data along one direction or overall. + + Average overscan data along one direction or overall. + + Parameters + ---------- + image : numpy.ndarray + Input image: overscan region cutout. + axis : int, optional + Axis along which the operation is performed. If None (default), use the full array. + cen_method : str or Callable, optional + Algorithm to calculate average value: "trim_mean" (default), "median", "mean" or Callable function. + std_method : str or Callable, optional + Algorithm to calculate standard deviation: "mad_std" (default), "std" or Callable function. + clip_flag : bool, optional + If True, use sigma-clipping. + clip_sigma : float, optional + Clipping threshold for sigma-clipping. Default is 3.5. + clip_cenfunc : str or Callable, optional + Center value function for sigma-clipping: "median" (default), "mean" or Callable function. + clip_stdfunc : str or Callable, optional + Standard deviation function for sigma-clipping: "std" (default), "mad_std" or Callable function. + + Returns + ------- + avg : numpy.ndarray + Averaged overscan data (one value or 1D array). + err : numpy.ndarray + The uncertainty. + """ + + # sigma-clipping + if clip_flag and cen_method != 'trim_mean': + data = sigma_clip(image, sigma=clip_sigma, axis=axis, masked=True, + cenfunc=clip_cenfunc, stdfunc=clip_stdfunc) + + if cen_method == 'median': + avg = np.ma.median(data) + elif cen_method == 'mean': + avg = np.ma.mean(data) + else: + avg = cen_method(data) + + if std_method == 'mad_std': + err = mad_std(data, axis=axis) + elif std_method == 'std': + err = np.ma.std(data, axis=axis) + else: + err = std_method(data, axis=axis) + + # without clipping + else: + if cen_method == 'trim_mean': + avg = trim_mean(image, 0.1, axis=axis) + elif cen_method == 'median': + avg = np.median(image, axis=axis).astype(float) + elif cen_method == 'mean': + avg = np.mean(image, axis=axis) + else: + avg = cen_method(image, axis=axis) + + if std_method == 'mad_std': + err = mad_std(image, axis=axis) + elif std_method == 'std': + err = np.std(image, axis=axis) + else: + err = std_method(image, axis=axis) + + # uncertainty of the average + ny, nx = image.shape + if axis is None: + n = ny * nx + elif axis == 0: + n = ny + else: + n = nx + if cen_method == 'trim_mean': + n = n * 0.8 + err = err / np.sqrt(n) * t.ppf(erf(1), df=n - 1) # uncertainty of the mean + if cen_method == 'median' or cen_method is np.median or cen_method is np.ma.median: + err *= 1.2533 + + return avg, err + + +def smooth_overscan(data: np.ndarray, + filter_width: int, + filter_method: str = 'savgol', + filter_polyorder: int = 2, + ) -> np.ndarray: + """ + Smooth 1D averaged overscan data. + + Smooth 1D averaged overscan data. + + Parameters + ---------- + data : numpy.ndarray + Input 1-D overscan data. + filter_width : int + The width of filter. Must be positive. + filter_method : str, optional + Process algorithm: "savgol" (default) or "median". + filter_polyorder : int, optional + The order of polynomial used for Savitzky-Golay filter. + + Returns + ------- + numpy.ndarray + Smoothed 1-D overscan data. + """ + + if filter_method == 'savgol': + dd = savgol_filter(data, filter_width * 2, filter_polyorder) + diff = data - dd + index = np.abs(diff) < sigmaclip(diff, 5, 5)[0].std() * 5 # clip outliers + x = np.arange(len(data)) + dd = np.interp(x, x[index], data[index]) + output = savgol_filter(dd, filter_width, filter_polyorder) + else: + output = median_filter(data, size=filter_width) + + return output + + +def correct_overscan(image: np.ndarray, + array_config_name: str = 'CCDArrayConfig', + region_type: str = 'x', + correct_type: str = 'line', + gap: int = 10, + cen_method: str | Callable = 'trim_mean', + std_method: str | Callable = 'mad_std', + clip_flag: bool = False, + clip_sigma: float = 3.5, + clip_cenfunc: str | Callable = 'median', + clip_stdfunc: str | Callable = 'std', + filter_width: int = 0, + filter_method: str = 'savgol', + filter_polyorder: int = 2, + clean_inactive: bool = True, + ) -> tuple[np.ndarray, np.ndarray]: + """ + Overscan correction. + + Overscan correction. All parameters are optional inputs. + + Parameters + ---------- + image : numpy.ndarray + Input 2-D image. + array_config_name : str, optional + The name of the pixel array configuration class. Default is "CCDArrayConfig". + region_type : str, optional + The type of overscan region. "x" (default) is serial overscan, "y" is parallel overscan. + correct_type : str, optional + Correction type. "line" (default) is correct by row or column. "all" is correct with all pixels. + gap : int, optional + Number of pixels after active pixels not to use for overscan correction. + cen_method : str or Callable, optional + Algorithm to calculate average value: "trim_mean" (default), "median", "mean" or Callable function. + std_method : str or Callable, optional + Algorithm to calculate standard deviation: "mad_std" (default), "std" or Callable function. + clip_flag : bool, optional + If True, use sigma-clipping. + clip_sigma : float, optional + Clipping threshold for sigma-clipping. Default is 3.5. + clip_cenfunc : str or Callable, optional + Center value function for sigma-clipping: "median" (default), "mean" or Callable function. + clip_stdfunc : str or Callable, optional + Standard deviation function for sigma-clipping: "std" (default), "mad_std" or Callable function. + filter_width : int, optional + The width of filter. If smaller than 2, then smoothing will be skipped. + filter_method : str, optional + Filtering algorithm: "savgol" (default) or "median". + filter_polyorder : int, optional + The order of polynomial used for Savitzky-Golay filter. + clean_inactive : bool, optional + If True, all pixels out of active region are set to zero. Default is True. + + Returns + ------- + output : numpy.ndarray + Overscan corrected image. + output_err : numpy.ndarray + Uncertainty image. + + Examples + -------- + image_corr, err_corr = correct_overscan(image, region_type='x', correct_type='line') + image_corr, err_corr = correct_overscan(image, region_type='y', correct_type='all', clean_inactive=True) + """ + + aconf = get_array_config(array_config_name)('raw') + if correct_type == 'line': + if region_type == 'x': + axis = 1 + else: + axis = 0 + else: + axis = None + + if clean_inactive: + output = np.zeros(image.shape) + else: + output = image.astype(float) + output_err = np.zeros(image.shape) + + for i in range(aconf.nchan): + overscan = aconf.get_cutout(i+1, image, rtype='{}_overscan'.format(region_type)) + + # remove the first pixels (to avoid potential CTI contamination) + if gap > 0: + if region_type == 'x': + overscan = overscan[:, gap:] + else: + overscan = overscan[gap:, :] + + # calculate average and uncertainty + avg_value, err_value = average_overscan(overscan, axis=axis, + cen_method=cen_method, std_method=std_method, + clip_flag=clip_flag, clip_sigma=clip_sigma, + clip_cenfunc=clip_cenfunc, clip_stdfunc=clip_stdfunc) + + # smooth if required + if filter_width > 1 and axis is not None: + avg_value = smooth_overscan(avg_value, filter_width, + filter_method=filter_method, filter_polyorder=filter_polyorder) + err_value = err_value / np.sqrt(filter_width) * 1.2533 + + # correct the active region + x1, x2, y1, y2 = aconf.get_boundary(i+1, rtype='active') + nx, ny = x2 - x1, y2 - y1 + if axis is None: + output[y1:y2, x1:x2] = image[y1:y2, x1:x2] - avg_value + output_err[y1:y2, x1:x2] = err_value + elif axis == 0: + output[y1:y2, x1:x2] = image[y1:y2, x1:x2] - np.tile(avg_value, ny).reshape((ny, nx)) + output_err[y1:y2, x1:x2] = np.tile(err_value, ny).reshape((ny, nx)) + else: + output[y1:y2, x1:x2] = image[y1:y2, x1:x2] - np.repeat(avg_value, nx).reshape((ny, nx)) + output_err[y1:y2, x1:x2] = np.repeat(err_value, nx).reshape((ny, nx)) + + return output, output_err diff --git a/measurement_pipeline/L1_pipeline/csst_msc_instrument/instrument_cor_demo.head b/measurement_pipeline/L1_pipeline/csst_msc_instrument/instrument_cor_demo.head new file mode 100644 index 0000000000000000000000000000000000000000..f90fd3c1db5f2d51de8c316104c044bef84d6abf --- /dev/null +++ b/measurement_pipeline/L1_pipeline/csst_msc_instrument/instrument_cor_demo.head @@ -0,0 +1 @@ +V_INST = '0.0.2 ' / version of instrument correction T_INST = '2023-12-29T04:50:46.710' / timestamp of instrument correction S_INST = 0 / status of instrument correction S_OVSCAN= 0 / status of overscan correction S_GAIN = 0 / status of gain correction R_GAIN = 'file ' / reference gain S_BIAS = 0 / status of bias frame correction R_BIAS = 'file ' / reference bias S_CROSST= 0 / status of crosstalk correction R_CROSST= 'file ' / reference crosstalk S_CTI = 1 / status of CTI correction R_CTI = 'file ' / reference CTI S_DARK = 0 / status of dark frame correction R_DARK = 'file ' / reference dark S_NLIN = 1 / status of non-linear correction R_NLIN = 'file ' / reference non-linear S_SHUT = 1 / status of shutter effect correction R_SHUT = 'file ' / reference shutter effect S_FLAT = 0 / status of flat frame correction R_FLAT = 'file ' / reference flat S_CRS = 1 / status of cosmic rays mask R_CRS = 'deepCR_model' / method and config of cosmic rays mask CRCOUNT = 240466 / cosmic rays pixel counts S_SAT = 1 / status of satellite correction R_SAT = 'file ' / reference satellite correction S_FRINGE= 1 / status of fringe correction R_FRINGE= 'file ' / reference fringe SKY_BKG = 0.0323 / estimated sky background (e-/s per pixel) SKY_RMS = 0.0374 / standard dev of frame background (e-/s) SATURATE= 365.9759 / flux limit of saturated pixel (e-/s) END diff --git a/measurement_pipeline/L1_pipeline/csst_msc_mbi_instrument.py b/measurement_pipeline/L1_pipeline/csst_msc_mbi_instrument.py deleted file mode 100644 index 726d2f8615927faae36a232f567b713ae441107d..0000000000000000000000000000000000000000 --- a/measurement_pipeline/L1_pipeline/csst_msc_mbi_instrument.py +++ /dev/null @@ -1,518 +0,0 @@ -from datetime import datetime -from typing import Optional - -import numpy as np -from itertools import product -from astropy.io import fits -from astropy.stats import sigma_clipped_stats - -SATURATE = 50000 - -def core_msc_l1_mbi_instrument( - image_path: str = "/path/to/image", - bias_path: str = "/path/to/bias", - dark_path: str = "/path/to/dark", - flat_path: str = "/path/to/flat", - shutter_path: str = "/path/to/shutter", - image_output_path: str = "/path/to/image_output", - weight_output_path: str = "/path/to/weight_output", - flag_output_path: str = "/path/to/flag_output", - deepcr_model_path: Optional[str] = None, - config_ccd_info: Optional[str] = None, - config_bad_pixel: Optional[str] = None, - device: Optional[str] = "CPU", -): - """ - Make the instrument correction for one chip of CSST data. - - This function corrects the instrument effect, such as bias, dark, flat, shutter, cosmicrays etc. - - Parameters - ---------- - image_path : str - The sci file path. - bias_path : str - The bias file path. - dark_path : str - The dark file path. - flat_path : str - The flat file path. - shutter_path : str - The shutter file path. - image_output_path : str - The image output file path. - weight_output_path : str - The weight output file path. - flag_output_path : str - The flag output file path. - deepcr_model_path : str, optional - The deepcr model file path. - config_ccd_info : str, optional - The config for ccd file path. - config_bad_pixel : str, optional - The config for bad pixel file path. - device : str - The deepcr device input 'CPU' or 'GPU', by default 'CPU'. - - Returns - ------- - CsstResult - Result containing `status` and `files`. - """ - status_header = _set_status_header() - - # CCD image processing - with fits.open(image_path) as raw: - hdu = fits.HDUList([ - fits.PrimaryHDU( - header=raw[0].header.copy()), - fits.ImageHDU( - header=raw[1].header.copy(), - data=raw[1].data.copy()) - ]) - # 创建wht[1].data hdulist - wht = fits.HDUList([ - fits.PrimaryHDU( - header=raw[0].header.copy()), - fits.ImageHDU( - header=raw[1].header.copy(), - data=np.zeros_like(hdu[1].data, dtype=np.float32)) - ]) - # 创建flg[1].data hdulist - flg = fits.HDUList([ - fits.PrimaryHDU( - header=raw[0].header.copy()), - fits.ImageHDU( - header=raw[1].header.copy(), - data=np.zeros_like(hdu[1].data, dtype=np.uint32))]) - - img_err = np.zeros_like(hdu[1].data) - # shutter - shutter = fits.getdata(shutter_path, 1) - bias, bias_error = _read_ccds(bias_path) - dark, dark_error = _read_ccds(dark_path) - flat, flat_error = _read_ccds(flat_path) - - hdu[1].data, img_err = subtract_bias( - hdu[1].data, - img_err, - bias, - bias_error, - ) - status_header.set('STA_BIAS', 0) - - hdu[1].data, img_err = subtract_dark( - hdu[1].data, - img_err, - dark, - dark_error, - hdu[0].header["EXPTIME"] - ) - status_header.set('STA_DARK', 0) - - hdu[1].data, img_err = _divide_flat( - hdu[1].data, - img_err, - flat, - flat_error, - ) - status_header.set('STA_FLAT', 0) - - # 非线性改正 - hdu[1].data = _fix_nonlinear(hdu[1].data) - status_header.set('STA_NLIN', 0) - # 坏像素检测 - flg[1].data = _check_badpix( - flg[1].data, - flat - ) - - # 热像素和暖像素检测 - flg[1].data = _check_hot_and_warm_pix( - flg[1].data, - dark, - hdu[0].header["EXPTIME"], - hdu[1].header["RON01"] - ) - # 过饱和检测 - saturate = SATURATE - flg[1].data = _check_over_saturation( - flg[1].data, - hdu[1].data, - saturate - ) - saturate = (saturate - * hdu[1].header["GAIN01"] - / hdu[0].header["EXPTIME"]) - status_header.set('SATURATE', saturate) - - # 时间归一 - hdu[1].data = hdu[1].data/hdu[0].header["EXPTIME"] - # 单位转换 - hdu[1].data = _multiply_gain_map(hdu[1].data, hdu[1].header) - status_header.set('BUNIT', 'e/s') - - # # 检测宇宙线 - # hdu[1].data, flg[1].data, cr_count = remove_cr_deepcr( - # hdu[1].data, - # flg[1].data, - # device, - # hdu[1].header['CHIPID'], - # ) - # status_header.set('STA_CRS', 0) - # status_header.set('CRCOUNT', cr_count) - - # 记录BKG与RMS - _, bkg, rms = sigma_clipped_stats( - data=hdu[1].data, - mask=(flg[1].data == 16) - ) - status_header.set('SKY_BKG', bkg) - status_header.set('SKY_RMS', rms) - # 检测卫星拖尾 - flg[1].data = _check_satellitetrail( - flg[1].data, - hdu[1].data - ) - status_header.set('STA_SAT', 0) - - # 生成wht[1].data - wht[1].data = _create_weight( - hdu[1].data, - flg[1].data, - hdu[1].header["RON01"], - hdu[0].header["EXPTIME"] - ) - # 数据类型 - hdu[1].data = hdu[1].data.astype(np.float32) - - # 快门改正 - hdu[1].data = _fix_shutter( - hdu[1].data, - shutter, - hdu[0].header["EXPTIME"] - ) - status_header.set('STA_SHUT', 0) - - if all([status_header["STA_BIAS"] == 0, - status_header["STA_DARK"] == 0, - status_header["STA_FLAT"] == 0, - status_header["STA_CRS"] == 0, - status_header["STA_NLIN"] == 0, - # status_header["STA_CTE"] == 0, # 这个功能还未添加 - status_header["STA_SAT"] == 0, - status_header["STA_SHUT"] == 0, - status_header["SKY_BKG"] != -9999, - status_header["SKY_RMS"] != -9999, - status_header["SATURATE"] != -9999, - status_header["CRCOUNT"] != -9999]): - status_header.set('STA_INST', 0) - - # hdu[1].header.extend(status_header, bottom=True) - # 修正wht和flg数据类型 - wht[1].header = hdu[1].header.copy() - wht[1].data = wht[1].data.astype(np.float32) - wht[1].header.remove('BUNIT') - - flg[1].header = hdu[1].header.copy() - flg[1].data = flg[1].data.astype(np.int32) - flg[1].header.remove('BUNIT') - - hdu.writeto(image_output_path, overwrite=True) - wht.writeto(weight_output_path, overwrite=True) - flg.writeto(flag_output_path, overwrite=True) - - -def _set_status_header(): - status_header = fits.Header() # template Header - status_header.append( - ('COMMENT', '=' * 66), bottom=True) - status_header.append( - ('COMMENT', 'instrumental correction information'), bottom=True) - status_header.append( - ('COMMENT', '=' * 66), bottom=True) - # status_header.append( - # ('VER_INST', csst_common.__version__, 'version of instrument'), bottom=True) - # status_header.append( - # ('STM_INST', csst_common.now(), 'time stamp of instrument processing'), bottom=True) - status_header.append( - ('STA_INST', 1, '0=done 1=wrong'), bottom=True) - status_header.append( - ('STA_BIAS', 1, 'status flag for bias frame correction'), bottom=True) - status_header.append( - ('STA_DARK', 1, 'status flag for dark frame correction'), bottom=True) - status_header.append( - ('STA_FLAT', 1, 'status flag for flat frame correction'), bottom=True) - status_header.append( - ('SKY_BKG', -9999, 'estimated sky background (e-/s per pixel)'), bottom=True) - status_header.append( - ('SKY_RMS', -9999, 'standard dev of frame background (ADU)-> e-/s'), bottom=True) - status_header.append( - ('SATURATE', -9999, 'the flux limit of saturated pixel (e-/s)'), bottom=True) - status_header.append( - ('STA_CTE', 1, 'status flag for CTE correction'), bottom=True) - status_header.append( - ('STA_SAT', 1, 'status flag for satellite correction'), bottom=True) - status_header.append( - ('STA_CRS', 1, 'status flag for cosmic rays mask'), bottom=True) - status_header.append( - ('CRCOUNT', -9999, 'cosmic rays pixel counts'), bottom=True) - status_header.append( - ('STA_NLIN', 1, 'status flag for non-linear correction'), bottom=True) - status_header.append( - ('STA_SHUT', 1, 'status flag for shutter effect correction'), bottom=True) - return status_header - -def _read_ccds( - ref_path: str, -) -> tuple[np.array, np.array]: - ref = fits.getdata(ref_path) - return ref, np.zeros_like(ref) - -def _divide_flat( - image: np.ndarray, - image_error: np.ndarray, - flat: np.ndarray, - flat_error: np.ndarray, -) -> tuple[np.ndarray, np.ndarray]: - def divide(a, b): - return np.divide(a, b, out=np.zeros_like(a, float), where=(b != 0)) - result = divide(image, flat) - result_error = np.abs(result) * ((divide(image_error, image)) ** 2 + - (divide(flat_error, flat)) ** 2)**0.5 - return result, result_error - -def _fix_nonlinear( - image: np.ndarray, - beta1: float = 5e-7 -) -> np.ndarray: - # 计算非线性系数 - beta = 5e-7 - f1 = [1] - fnlin0 = 1+beta - fnlin = [fnlin0] - for i in range(1000, 65000, 1000): - f1.append(i) - fnlin.append(1+beta*i) - # 插值函数 - from scipy.interpolate import PchipInterpolator as PCHIP - # PCHIP stands for Piecewise Cubic Hermite Interpolating Polynomial - interp_func = PCHIP(f1, fnlin) - # 非线性系数矩阵 - imgnlin = interp_func(image) - # 图像改正 - image = imgnlin*image - return image - -def _check_badpix( - flag: np.ndarray, - flat: np.ndarray, -) -> np.ndarray: - med = np.median(flat) - _flag = (flat < 0.5 * med) | (1.5 * med < flat) - flag = flag | (_flag * 1) - return flag - -def _check_hot_and_warm_pix( - flag: np.ndarray, - dark: np.ndarray, - exptime: float, - rdnoise: float, -) -> np.ndarray: - _dark = dark * exptime - _dark[_dark < 0] = 0 - _flag = 1 * rdnoise ** 2 <= _dark # 不确定是否包含 暂定包含 - flag = flag | (_flag * 2) - _flag = (0.5 * rdnoise ** 2 < _dark) & (_dark < 1 * rdnoise ** 2) - flag = flag | (_flag * 4) - return flag - -def _check_over_saturation( - flag: np.ndarray, - image: np.ndarray, - saturate: int = 65535, - iterations: int = 0, - flag_value: int = 8 -) -> np.ndarray: - _flag = image >= saturate - flag_dilated = _dilation(_flag, iterations=iterations) - flag = flag | (flag_dilated * flag_value) - return flag - -def _dilation(array_orig: np.ndarray, iterations: int = 1): - """ - Make a dilation for array - - This function makes a dilation for the 2D mask array (saturated pixels) - - Parameters - ---------- - array_orig : np.ndarray - The mask array without dilation. - structure : int - The number of dilations performed on the structure with itself. - - Returns - ------- - array_out : np.ndarray - The mask array after dilation. - """ - from scipy import ndimage - struct1 = ndimage.generate_binary_structure( - 2, 1) # give a saturate structure - struct_ext = ndimage.iterate_structure( - struct1, iterations) # make a dilation - if iterations == 0: - array_out = array_orig - else: - array_out = ndimage.binary_dilation( - array_orig, structure=struct_ext, iterations=1).astype(array_orig.dtype) - - return array_out - -def _check_satellitetrail( - flag: np.ndarray, - image: np.ndarray, -) -> np.ndarray: - from skimage import exposure, transform - from skimage.draw import line - from skimage.feature import canny - # 调整图像 - p1, p2 = np.percentile(image, (0.1, 99.9)) - image = exposure.rescale_intensity(image, in_range=(p1, p2)) - # 边界识别,转化为二值图像 - immax = np.max(image) - # canny边界 - edge = canny(image=image, - sigma=2.0, - low_threshold=immax * 0.1, - high_threshold=immax * 0.5) - - # 概率霍夫变换 - angle = np.radians(np.arange(2, 178, 0.5, dtype=float)) - # 直线端点 - result = transform.probabilistic_hough_line(image=edge, - threshold=210, - line_length=400, - line_gap=75, - theta=angle) - result = np.asarray(result) - # 绘制mask - mask = np.zeros(image.shape) - # 遍历每一条线段 - for (x1, y1), (x2, y2) in result: - xx, yy = line(x1, y1, x2, y2) - mask[yy, xx] = 1 - mask = mask.astype(np.int32) - flag = flag | (mask * 32) - return flag - -def _create_weight( - img: np.ndarray, - flg: np.ndarray, - rdnoise: float, - exptime: float, -) -> np.ndarray: - data = img.copy() - data[img < 0] = 0 - var_raw = data * exptime + rdnoise ** 2 - var_bias = 0.0 - weight = 1. / (var_raw + var_bias) * exptime ** 2 - weight[flg > 0] = 0 - wht = weight - return wht - -def _fix_shutter( - img: np.ndarray, - shutter: np.ndarray, - exptime: float = 150. -) -> np.ndarray: - img_cor = np.float32(img * exptime*1000. / (exptime*1000. + shutter)) - return img_cor - -def _multiply_gain_map( - image: np.ndarray, - header: fits.header, -) -> np.ndarray: - gain_map = np.zeros_like(image) - h = header['NAXIS1'] // 8 - w = header['NAXIS2'] // 2 - for y, x in product(range(8), (0, 1)): - y = y if x == 0 else 7-y - gain = header['GAIN' + str(x*8+y+1).zfill(2)] - gain_map[w*x:w*(x+1), h*y:h*(y+1)] = gain - return image * gain_map - -def subtract_bias(image: np.ndarray, - image_err: np.ndarray, - bias: np.ndarray, - bias_err: np.ndarray, - ) -> tuple[np.ndarray, np.ndarray]: - """ - Subtract bias. - - Subtract bias. - - Parameters - ---------- - image : numpy.ndarray - The input image to be corrected. - image_err : numpy.ndarray - The uncertainty of input image. - bias : numpy.ndarray - The input bias to be subtracted. - bias_err : numpy.ndarray - The uncertainty of input bias. - - Returns - ------- - output : np.ndarray - Output corrected image. - output_err : np.ndarray - Output uncertainty map. - """ - - output = image - bias - output_err = np.sqrt(image_err ** 2 + bias_err ** 2) - - return output, output_err - - -def subtract_dark(image: np.ndarray, - image_err: np.ndarray, - dark: np.ndarray, - dark_err: np.ndarray, - tdark_image: float = 1.0, - ) -> tuple[np.ndarray, np.ndarray]: - """ - Subtract dark current. - - Subtract dark current. - - Parameters - ---------- - image : numpy.ndarray - The input image to be corrected. - image_err : numpy.ndarray - The uncertainty of input image. - dark : numpy.ndarray - The input dark current image to be subtracted. - dark_err : numpy.ndarray - The uncertainty of input dark current. - tdark_image : float, optional - The effective dark current cumulation time of input image. Default value is 1.0. - - Returns - ------- - output : np.ndarray - Output corrected image. - output_err : np.ndarray - Output uncertainty map. - """ - - output = image - dark * tdark_image - output_err = np.sqrt(image_err ** 2 + (dark_err * tdark_image) ** 2) - - return output, output_err - - diff --git a/measurement_pipeline/L1_pipeline/ref_combine.py b/measurement_pipeline/L1_pipeline/ref_combine.py new file mode 100644 index 0000000000000000000000000000000000000000..33421915c6905f2cc25879dc78ad98aa34f56028 --- /dev/null +++ b/measurement_pipeline/L1_pipeline/ref_combine.py @@ -0,0 +1,82 @@ +import numpy as np +from astropy.io import fits + + +def array_combine(ndarray, mode="mean") -> np.ndarray: + """ Function to combine 3-D data array + + Parameters + ---------- + ndarray: array, input data cube (3D) + mode: mean, median, sum, mean_clip, median_clip, default is mean + """ + if mode == "median": + array = np.median(ndarray, axis=0) + elif mode == "median_clip": + ndarray = np.sort(ndarray, axis=0)[1:-1] + array = np.median(ndarray, axis=0) + elif mode == "sum": + array = np.sum(ndarray, axis=0) + elif mode == "mean": + array = np.mean(ndarray, axis=0) + elif mode == "mean_clip": + ndarray = np.sort(ndarray, axis=0)[1:-1] + array = np.mean(ndarray, axis=0) + return array + + +def load_bias(path: str) -> np.ndarray: + with fits.open(path) as hdul: + du = hdul[1].data + du = du.astype(int) + return du + + +def load_dark(path: str, bias) -> np.ndarray: + with fits.open(path) as hdul: + du = hdul[1].data + hu = hdul[0].header + du = du.astype(int) + du = du - bias + du = du / hu["EXPTIME"] + return du + + +def load_flat(path: str, bias, dark) -> np.ndarray: + with fits.open(path) as hdul: + du = hdul[1].data + hu = hdul[0].header + du = du.astype(int) + du = du - bias - dark * hu["EXPTIME"] + du = du / hu["EXPTIME"] + du = du / np.median(du) + return du + + +def combine(func, mode: str, path_list, *args) -> np.ndarray: + du_list = [func(path, *args) for path in path_list] + du = array_combine(du_list, mode) + return du + + +def combine_images(b_p_lst, d_p_lst, f_p_lst, mode_list=["median", "median", "median", ]): + """ + + Parameters + ---------- + b_p_lst: + List of currently ccd number bias file path + d_p_lst: + List of currently ccd number dark file path + f_p_lst: + List of currently ccd number flat file path + mode_list: + [0] bias combine mode + [1] dark combine mode + [2] flat combine mode + mean, median, sum, mean_clip, median_clip + """ + bias = combine(load_bias, mode_list[0], b_p_lst) + dark = combine(load_dark, mode_list[1], d_p_lst, bias) + flat = combine(load_flat, mode_list[2], f_p_lst, bias, dark) + return bias.copy(), dark.copy(), flat.copy() diff --git a/measurement_pipeline/run_ref_combine.py b/measurement_pipeline/run_ref_combine.py new file mode 100644 index 0000000000000000000000000000000000000000..d06114a9cee72f7859e759e0568438192e2beccb --- /dev/null +++ b/measurement_pipeline/run_ref_combine.py @@ -0,0 +1,34 @@ +import os +from glob import glob +from astropy.io import fits +from L1_pipeline.ref_combine import combine_images + +ref_path = "/public/share/yangxuliu/CSSOSDataProductsSims/outputs_cali/" +output_path = "/public/home/fangyuedong/project/calib_data" + +def combine_ref_func(ref_path, output_path, num="01"): + bias_path_list = glob(ref_path + '*/CSST_MSC_MS_BIAS_*_' + num + '_*') + dark_path_list = glob(ref_path + '*/CSST_MSC_MS_DARK_*_' + num + '_*') + flat_path_list = glob(ref_path + '*/CSST_MSC_MS_FLAT_*_' + num + '_*') + bias, dark, flat = combine_images(b_p_lst=bias_path_list, + d_p_lst=dark_path_list, + f_p_lst=flat_path_list, ) + bias_out_path = os.path.join(output_path, "bias_" + str(num) + ".fits") + dark_out_path = os.path.join(output_path, "dark_" + str(num) + ".fits") + flat_out_path = os.path.join(output_path, "flat_" + str(num) + ".fits") + hdu = fits.PrimaryHDU(bias) + hdu.writeto(bias_out_path) + hdu = fits.PrimaryHDU(dark) + hdu.writeto(dark_out_path) + hdu = fits.PrimaryHDU(flat) + hdu.writeto(flat_out_path) + +if __name__ == "__main__": + ref_path = "/public/share/yangxuliu/CSSOSDataProductsSims/outputs_cali/" + output_path = "/public/home/fangyuedong/project/calib_data" + num = '08' + combine_ref_func( + ref_path=ref_path, + output_path=output_path, + num=num + ) \ No newline at end of file