Commit ff118a41 authored by Fang Yuedong's avatar Fang Yuedong
Browse files

add L1 msc instrument pipeline modules

parent de19281e
import os
from .csst_msc_mbi_instrument import core_msc_l1_mbi_instrument
__all__ = ["core_msc_l1_mbi_instrument"]
\ No newline at end of file
"""
Identifier: csst-l1/msc/csst_msc_instrument/csst_msc_instrument/config.py
Name: config.py
Description: Configuration classes.
Author: Li Shao (shaoli@nao.cas.cn)
Created: 2023-11-16
Modified-History:
2023-11-16, Li Shao, created
2023-11-23, Li Shao, change the way to get CCDArrayConfig
2023-12-21, Li Shao, add CameraConfig
2024-02-06, Li Shao, move to csst_msc_instrument
2024-03-05, Li Shao, update flag and CCDArrayConfig
"""
import numpy as np
from astropy.table import Table
__all__ = ["FlagConfig", "CameraConfig", "CCDArrayConfig",
"TestCCD", "SimCCD", "GM1201APrototype", "get_array_config"]
class FlagConfig:
"""
Defines the universal bit flag value.
It can return the name and the value of each flag definition, giving a safe way to handle flags.
Attributes
----------
dtype : type
The data type of the flag.
data : tuple[list]
Data including the bit values, names and comments.
Methods
-------
get_value(name)
Get the flag value.
get_comment(name)
Get the comment string of the flag.
get_name(bit)
Get the name of the flag of the given bit.
make_mask(names)
Make a bitmask from a name list of masked bits.
Examples
--------
flagconf = CCDFlagConfig()
name = flagconf.get_name(0) # "no_data"
value = flagconf('no_data') # 1
value = flagconf.get_value('no_data') # 1
comment = flagconf.get_comment('dead_pixel') # "permanently damaged pixel"
mask = flagconf.make_mask('no_data', 'dead_pixel') # 3 = 0b11
"""
def __init__(self):
self._version = 'sim_202312'
self._comment = ''
self.dtype = np.uint32
self.data = ([0, 'no_data', 'no data'],
[1, 'dead_pixel', 'permanently damaged pixel'],
[2, 'no_gain', 'no gain value is available'],
[3, 'poor_gain', 'gain value is unreliable'],
[4, 'no_bias', 'no bias value is available'],
[5, 'poor_bias', 'bias value is unreliable'],
[6, 'noisy_readout', 'readout noise is exceptionally large'],
[7, 'no_dark', 'no dark current value is available'],
[8, 'poor_dark', 'dark current value is unreliable'],
[9, 'hot_pixel', 'pixel with exceptionally large dark current'],
[10, 'warm_pixel', 'pixel with moderately large dark current'],
[11, 'no_flat', 'no flat value is available'],
[12, 'poor_flat', 'flat value is unreliable'],
[13, 'dark_pixel', 'pixel with relatively low QE'],
[14, 'poor_nonlinearity', 'pixel with large nonlinearity'],
[15, 'saturated', 'saturated pixel'],
[16, 'saturated_crosstalk', 'pixel affected by crosstalk of saturated pixel'],
[17, 'sink_pixel', 'pixel with exceptionaly high trap density'],
[18, 'poor_cti_corr', 'pixel affected by poor CTI correction'],
[19, 'poor_shutter_shade', 'pixel affected by poor shutter shade correction'],
[20, 'cosmic_ray', 'identified cosmic ray'],
[21, 'poor_cosmic_ray', 'pixel affected by poor cosmic ray correction'],
[22, 'poor_fringe', 'pixel affected by poor fringe correction'],
[23, 'ghost', 'optics ghost'],
[24, 'satellite_trail', 'trail caused by passby satellite'],
[25, 'stray_light', 'other serious stray light feature'],
[26, 'open_pixel', 'pixel with poor electron bounding capability'],
[27, 'adj_open_pixel', 'neighboring pixel of any open pixel'],
[28, 'undefined', 'not defined'],
[29, 'phot_detection', 'source detection region (0 order source if on spectroscopic image)'],
[30, 'spec_detection', 'spectra detection region'],
[31, 'spec_contamination', 'pixel contaminated by other higher/lower order spectra'],
)
def __call__(self,
name: str,
) -> np.unsignedinteger:
return self.get_value(name)
def get_value(self,
name: str,
) -> np.unsignedinteger:
"""
Return the value of the flag.
The input flag name should be in lower case. The value will be returned in format of self.dtype.
If no name is matched, then it will raise an exception.
Parameters
----------
name : str
The name of the flag.
Returns
-------
numpy.unsignedinteger
The value of the flag.
"""
value = None
for entry in self.data:
if entry[1] == name:
value = self.dtype(2 ** entry[0])
if value is None:
raise KeyError('Invalid flag name: {}.'.format(name))
return value
def get_comment(self,
name: str,
) -> str:
"""
Return the comment string of the flag.
The input flag name should be in lower case.
If no name is matched, then it will raise an exception.
Parameters
----------
name : str
The name of the flag.
Returns
-------
str
The value of the flag.
"""
comment = None
for entry in self.data:
if entry[1] == name:
comment = entry[2]
if comment is None:
raise KeyError('Invalid flag name: {}.'.format(name))
return comment
def get_name(self,
bit: int,
) -> str | None:
"""
Return the flag name of the given bit.
The function searches all defined bit. If the bit is not defined, then None will be returned.
Parameters
----------
bit : int
The bit value.
Returns
-------
str or None
The name of the flag. If the bit is undefined, return None.
"""
name = None
for entry in self.data:
if entry[0] == bit:
name = entry[1]
return name
def print(self) -> None:
"""
Print the bitmask definition table.
"""
print('Bitmask definition:')
for i in range(np.iinfo(self.dtype).bits):
name = self.get_name(i)
if name is not None:
value = self(name)
comment = self.get_comment(name)
print(" {:2d}\t{:15d}\t\t{:20s}\t\t{}".format(i, value, name, comment))
def make_mask(self,
*names: str,
) -> np.unsignedinteger:
"""
Make a bitmask from a name list of masked bits.
All bits correponding to the given names will be flagged. Used for combination of several flags.
Parameters
----------
*names : str
Names of bit masks to be masked. Variable length.
Returns
-------
numpy.unsignedinteger
The mask (in unsigned integer format).
"""
outmask = self.dtype(0)
for name in set(names):
value = self.get_value(name)
outmask += value
return outmask
class CameraConfig:
"""
Define camera configurations: chip layout, LED, etc.
It inclues the filter, the pixel array configuration, LED, etc.
Attributes
----------
chip : astropy.table.Table
The table containing the chip information.
led : astropy.table.Table
The table containing the LED information.
"""
def __init__(self):
self._version = 'sim_202312'
self._comment = ''
self.chip = Table()
self.chip['index'] = np.arange(30, dtype=int)
self.chip['chipid'] = ['01', '02', '03', '04', '05',
'06', '07', '08', '09', '10',
'11', '12', '13', '14', '15',
'16', '17', '18', '19', '20',
'21', '22', '23', '24', '25',
'26', '27', '28', '29', '30']
self.chip['row'] = [0, 0, 0, 0, 0,
1, 1, 1, 1, 1,
2, 2, 2, 2, 2,
3, 3, 3, 3, 3,
4, 4, 4, 4, 4,
5, 5, 5, 5, 5]
self.chip['column'] = [0, 1, 2, 3, 4,
0, 1, 2, 3, 4,
0, 1, 2, 3, 4,
0, 1, 2, 3, 4,
0, 1, 2, 3, 4,
0, 1, 2, 3, 4]
self.chip['filter'] = ['GI', 'GV', 'GU', 'GU', 'GV',
'y', 'i', 'g', 'r', 'GI',
'z', 'NUV', 'NUV', 'u', 'y',
'y', 'u', 'NUV', 'NUV', 'z',
'GI', 'r', 'g', 'i', 'y',
'GV', 'GU', 'GU', 'GV', 'GI']
self.chip['chiplab'] = ['GI-1', 'GV-1', 'GU-1', 'GU-2', 'GV-2',
'y-1', 'i-1', 'g-1', 'r-1', 'GI-2',
'z-1', 'NUV-1', 'NUV-2', 'u-1', 'y-2',
'y-3', 'u-2', 'NUV-3', 'NUV-4', 'z-2',
'GI-3', 'r-2', 'g-2', 'i-2', 'y-4',
'GV-3', 'GU-3', 'GU-4', 'GV-4', 'GI-4']
self.chip['array_config'] = ['CCDArrayConfig', ] * 30
self.led = Table()
self.led['index'] = np.arange(28)
self.led['dev'] = np.append(['main'] * 14, ['back'] * 14)
self.led['id'] = [i + 1 for i in range(14)] * 2
self.led['name'] = ['LED{:02d}'.format(i + 1) for i in range(14)] * 2
self.led['wave'] = np.array([275, 310, 430, 505, 545, 590, 670,
760, 880, 940, 1050, 1550, 340, 365] * 2).astype(float)
self.led['filter_phot'] = ['NUV', 'NUV', 'g', 'g', 'g', 'r', 'r',
'i', 'z', 'y', 'J', 'H', 'u', 'u'] * 2
self.led['filter_spec'] = ['GU', 'GU', 'GV', 'GV', 'GV', 'GV', 'GI',
'GI', 'GI', 'GI', 'J', 'H', 'GU', 'GU'] * 2
class CCDArrayConfig:
"""
Defines pixel array configuration.
Basic pixel array configuration class. A parent class for all other array configurations.
Default values are from e2v CCD290-99.
Parameters
----------
image_format : str
Data format:
"raw": readout oriented raw format (default);
"cube": data cube, all channels are stacked on z direction;
"sky": sky oriented mosaic format.
nx : int, optional
The number of pixels in each channel in x (serial readout) direction.
nx_active : int
The number of active pixels in each channel in x (serial readout) direction.
nx_prescan_physical : int or list or tuple or numpy.ndarray, optional
The number of physical prescan pixels in each channel in x (serial readout) direction.
nx_prescan : int or list or tuple or numpy.ndarray, optional
The number of prescan pixels in each channel in x (serial readout) direction.
ny : int, optional
The number of pixels in each channel in y (parallel readout) direction.
ny_active : int
The number of active pixels in each channel in y (serial readout) direction.
ny_prescan_physical : int or list or tuple or numpy.ndarray, optional
The number of physical prescan pixels in each channel in y (parallel readout) direction.
ny_prescan : int or list or tuple or numpy.ndarray, optional
The number of prescan pixels in each channel in y (parallel readout) direction.
version : str, optional
The version tag.
comment : str, optional
Comment to this version.
Attributes
----------
nchan : int
The number of channels.
nx_overscan : numpy.ndarray
A list of numbers of overscan pixels in each channel in x (serial readout) direction.
ny_overscan : numpy.ndarray
A list of numbers of overscan pixels in each channel in y (parallel readout) direction.
Methods
-------
has_valid_overscan(minwidth, axis='x')
To judge if there are enough overscan region for overscan correction.
get_shape(full=True)
Return the shape of array (numpy default definition).
get_boundary(channel, rtype='active', full=True)
Calculate boundary indices for active, prescan, overscan, channel and physical pixel regions.
get_cutout(channel, image, rtype='active', flip=True)
Get image cutout of specified region (active, prescan, overscan or channel).
set_cutout(channel, image, cutout, rtype='active', flip=True)
Paste the cutout into specific region of the image.
Examples
--------
conf0 = CCDArrayConfig("raw")
conf1 = CCDArrayConfig("sky")
conf0.has_valid_overscan(100, axis='y') # False
conf1.get_shape(full=False) # 1152, 4616
conf0.get_range(1, rtype='channel') # 0, 1250, 0, 4700
conf1.get_range(1, rtype='active') # 0, 1152, 0, 4616
cutout = conf0.get_cutout(4, image, rtype='x_overscan') # serial overscan region: numpy array (4616, 71)
conf0.set_cutout(4, image2, cutout, rtype='x_overscan') # paste the cutout back to some other image
"""
def __init__(self,
image_format: str,
nx: int = 1250,
nx_active: int = 1152,
nx_prescan_physical: int | list | tuple | np.ndarray = 27,
nx_prescan: int | list | tuple | np.ndarray = 27,
ny: int = 4700,
ny_active: int = 4616,
ny_prescan_physical: int | list | tuple | np.ndarray = 0,
ny_prescan: int | list | tuple | np.ndarray = 0,
version: str = '',
comment: str = '',
):
self.image_format = image_format
if self.image_format not in ['raw', 'cube', 'sky']:
raise ValueError('Invalid image type: "{}". Must be "raw", "cube" or "sky".'.format(self.image_format))
self.version = version
self.comment = comment
# physical configuration
self.nchan = 16
self.nx_active = nx_active
self.ny_active = ny_active
# prescan
if type(nx_prescan_physical) is int:
self.nx_prescan_physical = np.full(self.nchan, nx_prescan_physical)
else:
self.nx_prescan_physical = np.asarray(nx_prescan_physical).astype(int)
if type(ny_prescan_physical) is int:
self.ny_prescan_physical = np.full(self.nchan, ny_prescan_physical)
else:
self.ny_prescan_physical = np.asarray(ny_prescan_physical).astype(int)
# single channel configuration
# always follow the readout sequence: prescan -> active -> overscan
if self.image_format == 'sky':
self.nx, self.ny = self.nx_active, self.ny_active # no prescan or overscan
self.nx_prescan = np.full(self.nchan, 0, dtype=int)
self.ny_prescan = np.full(self.nchan, 0, dtype=int)
else:
self.nx, self.ny = nx, ny # include prescan and overscan
if type(nx_prescan) is int:
self.nx_prescan = np.full(self.nchan, nx_prescan)
else:
self.nx_prescan = np.asarray(nx_prescan).astype(int)
if type(ny_prescan) is int:
self.ny_prescan = np.full(self.nchan, ny_prescan)
else:
self.ny_prescan = np.asarray(ny_prescan).astype(int)
self.nx_overscan = self.nx - self.nx_prescan - self.nx_active
self.ny_overscan = self.ny - self.ny_prescan - self.ny_active
self.shape = (self.ny, self.nx)
# image configuration
# xflip and yflip decide if the channel cutout should be flipped when making full image
if self.image_format == 'raw':
self.full_shape = (self.ny, self.nx * 16)
self.col = np.arange(16, dtype=int)
self.row = np.zeros(16, dtype=int)
self.xflip = np.zeros(16, dtype=bool)
self.yflip = np.zeros(16, dtype=bool)
elif self.image_format == 'cube':
self.full_shape = (16, self.ny, self.nx)
self.col = np.zeros(16, dtype=int)
self.row = np.zeros(16, dtype=int)
self.xflip = np.zeros(16, dtype=bool)
self.yflip = np.zeros(16, dtype=bool)
else:
self.full_shape = (self.ny * 2, self.nx * 8)
self.col = np.asarray([0, 1, 2, 3, 4, 5, 6, 7, 7, 6, 5, 4, 3, 2, 1, 0])
self.row = np.asarray([0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1])
self.xflip = np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0]).astype(bool)
self.yflip = np.asarray([0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1]).astype(bool)
def print(self):
"""
Print configuration.
"""
print('CLASS: {}'.format(self.__class__.__name__))
for key in self.__dict__:
if type(self.__dict__[key]) is str:
print(' {} = "{}"'.format(key, self.__dict__[key]))
else:
print(' {} = {}'.format(key, self.__dict__[key]))
def has_valid_overscan(self,
minwidth: int,
axis: str = 'x',
) -> bool:
"""
Judge if there are enough overscan region for overscan correction.
Judge if there are enough overscan region for overscan correction.
The function can be used for exception raising when overscan is used.
Parameters
----------
minwidth : int
Minimum width of valid overscan region.
axis : str, optional
Overscan direction, "x" (default) or "y".
Returns
-------
bool
If True, the configuration contains valid overscan region.
"""
if axis == 'x':
index = self.nx_overscan < minwidth
else:
index = self.ny_overscan < minwidth
if index.sum() > 0:
flag = False
else:
flag = True
return flag
def get_shape(self,
full: bool = True,
) -> tuple:
"""
Return the shape of array (numpy default definition).
This function returns the image shape, for both full image and channel image.
Parameters
----------
full : bool, optional
If True, return the shape of full image; otherwise return the shape of single channel.
Returns
-------
tuple
The shape of image, in format of tuple (ny, nx) or (nz, ny, nx).
"""
if full:
return self.full_shape
else:
return self.shape
def get_boundary(self,
channel: int,
rtype: str = 'active',
full: bool = True,
) -> tuple[int, int, int, int]:
"""
Calculate boundary indices for active, prescan, overscan, channel and physical pixel regions.
This function can return corner indices for different kind of regions. It could be used for image cutout.
The indices are in python format (start from 0).
Parameters
----------
channel : int
Channel number (start from 1).
rtype : str, optional
Region type:
"active": area exposed to external light (default);
"channel": the whole channel;
"x_prescan": serial prescan region (including artificial and physical pixels);
"y_prescan": parallel prescan region;
"x_overscan": serial overscan region;
"y_overscan": parallel overscan region;
"physical": region with physical pixels, including physical part of prescan and active region;
"cti": region affected by charge transfer inefficiency, i.e. all pixels except artificial prescan.
full : bool, optional
If True (default), return the coordinates on the full image;
otherwise return the coordinates on single channel cutout.
Returns
-------
x1 : int
X start coordinate of the desired region (python indexing).
x2 : int
X end coordinate of the desired region (python indexing).
y1 : int
Y start coordinates of the desired region (python indexing).
y2 : int
Y end coordinates of the desired region (python indexing).
"""
xcoords = [0, self.nx_prescan[channel - 1], self.nx_prescan[channel - 1] + self.nx_active, self.nx]
ycoords = [0, self.ny_prescan[channel - 1], self.ny_prescan[channel - 1] + self.ny_active, self.ny]
# one single channel cutout: readout at (0, 0)
if rtype == 'active':
x1, x2 = xcoords[1], xcoords[2]
y1, y2 = ycoords[1], ycoords[2]
elif rtype == 'channel':
x1, x2 = xcoords[0], xcoords[3]
y1, y2 = ycoords[0], ycoords[3]
elif rtype in ['x_prescan', 'x_overscan']:
y1, y2 = ycoords[1], ycoords[2]
if rtype == 'x_prescan':
x1, x2 = xcoords[0], xcoords[1]
else:
x1, x2 = xcoords[2], xcoords[3]
elif rtype in ['y_prescan', 'y_overscan']:
x1, x2 = xcoords[1], xcoords[2]
if rtype == 'y_prescan':
y1, y2 = ycoords[0], ycoords[1]
else:
y1, y2 = ycoords[2], ycoords[3]
elif rtype == 'physical':
x1, x2 = xcoords[1] - self.nx_prescan_physical[channel - 1], xcoords[2]
y1, y2 = ycoords[1] - self.ny_prescan_physical[channel - 1], ycoords[2]
elif rtype == 'cti':
x1, x2 = xcoords[1] - self.nx_prescan_physical[channel - 1], xcoords[3]
y1, y2 = ycoords[1] - self.ny_prescan_physical[channel - 1], ycoords[3]
else:
raise ValueError('Invalid region type. Please use one of them: ' +
'"active", "x_prescan", "y_prescan", "x_overscan", "y_overscan", ' +
'"channel", "physical", "cti"')
# if on full mosaic image, flip and shift the coordinates
if full:
if self.xflip[channel - 1]:
x1, x2 = self.nx - x2, self.nx - x1
if self.yflip[channel - 1]:
y1, y2 = self.ny - y2, self.ny - y1
x1 = self.col[channel - 1] * self.nx + x1
x2 = self.col[channel - 1] * self.nx + x2
y1 = self.row[channel - 1] * self.ny + y1
y2 = self.row[channel - 1] * self.ny + y2
return x1, x2, y1, y2
def get_cutout(self,
channel: int,
image: np.ndarray,
rtype: str = 'active',
flip: bool = True,
) -> np.ndarray:
"""
Get image cutout of specified region (active, prescan, overscan or channel).
A 2D image cutout will be extracted by requirement.
The input image should be either full image or single channel image.
Parameters
----------
channel : int
Channel number (start from 1).
image : numpy.ndarray
Input image.
rtype : str, optional
Region type. See get_boundary() for more details.
flip : bool, optional
If True (default), return the flipped image cutout according to xflip/yflip parameter;
otherwise return the cutout with the same orientation as input image.
It only works if the input image is full image.
Returns
-------
numpy.ndarray
2D image cutout of desired region.
"""
if np.array_equal(image.shape, self.get_shape(full=True)):
full = True
elif np.array_equal(image.shape, self.get_shape(full=False)):
full = False
else:
raise ValueError('Invalid input image shape: {}.'.format(image.shape))
x1, x2, y1, y2 = self.get_boundary(channel, rtype=rtype, full=full)
if x1 == x2 or y1 == y2:
raise IndexError('No such region exists. Please check your configuration attributes.')
if len(image.shape) == 3:
cutout = image[channel - 1, y1:y2, x1:x2]
else:
cutout = image[y1:y2, x1:x2]
if full and flip:
if self.xflip[channel - 1]:
cutout = cutout[:, ::-1]
if self.yflip[channel - 1]:
cutout = cutout[::-1, :]
return cutout
def set_cutout(self,
channel: int,
image: np.ndarray,
cutout: np.ndarray,
rtype: str = 'active',
flip: bool = True,
) -> None:
"""
Paste a 2D image cutout to a specified region of image.
The specific region of the image will be overwritten by the cutout. Caution should be taken here.
Parameters
----------
channel : int
Channel number (start from 1).
image : numpy.ndarray
Image to be updated.
cutout : numpy.ndarray
Input cutout. It must be a 2D array.
rtype : str, optional
Region type. See get_boundary() for more details.
flip : bool, optional
If True (default), return the flipped image cutout according to xflip/yflip parameter;
otherwise return the cutout with the same orientation as input image.
It only works if the input image is full image.
"""
if np.array_equal(image.shape, self.get_shape(full=True)):
full = True
elif np.array_equal(image.shape, self.get_shape(full=False)):
full = False
else:
raise ValueError('Invalid input image shape: {}.'.format(image.shape))
x1, x2, y1, y2 = self.get_boundary(channel, rtype=rtype, full=full)
if x1 == x2 or y1 == y2:
raise IndexError('No such region exists. Please check your configuration attributes.')
if not np.array_equal(cutout.shape, (y2 - y1, x2 - x1)):
raise ValueError('Invalid input cutout shape: {}. It should be {}'.format(cutout.shape, (y2 - y1, x2 - x1)))
cutout_paste = cutout.copy()
if full and flip:
if self.xflip[channel - 1]:
cutout_paste = cutout_paste[:, ::-1]
if self.yflip[channel - 1]:
cutout_paste = cutout_paste[::-1, :]
if len(image.shape) == 3:
image[channel - 1, y1:y2, x1:x2] = cutout_paste
else:
image[y1:y2, x1:x2] = cutout_paste
class TestCCD(CCDArrayConfig):
"""
Defines pixel array configuration for testing.
Pixel array configuration class for testing. A subclass of CCDArrayConfig.
Parameters
----------
image_format : str
Data format. See CCDArrayConfig for more details.
"""
def __init__(self,
image_format: str,
):
super().__init__(image_format,
nx=200,
ny=300,
nx_active=50,
ny_active=150,
nx_prescan_physical=np.full(4, 27, dtype=int),
nx_prescan=np.full(4, 27, dtype=int),
ny_prescan_physical=np.full(4, 4, dtype=int),
ny_prescan=np.full(4, 4, dtype=int),
version='test_202403',
comment='for function testing only')
self.nchan = 4
if self.image_format == 'raw':
self.full_shape = (self.ny, self.nx * 4)
self.col = np.arange(4, dtype=int)
self.row = np.zeros(4, dtype=int)
self.xflip = np.zeros(4, dtype=bool)
self.yflip = np.zeros(4, dtype=bool)
elif self.image_format == 'cube':
self.full_shape = (4, self.ny, self.nx)
self.col = np.zeros(4, dtype=int)
self.row = np.zeros(4, dtype=int)
self.xflip = np.zeros(4, dtype=bool)
self.yflip = np.zeros(4, dtype=bool)
else:
self.full_shape = (self.ny * 2, self.nx * 2)
self.col = np.asarray([0, 1, 1, 0])
self.row = np.asarray([0, 0, 1, 1])
self.xflip = np.asarray([0, 1, 1, 0]).astype(bool)
self.yflip = np.asarray([0, 0, 1, 1]).astype(bool)
class SimCCD(CCDArrayConfig):
"""
Defines pixel array configuration for CSST simulation.
Pixel array configuration class for CSST simulation. A subclass of CCDArrayConfig.
Parameters
----------
image_format : str
Data format. See CCDArrayConfig for more details.
"""
def __init__(self,
image_format: str,
):
super().__init__(image_format,
version='sim_202312',
comment='readout similar to e2v CCD290-99 NAOC laboratory test')
class GM1201APrototype(CCDArrayConfig):
"""
Defines pixel array configuration for CSST simulation.
Pixel array configuration class for CSST simulation. A subclass of CCDArrayConfig.
Parameters
----------
image_format : str
Data format. See CCDArrayConfig for more details.
"""
def __init__(self,
image_format: str,
):
x_prescan_physical = np.full(16, 27, dtype=int)
x_prescan_physical[[0, 7, 8, 15]] = 29
x_prescan = np.full(16, 29, dtype=int)
super().__init__(image_format,
nx_prescan_physical=x_prescan_physical,
nx_prescan=x_prescan,
ny_prescan_physical=4,
ny_prescan=4,
version='gm1201a_202208',
comment='GM1201A user manual V2.7 and NAOC laboratory test')
def get_array_config(name: str,
) -> type[CCDArrayConfig]:
"""
Get the pixel array configuration class by name.
Return the type of class by giving its class name. Need to be called to make a new instance.
Parameters
----------
name : str
Name of the class.
Returns
-------
generic
Class of the specific name.
Examples
--------
thisclass = get_array_config('CCDArrayConfig') # CCDArrayConfig
conf = thisclass() # CCDArrayConfig()
"""
if name not in __all__ or name in ['FlagConfig', 'CameraConfig', 'get_array_config']:
raise ValueError('Invalid class name.')
return eval(name)
"""
Identifier: csst_msc_mbi_instrument/csst_msc_mbi_instrument.py
Name: csst_msc_mbi_instrument.py
Description: Make the correction for the intrument effect of CSST MS-MBI
Author: Zhou Xie, created
Created: 2023-11-27
Modified-History:
2023-11-27, Zhou Xie, created
"""
from datetime import datetime
from typing import Optional
import csst_common
import numpy as np
from itertools import product
from astropy.io import fits
from astropy.stats import sigma_clipped_stats
# from csst_common import CsstResult, CsstStatus
# from csst_msc_instrument import (
# remove_cr_deepcr,
# subtract_bias,
# subtract_dark,
# FlagConfig,
# )
from csst_msc_instrument.image import (
subtract_bias,
subtract_dark,
FlagConfig,
)
SATURATE = 50000
def core_msc_l1_mbi_instrument(
image_path: str = "/path/to/image",
bias_path: str = "/path/to/bias",
......@@ -21,7 +46,7 @@ def core_msc_l1_mbi_instrument(
config_ccd_info: Optional[str] = None,
config_bad_pixel: Optional[str] = None,
device: Optional[str] = "CPU",
):
) #-> CsstResult:
"""
Make the instrument correction for one chip of CSST data.
......@@ -63,32 +88,36 @@ def core_msc_l1_mbi_instrument(
# CCD image processing
with fits.open(image_path) as raw:
hdu = fits.HDUList([
fits.PrimaryHDU(
header=raw[0].header.copy()),
fits.ImageHDU(
header=raw[1].header.copy(),
data=raw[1].data.copy())
])
hdu = fits.HDUList(
[
fits.PrimaryHDU(header=raw[0].header.copy()),
fits.ImageHDU(header=raw[1].header.copy(), data=raw[1].data.copy()),
]
)
# 创建wht[1].data hdulist
wht = fits.HDUList([
fits.PrimaryHDU(
header=raw[0].header.copy()),
fits.ImageHDU(
header=raw[1].header.copy(),
data=np.zeros_like(hdu[1].data, dtype=np.float32))
])
wht = fits.HDUList(
[
fits.PrimaryHDU(header=raw[0].header.copy()),
fits.ImageHDU(
header=raw[1].header.copy(),
data=np.zeros_like(hdu[1].data, dtype=np.float32),
),
]
)
# 创建flg[1].data hdulist
flg = fits.HDUList([
fits.PrimaryHDU(
header=raw[0].header.copy()),
fits.ImageHDU(
header=raw[1].header.copy(),
data=np.zeros_like(hdu[1].data, dtype=np.uint32))])
flg = fits.HDUList(
[
fits.PrimaryHDU(header=raw[0].header.copy()),
fits.ImageHDU(
header=raw[1].header.copy(),
data=np.zeros_like(hdu[1].data, dtype=np.uint32),
),
]
)
img_err = np.zeros_like(hdu[1].data)
# shutter
shutter = fits.getdata(shutter_path, 1)
# # shutter
# shutter = fits.getdata(shutter_path, 1)
bias, bias_error = _read_ccds(bias_path)
dark, dark_error = _read_ccds(dark_path)
flat, flat_error = _read_ccds(flat_path)
......@@ -99,16 +128,12 @@ def core_msc_l1_mbi_instrument(
bias,
bias_error,
)
status_header.set('STA_BIAS', 0)
status_header.set("S_BIAS", 0)
hdu[1].data, img_err = subtract_dark(
hdu[1].data,
img_err,
dark,
dark_error,
hdu[0].header["EXPTIME"]
hdu[1].data, img_err, dark, dark_error, hdu[0].header["EXPTIME"]
)
status_header.set('STA_DARK', 0)
status_header.set("S_DARK", 0)
hdu[1].data, img_err = _divide_flat(
hdu[1].data,
......@@ -116,159 +141,124 @@ def core_msc_l1_mbi_instrument(
flat,
flat_error,
)
status_header.set('STA_FLAT', 0)
status_header.set("S_FLAT", 0)
# 非线性改正
hdu[1].data = _fix_nonlinear(hdu[1].data)
status_header.set('STA_NLIN', 0)
status_header.set("S_NLIN", 0)
# 坏像素检测
flg[1].data = _check_badpix(
flg[1].data,
flat
)
flg[1].data = _check_badpix(flg[1].data, flat)
# 热像素和暖像素检测
flg[1].data = _check_hot_and_warm_pix(
flg[1].data,
dark,
hdu[0].header["EXPTIME"],
hdu[1].header["RON01"]
flg[1].data, dark, hdu[0].header["EXPTIME"], hdu[1].header["RON01"]
)
# 过饱和检测
saturate = SATURATE
flg[1].data = _check_over_saturation(
flg[1].data,
hdu[1].data,
saturate
)
saturate = (saturate
* hdu[1].header["GAIN01"]
/ hdu[0].header["EXPTIME"])
status_header.set('SATURATE', saturate)
flg[1].data = _check_over_saturation(flg[1].data, hdu[1].data, saturate)
saturate = saturate * hdu[1].header["GAIN01"] / hdu[0].header["EXPTIME"]
status_header.set("SATURATE", saturate)
# 时间归一
hdu[1].data = hdu[1].data/hdu[0].header["EXPTIME"]
hdu[1].data = hdu[1].data / hdu[0].header["EXPTIME"]
# 单位转换
hdu[1].data = _multiply_gain_map(hdu[1].data, hdu[1].header)
status_header.set('BUNIT', 'e/s')
# # 检测宇宙线
# hdu[1].header.set('BUNIT', 'electron/s')
status_header.set("BUNIT", "electron/s")
# 检测宇宙线
# hdu[1].data, flg[1].data, cr_count = remove_cr_deepcr(
# hdu[1].data,
# flg[1].data,
# device,
# hdu[1].header['CHIPID'],
# hdu[1].header["CHIPID"],
# )
# status_header.set('STA_CRS', 0)
# status_header.set('CRCOUNT', cr_count)
# status_header.set("S_CRS", 0)
# status_header.set("CRCOUNT", cr_count)
# 记录BKG与RMS
_, bkg, rms = sigma_clipped_stats(
data=hdu[1].data,
mask=(flg[1].data == 16)
)
status_header.set('SKY_BKG', bkg)
status_header.set('SKY_RMS', rms)
_, bkg, rms = sigma_clipped_stats(data=hdu[1].data, mask=(flg[1].data == 16))
status_header.set("SKY_BKG", bkg)
status_header.set("SKY_RMS", rms)
# 检测卫星拖尾
flg[1].data = _check_satellitetrail(
flg[1].data,
hdu[1].data
)
status_header.set('STA_SAT', 0)
flg[1].data = _check_satellitetrail(flg[1].data, hdu[1].data)
status_header.set("S_SAT", 0)
# 生成wht[1].data
wht[1].data = _create_weight(
hdu[1].data,
flg[1].data,
hdu[1].header["RON01"],
hdu[0].header["EXPTIME"]
hdu[1].data, flg[1].data, hdu[1].header["RON01"], hdu[0].header["EXPTIME"]
)
# 数据类型
hdu[1].data = hdu[1].data.astype(np.float32)
# 快门改正
hdu[1].data = _fix_shutter(
hdu[1].data,
shutter,
hdu[0].header["EXPTIME"]
)
status_header.set('STA_SHUT', 0)
if all([status_header["STA_BIAS"] == 0,
status_header["STA_DARK"] == 0,
status_header["STA_FLAT"] == 0,
status_header["STA_CRS"] == 0,
status_header["STA_NLIN"] == 0,
# status_header["STA_CTE"] == 0, # 这个功能还未添加
status_header["STA_SAT"] == 0,
status_header["STA_SHUT"] == 0,
# # 快门改正
# hdu[1].data = _fix_shutter(hdu[1].data, shutter, hdu[0].header["EXPTIME"])
# status_header.set("S_SHUT", 0)
if all(
[
status_header["S_BIAS"] == 0,
status_header["S_DARK"] == 0,
status_header["S_FLAT"] == 0,
# status_header["S_CRS"] == 0,
status_header["S_NLIN"] == 0,
# status_header["S_CTE"] == 0, # 这个功能还未添加
status_header["S_SAT"] == 0,
# status_header["S_SHUT"] == 0,
status_header["SKY_BKG"] != -9999,
status_header["SKY_RMS"] != -9999,
status_header["SATURATE"] != -9999,
status_header["CRCOUNT"] != -9999]):
status_header.set('STA_INST', 0)
# status_header["CRCOUNT"] != -9999,
]
):
status_header.set("S_INST", 0)
# hdu[1].header.extend(status_header, bottom=True)
# 修正wht和flg数据类型
wht[1].header = hdu[1].header.copy()
wht[1].data = wht[1].data.astype(np.float32)
wht[1].header.remove('BUNIT')
wht[1].header.remove("BUNIT")
flg[1].header = hdu[1].header.copy()
flg[1].data = flg[1].data.astype(np.int32)
flg[1].header.remove('BUNIT')
flg[1].data = flg[1].data.astype(np.uint32)
flg[1].header.set("BUNIT", "dimensionless unit")
hdu.writeto(image_output_path, overwrite=True)
wht.writeto(weight_output_path, overwrite=True)
flg.writeto(flag_output_path, overwrite=True)
# # construct CsstResult
# result = CsstResult(
# status=CsstStatus(status_header["S_INST"]), # 0/1/2\
# header=status_header,
# files=[
# image_output_path,
# weight_output_path,
# flag_output_path,
# ],
# )
# return result
def _set_status_header():
header_path = __file__.replace(
"csst_msc_mbi_instrument.py", "instrument_cor_demo.head"
)
demo_header = fits.getheader(header_path, ignore_missing_simple=True)
status_header = fits.Header() # template Header
status_header.append(
('COMMENT', '=' * 66), bottom=True)
status_header.append(
('COMMENT', 'instrumental correction information'), bottom=True)
status_header.append(
('COMMENT', '=' * 66), bottom=True)
# status_header.append(
# ('VER_INST', csst_common.__version__, 'version of instrument'), bottom=True)
# status_header.append(
# ('STM_INST', csst_common.now(), 'time stamp of instrument processing'), bottom=True)
status_header.append(
('STA_INST', 1, '0=done 1=wrong'), bottom=True)
status_header.append(
('STA_BIAS', 1, 'status flag for bias frame correction'), bottom=True)
status_header.append(
('STA_DARK', 1, 'status flag for dark frame correction'), bottom=True)
status_header.append(
('STA_FLAT', 1, 'status flag for flat frame correction'), bottom=True)
status_header.append(
('SKY_BKG', -9999, 'estimated sky background (e-/s per pixel)'), bottom=True)
status_header.append(
('SKY_RMS', -9999, 'standard dev of frame background (ADU)-> e-/s'), bottom=True)
status_header.append(
('SATURATE', -9999, 'the flux limit of saturated pixel (e-/s)'), bottom=True)
status_header.append(
('STA_CTE', 1, 'status flag for CTE correction'), bottom=True)
status_header.append(
('STA_SAT', 1, 'status flag for satellite correction'), bottom=True)
status_header.append(
('STA_CRS', 1, 'status flag for cosmic rays mask'), bottom=True)
status_header.append(
('CRCOUNT', -9999, 'cosmic rays pixel counts'), bottom=True)
status_header.append(
('STA_NLIN', 1, 'status flag for non-linear correction'), bottom=True)
status_header.append(
('STA_SHUT', 1, 'status flag for shutter effect correction'), bottom=True)
status_header.append(("COMMENT", "=" * 66), bottom=True)
status_header.append(("COMMENT", "INSTRUMENT CORRECTION INFORMATION"), bottom=True)
status_header.append(("COMMENT", "=" * 66), bottom=True)
for card in demo_header.cards:
status_header.append(card, bottom=True)
return status_header
def _read_ccds(
ref_path: str,
) -> tuple[np.array, np.array]:
ref = fits.getdata(ref_path)
return ref, np.zeros_like(ref)
def _divide_flat(
image: np.ndarray,
image_error: np.ndarray,
......@@ -277,68 +267,88 @@ def _divide_flat(
) -> tuple[np.ndarray, np.ndarray]:
def divide(a, b):
return np.divide(a, b, out=np.zeros_like(a, float), where=(b != 0))
result = divide(image, flat)
result_error = np.abs(result) * ((divide(image_error, image)) ** 2 +
(divide(flat_error, flat)) ** 2)**0.5
result_error = (
np.abs(result)
* ((divide(image_error, image)) ** 2 + (divide(flat_error, flat)) ** 2) ** 0.5
)
return result, result_error
def _fix_nonlinear(
image: np.ndarray,
beta1: float = 5e-7
) -> np.ndarray:
def _fix_nonlinear(image: np.ndarray, beta1: float = 5e-7) -> np.ndarray:
# 计算非线性系数
beta = 5e-7
f1 = [1]
fnlin0 = 1+beta
fnlin0 = 1 + beta
fnlin = [fnlin0]
for i in range(1000, 65000, 1000):
f1.append(i)
fnlin.append(1+beta*i)
fnlin.append(1 + beta * i)
# 插值函数
from scipy.interpolate import PchipInterpolator as PCHIP
# PCHIP stands for Piecewise Cubic Hermite Interpolating Polynomial
interp_func = PCHIP(f1, fnlin)
# 非线性系数矩阵
imgnlin = interp_func(image)
# 图像改正
image = imgnlin*image
image = imgnlin * image
return image
def _check_badpix(
flag: np.ndarray,
flat: np.ndarray,
) -> np.ndarray:
med = np.median(flat)
_flag = (flat < 0.5 * med) | (1.5 * med < flat)
flag = flag | (_flag * 1)
flagconf = FlagConfig()
_flag = _flag.astype(flagconf.dtype)
value = flagconf.get_value("dead_pixel")
flag = flag | (_flag * value)
return flag
def _check_hot_and_warm_pix(
flag: np.ndarray,
dark: np.ndarray,
exptime: float,
rdnoise: float,
) -> np.ndarray:
flagconf = FlagConfig()
_dark = dark * exptime
_dark[_dark < 0] = 0
_flag = 1 * rdnoise ** 2 <= _dark # 不确定是否包含 暂定包含
flag = flag | (_flag * 2)
_flag = (0.5 * rdnoise ** 2 < _dark) & (_dark < 1 * rdnoise ** 2)
flag = flag | (_flag * 4)
_flag = 1 * rdnoise**2 <= _dark # 不确定是否包含 暂定包含
_flag = _flag.astype(flagconf.dtype)
value = flagconf.get_value("hot_pixel")
flag = flag | (_flag * value)
_flag = (0.5 * rdnoise**2 < _dark) & (_dark < 1 * rdnoise**2)
_flag = _flag.astype(flagconf.dtype)
value = flagconf.get_value("warm_pixel")
flag = flag | (_flag * value)
return flag
def _check_over_saturation(
flag: np.ndarray,
image: np.ndarray,
saturate: int = 65535,
iterations: int = 0,
flag_value: int = 8
) -> np.ndarray:
_flag = image >= saturate
flag_dilated = _dilation(_flag, iterations=iterations)
flag = flag | (flag_dilated * flag_value)
_flag = _dilation(_flag, iterations=iterations)
flagconf = FlagConfig()
_flag = _flag.astype(flagconf.dtype)
value = flagconf.get_value("saturated")
flag = np.bitwise_or(flag, _flag * value)
return flag
def _dilation(array_orig: np.ndarray, iterations: int = 1):
"""
Make a dilation for array
......@@ -358,18 +368,19 @@ def _dilation(array_orig: np.ndarray, iterations: int = 1):
The mask array after dilation.
"""
from scipy import ndimage
struct1 = ndimage.generate_binary_structure(
2, 1) # give a saturate structure
struct_ext = ndimage.iterate_structure(
struct1, iterations) # make a dilation
struct1 = ndimage.generate_binary_structure(2, 1) # give a saturate structure
struct_ext = ndimage.iterate_structure(struct1, iterations) # make a dilation
if iterations == 0:
array_out = array_orig
else:
array_out = ndimage.binary_dilation(
array_orig, structure=struct_ext, iterations=1).astype(array_orig.dtype)
array_orig, structure=struct_ext, iterations=1
).astype(array_orig.dtype)
return array_out
def _check_satellitetrail(
flag: np.ndarray,
image: np.ndarray,
......@@ -377,36 +388,37 @@ def _check_satellitetrail(
from skimage import exposure, transform
from skimage.draw import line
from skimage.feature import canny
# 调整图像
p1, p2 = np.percentile(image, (0.1, 99.9))
image = exposure.rescale_intensity(image, in_range=(p1, p2))
# 边界识别,转化为二值图像
immax = np.max(image)
# canny边界
edge = canny(image=image,
sigma=2.0,
low_threshold=immax * 0.1,
high_threshold=immax * 0.5)
edge = canny(
image=image, sigma=2.0, low_threshold=immax * 0.1, high_threshold=immax * 0.5
)
# 概率霍夫变换
angle = np.radians(np.arange(2, 178, 0.5, dtype=float))
# 直线端点
result = transform.probabilistic_hough_line(image=edge,
threshold=210,
line_length=400,
line_gap=75,
theta=angle)
result = transform.probabilistic_hough_line(
image=edge, threshold=210, line_length=400, line_gap=75, theta=angle
)
result = np.asarray(result)
# 绘制mask
mask = np.zeros(image.shape)
_flag = np.zeros(image.shape)
# 遍历每一条线段
for (x1, y1), (x2, y2) in result:
xx, yy = line(x1, y1, x2, y2)
mask[yy, xx] = 1
mask = mask.astype(np.int32)
flag = flag | (mask * 32)
_flag[yy, xx] = 1
flagconf = FlagConfig()
_flag = _flag.astype(flagconf.dtype)
value = flagconf.get_value("satellite_trail")
flag = np.bitwise_or(flag, _flag * value)
return flag
def _create_weight(
img: np.ndarray,
flg: np.ndarray,
......@@ -415,104 +427,45 @@ def _create_weight(
) -> np.ndarray:
data = img.copy()
data[img < 0] = 0
var_raw = data * exptime + rdnoise ** 2
var_raw = data * exptime + rdnoise**2
var_bias = 0.0
weight = 1. / (var_raw + var_bias) * exptime ** 2
weight = 1.0 / (var_raw + var_bias) * exptime**2
weight[flg > 0] = 0
wht = weight
return wht
def _fix_shutter(
img: np.ndarray,
shutter: np.ndarray,
exptime: float = 150.
img: np.ndarray, shutter: np.ndarray, exptime: float = 150.0
) -> np.ndarray:
img_cor = np.float32(img * exptime*1000. / (exptime*1000. + shutter))
img_cor = np.float32(img * exptime * 1000.0 / (exptime * 1000.0 + shutter))
return img_cor
def _multiply_gain_map(
image: np.ndarray,
header: fits.header,
image: np.ndarray,
header: fits.header,
) -> np.ndarray:
gain_map = np.zeros_like(image)
h = header['NAXIS1'] // 8
w = header['NAXIS2'] // 2
h = header["NAXIS1"] // 8
w = header["NAXIS2"] // 2
for y, x in product(range(8), (0, 1)):
y = y if x == 0 else 7-y
gain = header['GAIN' + str(x*8+y+1).zfill(2)]
gain_map[w*x:w*(x+1), h*y:h*(y+1)] = gain
y = y if x == 0 else 7 - y
gain = header["GAIN" + str(x * 8 + y + 1).zfill(2)]
gain_map[slice(w * x, w * (x + 1)), slice(h * y, h * (y + 1))] = gain
# gain_map[w * x : w * (x + 1), h * y : h * (y + 1)] = gain
return image * gain_map
def subtract_bias(image: np.ndarray,
image_err: np.ndarray,
bias: np.ndarray,
bias_err: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
"""
Subtract bias.
Subtract bias.
Parameters
----------
image : numpy.ndarray
The input image to be corrected.
image_err : numpy.ndarray
The uncertainty of input image.
bias : numpy.ndarray
The input bias to be subtracted.
bias_err : numpy.ndarray
The uncertainty of input bias.
Returns
-------
output : np.ndarray
Output corrected image.
output_err : np.ndarray
Output uncertainty map.
"""
output = image - bias
output_err = np.sqrt(image_err ** 2 + bias_err ** 2)
return output, output_err
def subtract_dark(image: np.ndarray,
image_err: np.ndarray,
dark: np.ndarray,
dark_err: np.ndarray,
tdark_image: float = 1.0,
) -> tuple[np.ndarray, np.ndarray]:
"""
Subtract dark current.
Subtract dark current.
Parameters
----------
image : numpy.ndarray
The input image to be corrected.
image_err : numpy.ndarray
The uncertainty of input image.
dark : numpy.ndarray
The input dark current image to be subtracted.
dark_err : numpy.ndarray
The uncertainty of input dark current.
tdark_image : float, optional
The effective dark current cumulation time of input image. Default value is 1.0.
Returns
-------
output : np.ndarray
Output corrected image.
output_err : np.ndarray
Output uncertainty map.
"""
output = image - dark * tdark_image
output_err = np.sqrt(image_err ** 2 + (dark_err * tdark_image) ** 2)
return output, output_err
# if __name__ == "__main__":
# result: CsstResult = core_msc_l1_mbi_instrument(
# image_path=r"D:\Zhou\Desktop\data\data_09.fits",
# bias_path=r"D:\Zhou\Desktop\data\bias_09.fits",
# dark_path=r"D:\Zhou\Desktop\data\dark_09.fits",
# flat_path=r"D:\Zhou\Desktop\data\flat_09.fits",
# shutter_path=r"D:\Zhou\Desktop\data\csst_msc_ms_shutter_09_000001.fits",
# image_output_path=r"D:\Zhou\Desktop\data\result\image_test_output.fits",
# weight_output_path=r"D:\Zhou\Desktop\data\result\weight_test_output.fits",
# flag_output_path=r"D:\Zhou\Desktop\data\result\flag_test_output.fits",
# )
# print(result.status == 0)
"""
Identifier: csst-l1/msc/csst_msc_instrument/csst_msc_instrument/format.py
Name: format.py
Description: Data format conversion.
Author: Li Shao (shaoli@nao.cas.cn)
Created: 2023-11-16
Modified-History:
2023-11-16, Li Shao, created
2023-11-22, Li Shao, split from common_detector
2023-11-23, Li Shao, change the way to get CCDArrayConfig
2023-12-21, Li Shao, add function convert_format_one_cube
2024-02-06, Li Shao, move to csst_msc_instrument
2024-03-05, Li Shao, merge all three functions into one
"""
import numpy as np
from .config import get_array_config
__all__ = ["convert_format", ]
def convert_format(image: np.ndarray,
output_format: str,
array_config_name: str = 'CCDArrayConfig',
) -> np.ndarray:
"""
Convert image format.
The function will automatically detect input format and convert to the output format.
The output format includes: "raw", "cube", "sky".
Parameters
----------
image : numpy.ndarray
Input 2-D image.
output_format : str
The output format name: "raw", "cube" or "sky".
array_config_name : str, optional
The name of the pixel array configuration class. Default is "CCDArrayConfig".
Returns
-------
numpy.ndarray
Converted image. If convert from "sky" to other format, non-active region will be filled with 0.
"""
config = get_array_config(array_config_name)
conf_raw = config('raw')
conf_cube = config('cube')
conf_sky = config('sky')
shape_input = image.shape
if np.array_equal(shape_input, conf_raw.get_shape(full=True)):
iconf = conf_raw
elif np.array_equal(shape_input, conf_cube.get_shape(full=True)):
iconf = conf_cube
elif np.array_equal(shape_input, conf_sky.get_shape(full=True)):
iconf = conf_sky
else:
raise ValueError('Input data shape is inconsistent with array configuration.')
if output_format == 'raw':
oconf = conf_raw
elif output_format == 'cube':
oconf = conf_cube
elif output_format == 'sky':
oconf = conf_sky
else:
raise ValueError('Invalid output type: "{}". Must be "raw", "cube" or "sky".'.format(output_format))
if iconf == oconf:
print('Warning: input and output is the same. No format conversion is performed.')
return image
if iconf.image_format == 'sky' or output_format == 'sky':
rtype = 'active'
else:
rtype = 'channel'
oshape = oconf.get_shape(full=True)
output = np.zeros(oshape, dtype=image.dtype)
for i in range(iconf.nchan):
cutout = iconf.get_cutout(i+1, image, rtype=rtype)
oconf.set_cutout(i+1, output, cutout, rtype=rtype)
return output
"""
Identifier: csst-l1/msc/csst_msc_instrument/csst_msc_instrument/image/__init__.py
Name: __init__.py
Description: Instrument effect module (basic operations).
Author: Li Shao (shaoli@nao.cas.cn)
Created: 2023-11-27
Modified-History:
2023-11-27, Li Shao, created
2024-02-06, Li Shao, move to csst_msc_instrument
"""
from .basic import subtract_bias, subtract_dark
from .crosstalk import remove_crosstalk
from .gain import apply_gain
from .overscan import correct_overscan
__all__ = ['correct_overscan', 'apply_gain', 'subtract_bias', 'remove_crosstalk', 'subtract_dark', ]
\ No newline at end of file
"""
Identifier: csst-l1/msc/csst_msc_instrument/csst_msc_instrument/image/basic.py
Name: basic.py
Description: Basic instrument effect correction.
Author: Li Shao (shaoli@nao.cas.cn)
Created: 2023-11-16
Modified-History:
2023-11-16, Li Shao, created
2023-11-22, Li Shao, split from common_detector
2024-02-06, Li Shao, move to csst_msc_instrument
"""
import numpy as np
__all__ = ["subtract_bias", "subtract_dark", ]
def subtract_bias(image: np.ndarray,
image_err: np.ndarray,
bias: np.ndarray,
bias_err: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
"""
Subtract bias.
Subtract bias.
Parameters
----------
image : numpy.ndarray
The input image to be corrected.
image_err : numpy.ndarray
The uncertainty of input image.
bias : numpy.ndarray
The input bias to be subtracted.
bias_err : numpy.ndarray
The uncertainty of input bias.
Returns
-------
output : np.ndarray
Output corrected image.
output_err : np.ndarray
Output uncertainty map.
"""
output = image - bias
output_err = np.sqrt(image_err ** 2 + bias_err ** 2)
return output, output_err
def subtract_dark(image: np.ndarray,
image_err: np.ndarray,
dark: np.ndarray,
dark_err: np.ndarray,
tdark_image: float | int = 1.0,
) -> tuple[np.ndarray, np.ndarray]:
"""
Subtract dark current.
Subtract dark current.
Parameters
----------
image : numpy.ndarray
The input image to be corrected.
image_err : numpy.ndarray
The uncertainty of input image.
dark : numpy.ndarray
The input dark current image to be subtracted.
dark_err : numpy.ndarray
The uncertainty of input dark current.
tdark_image : float or int, optional
The effective dark current cumulation time of input image. Default value is 1.0.
Returns
-------
output : np.ndarray
Output corrected image.
output_err : np.ndarray
Output uncertainty map.
"""
output = image - dark * tdark_image
output_err = np.sqrt(image_err ** 2 + (dark_err * tdark_image) ** 2)
return output, output_err
"""
Identifier: csst-l1/msc/csst_msc_instrument/csst_msc_instrument/image/crosstalk.py
Name: crosstalk.py
Description: Crosstalk correction.
Author: Tianmeng Zhang (zhangtm@nao.cas.cn)
Created: 2023-11-16
Modified-History:
2023-11-16, Tianmeng Zhang, created
2023-11-22, Li Shao, split from common_detector
2024-02-06, Li Shao, move to csst_msc_instrument
"""
import numpy as np
__all__ = ["remove_crosstalk", ]
def remove_crosstalk(data: np.ndarray,
crosstalk_coe: np.ndarray,
hdu_num: int = 16,
switch: bool = False,
) -> np.ndarray:
"""
Function to remove the crosstalk between different channel.
Function to remove the crosstalk between different channel.
Parameters
----------
data : numpy.ndarray
The input data array which is needed to remove the crosstalk from each hdu, 3D-array.
crosstalk_coe : numpy.ndarray
The coefficients of crosstalk, in size [hdu_num, hdu_num].
hdu_num : int, optional
The number of extension for each detector.
switch : bool, optional
If True, do the crosstalk correction. If False, return original data. Default is False.
Returns
-------
numpy.ndarray
The data array after crosstalk correction, 3D-array.
"""
data_cor = np.copy(data) # backup original data array
if switch:
for i in range(hdu_num):
for j in range(hdu_num):
data_cor[i, :, :] = data[i, :, :] + data[j, :, :] * crosstalk_coe[i, j]
else:
data_cor = data
print('No crosstalk correction, return the original array')
return data_cor
"""
Identifier: csst-l1/msc/csst_msc_instrument/csst_msc_instrument/image/gain.py
Name: gain.py
Description: Gain map related function.
Author: Li Shao (shaoli@nao.cas.cn)
Created: 2023-12-22
Modified-History:
2023-12-22, Li Shao, created
2024-02-06, Li Shao, move to csst_msc_instrument
"""
import numpy as np
from ..config import get_array_config
__all__ = ['apply_gain', 'make_gainmap_from_channel_value', ]
def apply_gain(image: np.ndarray,
gainmap: np.ndarray,
) -> np.ndarray:
"""
Apply gain map.
The gain map is in unit of electrons/ADU. The output image will be in unit of electrons or electrons/s.
Parameters
----------
image : numpy.ndarray
Input image, in unit of ADU or ADU/s.
gainmap : numpy.ndarray
Gain map, in unit of electrons/ADU.
Returns
-------
numpy.ndarray
Output image, in unit of electrons or electrons/s.
"""
return image * gainmap
def make_gainmap_from_channel_value(gain_values: list[float] | tuple[float] | np.ndarray,
array_config_name: str = 'CCDArrayConfig',
image_format: str = 'raw',
) -> np.ndarray:
"""
Make gain map from channel gain values.
Parameters
----------
gain_values : list[float] or tuple[float] or numpy.ndarray
Gain value of each channel.
array_config_name : str, optional
The name of the pixel array configuration class. Default is "CCDArrayConfig".
image_format : str, optional
The output image format: "raw", "cube" or "sky". See config.CCDArrayConfig().
Returns
-------
numpy.ndarray
Output gain map.
"""
aconf = get_array_config(array_config_name)(image_format)
if len(gain_values) != aconf.nchan:
raise ValueError('The length of input gain values does not match with number of channels.')
full_shape = aconf.get_shape(full=True)
output = np.zeros(full_shape, dtype=np.float32)
for i in range(aconf.nchan):
cutout = aconf.get_cutout(i+1, output, rtype='channel') + gain_values[i]
aconf.set_cutout(i+1, output, cutout, rtype='channel')
return output
\ No newline at end of file
"""
Identifier: csst-l1/msc/csst_msc_instrument/csst_msc_instrument/image/overscan.py
Name: overscan.py
Description: Overscan correction and data format conversion.
Author: Li Shao (shaoli@nao.cas.cn)
Created: 2023-11-16
Modified-History:
2023-11-16, Li Shao, created
2023-11-22, Li Shao, split from common_detector
2024-02-06, Li Shao, move to csst_msc_instrument
"""
from typing import Callable
import numpy as np
from scipy.ndimage import median_filter
from scipy.signal import savgol_filter
from scipy.special import erf
from scipy.stats import trim_mean, t, sigmaclip
from astropy.stats import mad_std, sigma_clip
from ..config import get_array_config
__all__ = ["average_overscan", "smooth_overscan", "correct_overscan", ]
def average_overscan(image: np.ndarray,
axis: int | None = None,
cen_method: str | Callable = 'trim_mean',
std_method: str | Callable = 'mad_std',
clip_flag: bool = False,
clip_sigma: float = 3.5,
clip_cenfunc: str | Callable = 'median',
clip_stdfunc: str | Callable = 'std',
) -> tuple[float | np.ndarray, float | np.ndarray]:
"""
Average overscan data along one direction or overall.
Average overscan data along one direction or overall.
Parameters
----------
image : numpy.ndarray
Input image: overscan region cutout.
axis : int, optional
Axis along which the operation is performed. If None (default), use the full array.
cen_method : str or Callable, optional
Algorithm to calculate average value: "trim_mean" (default), "median", "mean" or Callable function.
std_method : str or Callable, optional
Algorithm to calculate standard deviation: "mad_std" (default), "std" or Callable function.
clip_flag : bool, optional
If True, use sigma-clipping.
clip_sigma : float, optional
Clipping threshold for sigma-clipping. Default is 3.5.
clip_cenfunc : str or Callable, optional
Center value function for sigma-clipping: "median" (default), "mean" or Callable function.
clip_stdfunc : str or Callable, optional
Standard deviation function for sigma-clipping: "std" (default), "mad_std" or Callable function.
Returns
-------
avg : numpy.ndarray
Averaged overscan data (one value or 1D array).
err : numpy.ndarray
The uncertainty.
"""
# sigma-clipping
if clip_flag and cen_method != 'trim_mean':
data = sigma_clip(image, sigma=clip_sigma, axis=axis, masked=True,
cenfunc=clip_cenfunc, stdfunc=clip_stdfunc)
if cen_method == 'median':
avg = np.ma.median(data)
elif cen_method == 'mean':
avg = np.ma.mean(data)
else:
avg = cen_method(data)
if std_method == 'mad_std':
err = mad_std(data, axis=axis)
elif std_method == 'std':
err = np.ma.std(data, axis=axis)
else:
err = std_method(data, axis=axis)
# without clipping
else:
if cen_method == 'trim_mean':
avg = trim_mean(image, 0.1, axis=axis)
elif cen_method == 'median':
avg = np.median(image, axis=axis).astype(float)
elif cen_method == 'mean':
avg = np.mean(image, axis=axis)
else:
avg = cen_method(image, axis=axis)
if std_method == 'mad_std':
err = mad_std(image, axis=axis)
elif std_method == 'std':
err = np.std(image, axis=axis)
else:
err = std_method(image, axis=axis)
# uncertainty of the average
ny, nx = image.shape
if axis is None:
n = ny * nx
elif axis == 0:
n = ny
else:
n = nx
if cen_method == 'trim_mean':
n = n * 0.8
err = err / np.sqrt(n) * t.ppf(erf(1), df=n - 1) # uncertainty of the mean
if cen_method == 'median' or cen_method is np.median or cen_method is np.ma.median:
err *= 1.2533
return avg, err
def smooth_overscan(data: np.ndarray,
filter_width: int,
filter_method: str = 'savgol',
filter_polyorder: int = 2,
) -> np.ndarray:
"""
Smooth 1D averaged overscan data.
Smooth 1D averaged overscan data.
Parameters
----------
data : numpy.ndarray
Input 1-D overscan data.
filter_width : int
The width of filter. Must be positive.
filter_method : str, optional
Process algorithm: "savgol" (default) or "median".
filter_polyorder : int, optional
The order of polynomial used for Savitzky-Golay filter.
Returns
-------
numpy.ndarray
Smoothed 1-D overscan data.
"""
if filter_method == 'savgol':
dd = savgol_filter(data, filter_width * 2, filter_polyorder)
diff = data - dd
index = np.abs(diff) < sigmaclip(diff, 5, 5)[0].std() * 5 # clip outliers
x = np.arange(len(data))
dd = np.interp(x, x[index], data[index])
output = savgol_filter(dd, filter_width, filter_polyorder)
else:
output = median_filter(data, size=filter_width)
return output
def correct_overscan(image: np.ndarray,
array_config_name: str = 'CCDArrayConfig',
region_type: str = 'x',
correct_type: str = 'line',
gap: int = 10,
cen_method: str | Callable = 'trim_mean',
std_method: str | Callable = 'mad_std',
clip_flag: bool = False,
clip_sigma: float = 3.5,
clip_cenfunc: str | Callable = 'median',
clip_stdfunc: str | Callable = 'std',
filter_width: int = 0,
filter_method: str = 'savgol',
filter_polyorder: int = 2,
clean_inactive: bool = True,
) -> tuple[np.ndarray, np.ndarray]:
"""
Overscan correction.
Overscan correction. All parameters are optional inputs.
Parameters
----------
image : numpy.ndarray
Input 2-D image.
array_config_name : str, optional
The name of the pixel array configuration class. Default is "CCDArrayConfig".
region_type : str, optional
The type of overscan region. "x" (default) is serial overscan, "y" is parallel overscan.
correct_type : str, optional
Correction type. "line" (default) is correct by row or column. "all" is correct with all pixels.
gap : int, optional
Number of pixels after active pixels not to use for overscan correction.
cen_method : str or Callable, optional
Algorithm to calculate average value: "trim_mean" (default), "median", "mean" or Callable function.
std_method : str or Callable, optional
Algorithm to calculate standard deviation: "mad_std" (default), "std" or Callable function.
clip_flag : bool, optional
If True, use sigma-clipping.
clip_sigma : float, optional
Clipping threshold for sigma-clipping. Default is 3.5.
clip_cenfunc : str or Callable, optional
Center value function for sigma-clipping: "median" (default), "mean" or Callable function.
clip_stdfunc : str or Callable, optional
Standard deviation function for sigma-clipping: "std" (default), "mad_std" or Callable function.
filter_width : int, optional
The width of filter. If smaller than 2, then smoothing will be skipped.
filter_method : str, optional
Filtering algorithm: "savgol" (default) or "median".
filter_polyorder : int, optional
The order of polynomial used for Savitzky-Golay filter.
clean_inactive : bool, optional
If True, all pixels out of active region are set to zero. Default is True.
Returns
-------
output : numpy.ndarray
Overscan corrected image.
output_err : numpy.ndarray
Uncertainty image.
Examples
--------
image_corr, err_corr = correct_overscan(image, region_type='x', correct_type='line')
image_corr, err_corr = correct_overscan(image, region_type='y', correct_type='all', clean_inactive=True)
"""
aconf = get_array_config(array_config_name)('raw')
if correct_type == 'line':
if region_type == 'x':
axis = 1
else:
axis = 0
else:
axis = None
if clean_inactive:
output = np.zeros(image.shape)
else:
output = image.astype(float)
output_err = np.zeros(image.shape)
for i in range(aconf.nchan):
overscan = aconf.get_cutout(i+1, image, rtype='{}_overscan'.format(region_type))
# remove the first pixels (to avoid potential CTI contamination)
if gap > 0:
if region_type == 'x':
overscan = overscan[:, gap:]
else:
overscan = overscan[gap:, :]
# calculate average and uncertainty
avg_value, err_value = average_overscan(overscan, axis=axis,
cen_method=cen_method, std_method=std_method,
clip_flag=clip_flag, clip_sigma=clip_sigma,
clip_cenfunc=clip_cenfunc, clip_stdfunc=clip_stdfunc)
# smooth if required
if filter_width > 1 and axis is not None:
avg_value = smooth_overscan(avg_value, filter_width,
filter_method=filter_method, filter_polyorder=filter_polyorder)
err_value = err_value / np.sqrt(filter_width) * 1.2533
# correct the active region
x1, x2, y1, y2 = aconf.get_boundary(i+1, rtype='active')
nx, ny = x2 - x1, y2 - y1
if axis is None:
output[y1:y2, x1:x2] = image[y1:y2, x1:x2] - avg_value
output_err[y1:y2, x1:x2] = err_value
elif axis == 0:
output[y1:y2, x1:x2] = image[y1:y2, x1:x2] - np.tile(avg_value, ny).reshape((ny, nx))
output_err[y1:y2, x1:x2] = np.tile(err_value, ny).reshape((ny, nx))
else:
output[y1:y2, x1:x2] = image[y1:y2, x1:x2] - np.repeat(avg_value, nx).reshape((ny, nx))
output_err[y1:y2, x1:x2] = np.repeat(err_value, nx).reshape((ny, nx))
return output, output_err
V_INST = '0.0.2 ' / version of instrument correction T_INST = '2023-12-29T04:50:46.710' / timestamp of instrument correction S_INST = 0 / status of instrument correction S_OVSCAN= 0 / status of overscan correction S_GAIN = 0 / status of gain correction R_GAIN = 'file ' / reference gain S_BIAS = 0 / status of bias frame correction R_BIAS = 'file ' / reference bias S_CROSST= 0 / status of crosstalk correction R_CROSST= 'file ' / reference crosstalk S_CTI = 1 / status of CTI correction R_CTI = 'file ' / reference CTI S_DARK = 0 / status of dark frame correction R_DARK = 'file ' / reference dark S_NLIN = 1 / status of non-linear correction R_NLIN = 'file ' / reference non-linear S_SHUT = 1 / status of shutter effect correction R_SHUT = 'file ' / reference shutter effect S_FLAT = 0 / status of flat frame correction R_FLAT = 'file ' / reference flat S_CRS = 1 / status of cosmic rays mask R_CRS = 'deepCR_model' / method and config of cosmic rays mask CRCOUNT = 240466 / cosmic rays pixel counts S_SAT = 1 / status of satellite correction R_SAT = 'file ' / reference satellite correction S_FRINGE= 1 / status of fringe correction R_FRINGE= 'file ' / reference fringe SKY_BKG = 0.0323 / estimated sky background (e-/s per pixel) SKY_RMS = 0.0374 / standard dev of frame background (e-/s) SATURATE= 365.9759 / flux limit of saturated pixel (e-/s) END
import numpy as np
from astropy.io import fits
def array_combine(ndarray, mode="mean") -> np.ndarray:
""" Function to combine 3-D data array
Parameters
----------
ndarray: array, input data cube (3D)
mode: mean, median, sum, mean_clip, median_clip, default is mean
"""
if mode == "median":
array = np.median(ndarray, axis=0)
elif mode == "median_clip":
ndarray = np.sort(ndarray, axis=0)[1:-1]
array = np.median(ndarray, axis=0)
elif mode == "sum":
array = np.sum(ndarray, axis=0)
elif mode == "mean":
array = np.mean(ndarray, axis=0)
elif mode == "mean_clip":
ndarray = np.sort(ndarray, axis=0)[1:-1]
array = np.mean(ndarray, axis=0)
return array
def load_bias(path: str) -> np.ndarray:
with fits.open(path) as hdul:
du = hdul[1].data
du = du.astype(int)
return du
def load_dark(path: str, bias) -> np.ndarray:
with fits.open(path) as hdul:
du = hdul[1].data
hu = hdul[0].header
du = du.astype(int)
du = du - bias
du = du / hu["EXPTIME"]
return du
def load_flat(path: str, bias, dark) -> np.ndarray:
with fits.open(path) as hdul:
du = hdul[1].data
hu = hdul[0].header
du = du.astype(int)
du = du - bias - dark * hu["EXPTIME"]
du = du / hu["EXPTIME"]
du = du / np.median(du)
return du
def combine(func, mode: str, path_list, *args) -> np.ndarray:
du_list = [func(path, *args) for path in path_list]
du = array_combine(du_list, mode)
return du
def combine_images(b_p_lst, d_p_lst, f_p_lst, mode_list=["median", "median", "median", ]):
"""
Parameters
----------
b_p_lst:
List of currently ccd number bias file path
d_p_lst:
List of currently ccd number dark file path
f_p_lst:
List of currently ccd number flat file path
mode_list:
[0] bias combine mode
[1] dark combine mode
[2] flat combine mode
mean, median, sum, mean_clip, median_clip
"""
bias = combine(load_bias, mode_list[0], b_p_lst)
dark = combine(load_dark, mode_list[1], d_p_lst, bias)
flat = combine(load_flat, mode_list[2], f_p_lst, bias, dark)
return bias.copy(), dark.copy(), flat.copy()
import os
from glob import glob
from astropy.io import fits
from L1_pipeline.ref_combine import combine_images
ref_path = "/public/share/yangxuliu/CSSOSDataProductsSims/outputs_cali/"
output_path = "/public/home/fangyuedong/project/calib_data"
def combine_ref_func(ref_path, output_path, num="01"):
bias_path_list = glob(ref_path + '*/CSST_MSC_MS_BIAS_*_' + num + '_*')
dark_path_list = glob(ref_path + '*/CSST_MSC_MS_DARK_*_' + num + '_*')
flat_path_list = glob(ref_path + '*/CSST_MSC_MS_FLAT_*_' + num + '_*')
bias, dark, flat = combine_images(b_p_lst=bias_path_list,
d_p_lst=dark_path_list,
f_p_lst=flat_path_list, )
bias_out_path = os.path.join(output_path, "bias_" + str(num) + ".fits")
dark_out_path = os.path.join(output_path, "dark_" + str(num) + ".fits")
flat_out_path = os.path.join(output_path, "flat_" + str(num) + ".fits")
hdu = fits.PrimaryHDU(bias)
hdu.writeto(bias_out_path)
hdu = fits.PrimaryHDU(dark)
hdu.writeto(dark_out_path)
hdu = fits.PrimaryHDU(flat)
hdu.writeto(flat_out_path)
if __name__ == "__main__":
ref_path = "/public/share/yangxuliu/CSSOSDataProductsSims/outputs_cali/"
output_path = "/public/home/fangyuedong/project/calib_data"
num = '08'
combine_ref_func(
ref_path=ref_path,
output_path=output_path,
num=num
)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment