"""
Back end routines for solid-waffle.
Functions
---------
get_version
Version number of script
get_nside
Function to get array size from format codes.
get_num_slices
Gets the number of time slices in an image cube.
load_segment
Function to load an image segment.
pyIRC_percentile
Routine to get percentile cuts with a mask removed.
pyIRC_mean
Routine to get mean with a mask removed.
ref_corr
Get reference corrections from left & right pixel sets.
ref_array
Get reference corrections from left & right pixel sets for a full list of files.
ref_array_onerow
Similar to ref_array, but for one row being valid. Saves time if you only need that one row.
ref_array_block
Extracts reference pixel data in a specified range of y values.
pixel_data
Generate a 4D date cube containing information on a region of the detector.
gen_nl_cube
Routine to get nonlinearity curve.
compute_gain_corr
Gets the correction to the gain from using the full model versus the beta model.
compute_gain_corr_many
Gets the correction to the gain from using the full model versus the beta model.
compute_xc_corr
Gets the correction to the adjacent-pixel correlation from using the full model versus the beta model.
compute_xc_corr_many
Gets the correction to the adjacent-pixel correlation from using the full model versus the beta model.
gain_alphacorr
Routine to get IPC-corrected gain from properties of a difference image.
gain_alphabetacorr
Get IPC+NL-corrected gain.
basic
Basic characterization of a data cube.
corr_5x5
Extracts 5x5 correlation matrix from light and dark data.
corrstats
Routine to obtain statistical properties of a region of the detector across many time slices.
polychar
Routine to characterize of a region of the detector across many time slices.
bfe
Routines to compute the BFE coefficients.
hotpix
Selects hot pixels.
hotpix_ipc
Return IPC data from a list of hot pixels.
slidemed_percentile
Sliding median function.
get_vmin_vmax
Generates min and max range for a color bar based on inter-quartile range.
Classes
-------
IndexDictionary
Table of indices.
"""
import copy
import sys
import warnings
import asdf
import fitsio
import numpy as np
import scipy
import scipy.ndimage
import scipy.stats
from astropy.io import fits
from scipy.signal import fftconvolve
from .ftsolve import (
decenter,
pad_to_N,
solve_corr,
solve_corr_many,
solve_corr_vis,
)
# <== TESTING PARAMETERS ONLY ==>
#
# [these are false and should only be set to true for debugging purposes]
# <== THESE FUNCTIONS DEPEND ON THE FORMAT OF THE INPUT FILES ==>
# Recall the naming convention for formatpars:
# 1 ... 999 = lab test data
# 1001 ... 1999 = dedicated test configurations
# 2001 ... 2999 = WFI flight-like data (ASDF)
[docs]
def get_version():
"""Version number of script"""
return 37
[docs]
def get_nside(formatpars):
"""
Function to get array size from format codes.
Parameters
----------
formatpars : int
Format code.
Returns
-------
int
The array size (including reference pixels).
Notes
-----
This is 4096 for Roman, but we have codes that enable us to run the script on H2RG data.
"""
# Lab tests
if formatpars == 1:
return 4096
if formatpars == 2:
return 2048
if formatpars == 3:
return 4096
if formatpars == 4:
return 4096
if formatpars == 5:
return 4096
if formatpars == 6:
return 4096
if formatpars == 7:
return 2048
# Test configurations
if formatpars == 1001:
return 512
if formatpars == 2002:
return 512
# asdf files
if formatpars == 2001:
return 4096
# Get number of time slices
[docs]
def get_num_slices(formatpars, filename):
"""
Gets the number of time slices in an image cube.
Parameters
----------
formatpars : int
The format code.
filename : str
The file name.
Returns
-------
int
The number of time slices in that file.
"""
# Switch based on input format
if formatpars == 1 or formatpars == 2 or formatpars == 5 or formatpars == 1001:
hdus = fits.open(filename)
ntslice = int(hdus[0].header["NAXIS3"])
hdus.close()
elif formatpars == 3 or formatpars == 7:
hdus = fits.open(filename)
ntslice = len(hdus) - 1
hdus.close()
elif formatpars == 4 or formatpars == 6:
hdus = fits.open(filename)
ntslice = int(hdus[1].header["NAXIS3"])
hdus.close()
elif formatpars == 2001 or formatpars == 2002:
af = asdf.open(filename)
ntslice = np.shape(af["roman"]["data"])[0]
af.close()
else:
print("Error! Invalid formatpars =", formatpars)
exit()
return ntslice
[docs]
def load_segment(filename, formatpars, xyrange, tslices, verbose):
"""
Function to load an image segment.
Parameters
----------
filename : str
Name of the source FITS file.
formatpars : int
Format code.
xyrange : list of int
List in the form ``[xmin,xmax,ymin,ymax]``.
Numpy convention so the first row & column are 0, and excludes xmax and ymax.
tslices : list of int
Time slices to use (beginning slice is 1).
verbose : bool
Whether to print a lot of outputs.
Returns
-------
np.array of float
A 3D array of the data, shape (len(`tslices`), ymax-ymin, xmax-xmin).
All formats are in "ramp slope negative" format (digital saturation is 0).
Notes
-----
Floating-point return type was chosen instead of the native uint16 so that differences
don't underflow to 65535 minus a small number. But integers are exactly represented.
"""
if verbose:
print("Reading:", filename)
# Recommended True
# (False defaults to astropy tools, which work but are slow because of the way this script works)
use_fitsio = True
# Get dimensions of output cube
nxuse = xyrange[1] - xyrange[0]
nyuse = xyrange[3] - xyrange[2]
ntslice_use = len(tslices)
output_cube = np.zeros((ntslice_use, nyuse, nxuse))
# Switch based on input format
if formatpars == 1 or formatpars == 2 or formatpars == 5 or formatpars == 1001:
if use_fitsio:
fileh = fitsio.FITS(filename)
N = get_nside(formatpars)
for ts in range(ntslice_use):
t = tslices[ts]
if ts > 0 and tslices[ts] == tslices[ts - 1]:
output_cube[ts, :, :] = output_cube[ts - 1, :, :] # don't read this slice again
else:
output_cube[ts, :, :] = 65535 - np.array(
fileh[0][t - 1, xyrange[2] : xyrange[3], xyrange[0] : xyrange[1]]
)
fileh.close()
else:
hdus = fits.open(filename)
in_hdu = hdus[0]
ntslice = in_hdu.data.shape[0]
if verbose:
print("input shape -> ", in_hdu.data.shape)
print("number of slices =", ntslice, ", used =", ntslice_use)
for ts in range(ntslice_use):
t = tslices[ts]
output_cube[ts, :, :] = (
65535 - in_hdu.data[t - 1, xyrange[2] : xyrange[3], xyrange[0] : xyrange[1]]
)
hdus.close()
elif formatpars == 3 or formatpars == 7:
if use_fitsio:
fileh = fitsio.FITS(filename)
N = get_nside(formatpars)
for ts in range(ntslice_use):
t = tslices[ts]
output_cube[ts, :, :] = np.array(fileh[t][xyrange[2] : xyrange[3], xyrange[0] : xyrange[1]])
fileh.close()
else:
print("Error: non-fitsio methods not yet supported for formatpars=3 or 7")
exit()
elif formatpars == 4:
if use_fitsio:
fileh = fitsio.FITS(filename)
N = get_nside(formatpars)
for ts in range(ntslice_use):
t = tslices[ts]
if ts >= 1 and t == tslices[ts - 1]:
output_cube[ts, :, :] = output_cube[ts - 1, :, :] # asked for same slice again
else:
output_cube[ts, :, :] = np.array(
fileh[1][0, t - 1, xyrange[2] : xyrange[3], xyrange[0] : xyrange[1]]
)
fileh.close()
else:
print("Error: non-fitsio methods not yet supported for formatpars=4")
exit()
elif formatpars == 5:
if use_fitsio:
fileh = fitsio.FITS(filename)
N = get_nside(formatpars)
for ts in range(ntslice_use):
t = tslices[ts]
if ts >= 1 and t == tslices[ts - 1]:
output_cube[ts, :, :] = output_cube[ts - 1, :, :] # asked for same slice again
else:
output_cube[ts, :, :] = np.array(
fileh[0][t - 1, xyrange[2] : xyrange[3], xyrange[0] : xyrange[1]]
)
fileh.close()
else:
print("Error: non-fitsio methods not yet supported for formatpars=5")
exit()
elif formatpars == 6:
if use_fitsio:
fileh = fitsio.FITS(filename)
N = get_nside(formatpars) # might need this in the future # noqa: F841
for ts in range(ntslice_use):
t = tslices[ts]
if ts >= 1 and t == tslices[ts - 1]:
output_cube[ts, :, :] = output_cube[ts - 1, :, :] # asked for same slice again
else:
output_cube[ts, :, :] = 65535 - np.array(
fileh[1][0, t - 1, xyrange[2] : xyrange[3], xyrange[0] : xyrange[1]]
)
fileh.close()
else:
print("Error: non-fitsio methods not yet supported for formatpars=6")
exit()
elif formatpars == 2001 or formatpars == 2002:
fileh = asdf.open(filename)
_ = get_nside(formatpars) # might need in the future
for ts in range(ntslice_use):
t = tslices[ts]
if ts >= 1 and t == tslices[ts - 1]:
output_cube[ts, :, :] = output_cube[ts - 1, :, :] # asked for the same slice again
else:
output_cube[ts, :, :] = 65535 - np.array(
fileh["roman"]["data"][t - 1, xyrange[2] : xyrange[3], xyrange[0] : xyrange[1]]
)
fileh.close()
else:
print("Error! Invalid formatpars =", formatpars)
exit()
return output_cube
# <== FUNCTIONS BELOW HERE ARE INDEPENDENT OF THE INPUT FORMAT ==>
[docs]
class IndexDictionary:
"""
Table of indices.
These are designed for consistency with the outputs lists of certain functions.
Parameters
----------
itype : int
Index type (currently only accepts 0).
Attributes
----------
N : int
Number of parameters.
Nb : int
Number of "basic" parameters.
Nbb : int
Number of "basic" + BFE parameters.
p : int, optional
Number of non-linearity coefficients (polynomial order is `p` + 1).
ngood : int
Column number for how many pixels in a sub-pixel are good.
graw : int
Column number for raw gain.
gacorr : int
Column number for IPC-corrected gain.
g : int
Column number for best estimate of the gain.
alphaH : int
Column number for horizontal IPC.
alphaV : int
Column number for vertical IPC.
beta : int
Column number for non-linearity parameter (in inverse electrons).
I : int
Column number for flat intensity in e/p/s.
alphaD : int
Column number for diagonal IPC.
tCH : int
Column number for horizontal pixel-pixel covariance.
tCV : int
Column number for vertical pixel-pixel covariance.
ker0 : int, optional
Column number for the (0,0) position in the BFE kernel.
s : int, optional
Size of BFE kernel (range on both axes is from -`s` to +`s`, inclusive).
Methods
-------
addbfe
Adds columns for BFE kernel.
addhnl
Adds columns for higher-order non-linearity kernel.
Notes
-----
Additional columns:
- If BFE is enabled:
the column numbers for the BFE kernel (dx,dy) are `ker0 + dy*(2s+1) + dx`.
- If non-linearity is enabled:
The column numbers
for the higher-order linearity terms are that the degree ``d`` term is in column
``Nbb + d-2``.
It is not allowed to add non-linearity and then BFE.
"""
def __init__(self, itype):
# basic characterization parameter index list -- outputs for "basic" function
if itype == 0:
self.Nb = 11 # number of basic parameters
#
self.ngood = 0
self.graw = 1
self.gacorr = 2
self.g = 3
self.alphaH = 4
self.alphaV = 5
self.beta = 6
self.I = 7
self.alphaD = 8
self.tCH = 9
self.tCV = 10
#
# polychar indices: output is list [1, ***, residual]
# intermediate terms are basic output [ind1:ind2]
self.ind1 = 3
self.ind2 = 9
self.indp1 = 1
self.indp2 = self.indp1 + self.ind2 - self.ind1
#
self.N = self.Nbb = self.Nb
[docs]
def addbfe(self, s):
"""
Adds columns for the BFE kernel (the size is (2s+1, 2s+1), but flattened).
Parameters
----------
s : int
Size of kernel (dx and dy range from -`s` to +`s`).
Returns
-------
None
"""
self.s = s
self.ker0 = self.Nb + 2 * s * (s + 1) # BFE (0,0) kernel index
self.Nbb += (2 * s + 1) * (2 * s + 1)
self.N += (2 * s + 1) * (2 * s + 1)
if hasattr(self, "p"):
print(self.p)
raise ValueError("Should add BFE columns first, then non-linearity.")
[docs]
def addhnl(self, p):
"""
Adds columns for higher-order non-linearity coefficients (p coefs, for total degree 1+p).
Parameters
----------
p : int
Number of non-linearity coefficients (starts at degree 2 since 0 and 1 are linear,
so the total degree is 1+p).
Returns
-------
None
"""
self.p = p
self.N += p
[docs]
def pyIRC_percentile(this_array, mask, perc, disc=True):
"""
Routine to get percentile cuts with a mask removed.
Parameters
----------
this_array : np.array
The array from which we want percentiles.
mask : np.array of bool or int
The mask (True or >=1 for good pixels, False or <=0 for bad).
Must be the same size as `this_array`. (Usually they are the same
shape, but this is not strictly necessary since they are flattened.)
perc : float
The desired percentile (between 0 and 100).
disc : bool, optional
Interpolate a percentile based on the input data being discrete?
Returns
-------
float
Masked percentile of `this_array`.
"""
val = this_array.flatten()
ma = mask.flatten()
w = np.array([val[x] for x in np.where(ma > 0.5)])
n = np.size(w)
if disc:
ctr = np.percentile(w, perc)
n1 = np.count_nonzero(w < ctr - 0.499999)
n2 = np.count_nonzero(w <= ctr + 0.499999)
assert n1 <= n2
if n1 == n2:
return ctr
dctr = (perc / 100.0 * n - (n1 + n2) / 2.0) / float(n2 - n1)
return ctr + dctr
# w -= np.modf(np.linspace(0,(1.+np.sqrt(5.))/2*(n-1), num=n))[0] - .5
return np.percentile(w, perc)
[docs]
def pyIRC_mean(this_array, mask):
"""
Routine to get mean with a mask removed.
Parameters
----------
this_array : np.array
The array from which we want percentiles.
mask : np.array of bool or int
The mask (True or >=1 for good pixels, False or <=0 for bad).
Must be the same size as `this_array`. (Usually they are the same
shape, but this is not strictly necessary since they are flattened.)
Returns
-------
float
The mean of the masked array.
"""
val = this_array.flatten()
ma = mask.flatten()
w = np.array([val[x] for x in np.where(ma > 0.5)])
return np.mean(w)
[docs]
def ref_corr(filename, formatpars, yrange, tslices, verbose):
"""
Get reference corrections from left & right pixel sets.
Parameters
----------
filename : str
The input file name.
formatpars : int
The input format.
yrange : list of int
The minimum and maximum row number, ``yrange[0]<=y<yrange[1]``.
tslices : list of int
A list of the time slices to use in ascending order (first in the file is 1).
verbose : bool
Whether to talk a lot.
Returns
-------
output_ref : list of np.array
A list of reference pixel corrections in these rows. There are 2*``ntslices``+1
entries, each a 1D numpy array (where ``ntslices`` is the length of `tslices`).
Notes
-----
The contents of `output_ref` are as follows.
In pseudocode where ``S[j]`` refers to the signal in time
slice ``ntslices[j]``:
- In all cases, the first ``ntslices`` elements are the median of the corresponding
time slice, i.e., ``output_ref[j]`` is the median of ``S[j]``.
- The next ``ntslices`` elements are the medians of the difference frames, i.e.,
``output_ref[ntslices+j]`` is the median of the difference of slices
``S[0]-S[j]``.
- If there are at least 2 slices, then the last element is the median of the double difference
``(D[-2]-D[-1]) - (D[0]-D[1])``. Otherwise it is zero.
(This is mainly used to measure curvature of the reference pixel ramp.)
Both the left and right side reference pixels are used and medianed together.
"""
# Side length of the array (needed to find reference pixel indexing)
N = get_nside(formatpars)
# Number of time slices
ntslices = len(tslices)
# Clear list
output_ref = []
# Build arrays of reference pixels
my_array_band = load_segment(filename, formatpars, [0, N] + yrange, tslices, False)
my_array_L = my_array_band[:, :, :4]
my_array_R = my_array_band[:, :, -4:]
# my_array_L = load_segment(filename, formatpars, [0,4]+yrange, tslices, False)
# my_array_R = load_segment(filename, formatpars, [N-4,N]+yrange, tslices, False)
my_array_LR = np.concatenate((my_array_L, my_array_R), axis=2)
if verbose:
print(N, my_array_LR.shape)
for ts in range(ntslices):
output_ref.append(np.median(my_array_LR[ts, :, :]))
for ts in range(ntslices):
diff_array = my_array_LR[0, :, :] - my_array_LR[ts, :, :]
output_ref.append(np.median(diff_array))
if ntslices > 1:
diff_array = (
my_array_LR[ntslices - 2, :, :]
- my_array_LR[ntslices - 1, :, :]
- (my_array_LR[0, :, :] - my_array_LR[1, :, :])
* (tslices[-1] - tslices[-2])
/ float(tslices[1] - tslices[0])
)
output_ref.append(np.median(diff_array))
else:
output_ref.append(0)
return output_ref
[docs]
def ref_array(filelist, formatpars, ny, tslices, verbose):
"""
Get reference corrections from left & right pixel sets for a full list of files.
Parameters
----------
filelist : list of str
The list of input files to use.
formatpars : int
The format parameter.
ny : int
Number of groups to bin the rows for making a list of reference pixels
(should be a power of 2).
tslices : list of int
A list of the time slices to use in ascending order (first in the file is 1).
verbose : bool
Whether to talk a lot.
Returns
-------
output_array : np.array
An array of the reference pixel information, including medians of differences.
The shape is (``num_files``, `ny`, 2*``ntslice_use``+1), where ``num_files`` is the
number of files in `filelist`; and ``ntslice_use`` is the number of time slices.
The indexing is that ``output_array[i,iy,j]`` is the median reference pixel in file
``i``, in row group ``iy``, and in combination of time slices ``j`` (see notes since there
are more than ``ntslice_use`` options for ``j``).
Notes
-----
The combination of time slices (last axis of `output_array`) is as follows.
In pseudocode where ``S[j]`` refers to the signal in time
slice ``ntslices[j]``:
- In all cases, the first ``ntslices`` elements are the median of the corresponding
time slice, i.e., ``output_array[:,:,j]`` is the median of ``S[j]``.
- The next ``ntslices`` elements are the medians of the difference frames, i.e.,
``output_array[:,:,ntslices+j]`` is the median of the difference of slices
``S[0]-S[j]``.
- If there are at least 2 slices, then the last element ``output_array[:,:,-1]``
is the median of the double difference
``(D[-2]-D[-1]) - (D[0]-D[1])``. Otherwise it is zero.
(This is mainly used to measure curvature of the reference pixel ramp.)
"""
num_files = len(filelist)
ntslices = len(tslices)
output_array = np.zeros((num_files, ny, 2 * ntslices + 1))
dy = get_nside(formatpars) // ny
for ifile in range(num_files):
for iy in range(ny):
ymin = dy * iy
ymax = ymin + dy
output_array[ifile, iy, :] = np.asarray(
ref_corr(filelist[ifile], formatpars, [ymin, ymax], tslices, False)
)
if verbose:
print(ifile, iy)
print(output_array[ifile, iy, :])
return output_array
[docs]
def ref_array_onerow(filelist, formatpars, iy, ny, tslices, verbose):
"""
Similar to ref_array, but for one row being valid. Saves time if you only need that one row.
**Only use this function if you are absolutely sure you don't need the other rows!**
Parameters
----------
filelist : list of str
The list of input files to use.
formatpars : int
The format parameter.
iy : int
Which row group (between 0 and `ny`-1) needs to be good.
ny : int
Number of groups to bin the rows for making a list of reference pixels
(should be a power of 2).
tslices : list of int
A list of the time slices to use in ascending order (first in the file is 1).
verbose : bool
Whether to talk a lot.
Returns
-------
output_array : np.array
An array of the reference pixel information, including medians of differences.
See Also
--------
ref_array
See for description of the `output_array` (with the warning that only that one
row group is going to be good).
"""
num_files = len(filelist)
ntslices = len(tslices)
output_array = np.zeros((num_files, ny, 2 * ntslices + 1))
dy = get_nside(formatpars) // ny
for ifile in range(num_files):
ymin = dy * iy
ymax = ymin + dy
output_array[ifile, iy, :] = np.asarray(
ref_corr(filelist[ifile], formatpars, [ymin, ymax], tslices, False)
)
if verbose:
print(ifile, iy)
print(output_array[ifile, iy, :])
return output_array
[docs]
def ref_array_block(filelist, formatpars, yrange, tslices, verbose):
"""
Extracts reference pixel data in a specified range of y values.
Parameters
----------
filelist : list of str
The list of input files to use.
formatpars : int
The format parameter.
yrange : list or tuple of int
Which row range to extract; length 2 ( = ymin, ymax ), with usual Python
convention (extracts ymin<=y<ymax).
tslices : list of int
A list of the time slices to use in ascending order (first in the file is 1).
verbose : bool
Whether to talk a lot.
Returns
-------
output_array : np.array
A 2D array, shape (``num_files``, 2*``ntslices``+1).
The indexing is that ``output_array[i,j]`` is the median reference pixel in file
``i``, and in combination of time slices ``j`` (see notes since there
are more than ``ntslice_use`` options for ``j``).
Notes
-----
The combination of time slices (last axis of `output_array`) is as follows.
In pseudocode where ``S[j]`` refers to the signal in time
slice ``ntslices[j]``:
- In all cases, the first ``ntslices`` elements are the median of the corresponding
time slice, i.e., ``output_array[:,j]`` is the median of ``S[j]``.
- The next ``ntslices`` elements are the medians of the difference frames, i.e.,
``output_array[:,ntslices+j]`` is the median of the difference of slices
``S[0]-S[j]``.
- If there are at least 2 slices, then the last element ``output_array[:,-1]``
is the median of the double difference
``(D[-2]-D[-1]) - (D[0]-D[1])``. Otherwise it is zero.
(This is mainly used to measure curvature of the reference pixel ramp.)
"""
num_files = len(filelist)
ntslices = len(tslices)
output_array = np.zeros((num_files, 2 * ntslices + 1))
if len(yrange) < 2:
print("Error in ref_array_block: yrange =", yrange)
exit()
for ifile in range(num_files):
ymin = yrange[0]
ymax = yrange[1]
output_array[ifile, :] = np.asarray(
ref_corr(filelist[ifile], formatpars, [ymin, ymax], tslices, False)
)
if verbose:
print(ifile)
print(output_array[ifile, :])
return output_array
[docs]
def pixel_data(filelist, formatpars, xyrange, tslices, maskinfo, verbose):
"""
Generate a 4D date cube containing information on a region of the detector.
Parameters
----------
filelist : list of str
A list of the input file names.
formatpars : int
Which input format type to use.
xyrange : list of int
The rectangular region to pull, in [xmin,xmax,ymin,ymax] format.
Python format, i.e., the first row and column are zero, and xmax and ymax are not included.
tslices : list of int
The time slices to use (first time slice is 1).
maskinfo : list or tuple
A list-like object with at least 2 elements, [``cut_offset``, ``do_mask``]. Here ``cut_offset``
is the range around median to accept (default: 0.1, must be within 10% of median); and
``do_mask`` is a boolean on whether to do the masking.
If we don't have at least 2 elements, defaults to [0.1, True].
verbose : bool
Whether to print lots of information.
Returns
-------
output_array : np.array
A 4D array. The shape is (``num_files``+1, ``ntslices``, dy, dx),
where dy=ymax-ymin and dx=xmax-xmin are the sizes of the regions on the detector;
``ntslices`` is the number of time slices requested (length of `tslice`); and ``num_files``
is the number of files (length of `filelist`). The last slice, ``output_array[-1,:,:,:]``
is the mask (good=True).
"""
# Masking parameters
cut_offset = 0.1
if len(maskinfo) >= 1:
cut_offset = maskinfo[0]
do_mask = True
if len(maskinfo) >= 2:
do_mask = maskinfo[1]
num_files = len(filelist)
ntslices = len(tslices)
output_array = np.zeros((num_files + 1, ntslices, xyrange[3] - xyrange[2], xyrange[1] - xyrange[0]))
for ifile in range(num_files):
output_array[ifile, :, :, :] = load_segment(filelist[ifile], formatpars, xyrange, tslices, verbose)
# Generate mean CDS image and consider the median
mCDS = np.mean(output_array[0:num_files, 0, :, :], axis=0) - np.mean(
output_array[0:num_files, -1, :, :], axis=0
)
mCDS_med = np.median(mCDS)
if do_mask:
a = (1.0 / mCDS_med) * mCDS
goodmap = np.where(np.logical_and(a > 1 - cut_offset, a < 1 + cut_offset), 1, 0)
else:
goodmap = np.ones_like(mCDS)
for f in range(num_files):
for t in range(ntslices):
goodmap *= np.where(output_array[f, t, :, :] > 0, 1, 0)
if verbose:
print("Median =", mCDS_med, "cut_offset =", cut_offset)
print(goodmap)
print(goodmap.shape)
# Copy map of good pixels into the output
for t in range(ntslices):
output_array[num_files, t, :, :] = goodmap
return output_array
[docs]
def gen_nl_cube(filelist, formatpars, timeslice, ngrid, Ib, usemode, swi, verbose):
"""
Routine to get nonlinearity curve.
Parameters
----------
filelist : list of str
A list of the input file names.
formatpars : int
Which input format type to use.
timeslice : int or list of int
Which samples to use. If a list, uses ``timeslice[1]`` through ``timeslice[2]``,
assuming reset at time
``timeslice[0]``. If an integer, does time slices 1 ... `timeslice`,
assuming reset at slice 0.
ngrid : list or tuple of int
Number of cells on each axis; length 2, y first:``[ny,nx]`` or ``(ny,nx)``.
Ib : variable
Deprecated; can pass anything.
usemode : str
Either ``'dev'`` (deviation from beta fit) or ``'abs'`` (absolute -- zero of time is absolute).
swi : class
Column table.
verbose : bool
Whether to talk a lot.
Returns
-------
output_array : np.array
Reference corrected signal in DN; shape = (``nt``, ``ny``, ``nx``), where ``nt``
is the number of time slices requested.
This is the median within a file, and then we take the mean across files.
fit_array : np.array
The polynomial fit in DN; shape = (``nt``, ``ny``, ``nx``).
Would be equal to `output_array` if the fit is perfect.
deriv_array : np.array
The derivative of the polynomial fit in DN/frame; shape = (``nt``, ``ny``, ``nx``).
coefs_array : np.array, optional
The polynomial coefficients for the ramps; shape (``order``+1, ``ny``, ``nx``).
Order is ascending, i.e., constant term is ``coefs_array[0,:,:]``, then the linear
term is ``coefs_array[1,:,:]``, etc. Only returned if `usemode` is ``'abs'``.
"""
# Extract basic information
nfiles = len(filelist)
nx = ngrid[1]
ny = ngrid[0]
N = get_nside(formatpars)
dx = N // nx
dy = N // ny
# Check whether we have a list or single slice
if isinstance(timeslice, list):
tref = timeslice[0]
tmin = timeslice[1]
tmax = timeslice[2]
nt = tmax - tmin + 1
else:
tref = 0
tmin = 1
nt = tmax = timeslice
output_array = np.zeros((nt, ny, nx))
temp_array = np.zeros((nfiles, ny, nx))
# order of polynomial fit per pixel
my_order = 5
if usemode == "abs":
my_order = swi.p
if verbose:
print("Nonlinear cube:")
sys.stdout.write(" reference pixel extraction ...")
sys.stdout.flush()
# Extract reference information
# Now ref_signal[ifile, iy, it] contains it_th time slice of group iy of ref pixels in file ifile
ref_signal = ref_array(filelist, formatpars, ny, range(tmin, tmax + 1), False)
if verbose:
print(" done.")
sys.stdout.write("Time slices:")
sys.stdout.flush()
# Now loop over times
for t in range(tmin, tmax + 1):
temp_array[:, :, :] = 0.0
if verbose:
sys.stdout.write(f" {t:2d}")
sys.stdout.flush()
for ifile in range(nfiles):
val = load_segment(filelist[ifile], formatpars, [0, N, 0, N], [1, t], False) # make 2D array
valc = val[1, :, :] - val[0, :, :]
for iy in range(ny):
for ix in range(nx):
temp_array[ifile, iy, ix] = np.median(
valc[dy * iy : dy * (iy + 1), dx * ix : dx * (ix + 1)]
) - (ref_signal[ifile, iy, t - tmin] - ref_signal[ifile, iy, 0])
output_array[t - tmin, :, :] = -np.mean(temp_array, axis=0)
# <-- note: we flipped the sign so that the signal is positive
if verbose:
print("")
# Make fit and derivatives
coefs_array = np.zeros((my_order + 1, ny, nx))
fit_array = np.zeros_like(output_array)
deriv_array = np.zeros_like(output_array)
for iy in range(ny):
for ix in range(nx):
p = np.poly1d(
np.polyfit(np.asarray(range(tmin - tref, tmax + 1 - tref)), output_array[:, iy, ix], my_order)
)
q = np.poly1d.deriv(p)
fit_array[:, iy, ix] = p(range(tmin - tref, tmax + 1 - tref))
deriv_array[:, iy, ix] = q(range(tmin - tref, tmax + 1 - tref))
coefs_array[: p.order + 1, iy, ix] = p.c[::-1]
if usemode == "dev":
return output_array, fit_array, deriv_array
else:
return output_array, fit_array, deriv_array, coefs_array
[docs]
def compute_gain_corr(fit_array, deriv_array, Ib, tslices, reset_frame):
"""
Gets the correction to the gain from using the full model versus the beta model.
Parameters
----------
fit_array : np.array of float
Signal in DN for true curve (length ``tmax`` array, starting with frame 1).
deriv_array : np.array of float
Signal rate in DN/frame (length ``tmax`` array, starting with frame 1).
Ib : float
The charge per frame times beta (unitless).
tslices : list of int
Length 3 list: the time slices used for the quadratic fit in determining beta.
reset_frame : int or float
Reset frame (ideally int, but float would be OK if you want to represent an
imperfect reset as we observe in Roman detectors).
Returns
-------
float
The ractional gain error, log( gain[full NL] / gain[est. quad.]),
caused by using a beta-model for the nonlinearity curve instead of the full curve.
See Also
--------
compute_gain_corr_many : Similar but for multiple superpixels at once.
"""
# unpack time information
ta = tslices[0] - reset_frame
tb = tslices[1] - reset_frame
td = tslices[2] - reset_frame
# indices
ina = tslices[0] - 1
inb = tslices[1] - 1
ind = tslices[2] - 1
# We want the nonlinearity corrections (mean[td]-mean[tb])/(var[tad]-var[tab])
# which, for a non-linearity curve f(t) with f(0)=0, f'(0)=1, is
# [f(td)-f(tb)] / { [ta*(f'(td)-f'(ta))^2 + tad*f'(td)^2] - [ta*(f'(tb)-f'(ta))^2 + tab*f'(tb)^2] }
# = [f(td)-f(tb)] / [ ta*(f'(td)^2-f'(tb)^2 - 2f'(ta)*(f'(td)-f'(tb)) )
# + td*f'(td)^2 - ta*f'(td)^2 - tb*f'(tb)^2 + ta*f'(tb)^2 ]
# = [f(td)-f(tb)] / [ -2 ta f'(ta) (f'(td)-f'(tb)) + td f'(td)^2 - tb f'(tb)^2 ]
#
# Let us call that factor e^{epsilon}
# Now for the beta-model, f(t) = t - Ib * t^2
# so e^{epsilon} = tbd [1 - Ib (tb+td) ] / [
# 4 Ib ta tbd (1 - 2 Ib ta) + td (1 - 2 Ib td)^2 - tb (1 - 2 Ib tb)^2 ]
# = [1 - Ib (tb+td) ] / [
# 4 Ib ta (1 - 2 Ib ta) + 1 - 4 Ib (tb+td) + Ib^2 (td^2 + tb td + tb^2) ]
# to lowest order in Ib (what is in the notes):
# epsilon ~ Ib (-4ta+3tb+3td)
true_expepsilon = (fit_array[ind] - fit_array[inb]) / (
-2 * ta * deriv_array[ina] * (deriv_array[ind] - deriv_array[inb])
+ td * deriv_array[ind] ** 2
- tb * deriv_array[inb] ** 2
)
return np.log(true_expepsilon * deriv_array[0]) - Ib * (-4.0 * ta + 3.0 * tb + 3.0 * td)
[docs]
def compute_gain_corr_many(fit_array, deriv_array, Ib, tslices, reset_frame, is_good):
"""
Gets the correction to the gain from using the full model versus the beta model.
This version works for many superpixels.
Parameters
----------
fit_array : np.array of float
Signal in DN for true curve. Shape (``tmax``, ``ny``, ``nx``),
where the first axis corresponds to the time stamp and the others are superpixel
indices. Time stamps start at frame 1.
deriv_array : np.array of float
Signal rate in DN/frame, same shape as `fit_array`.
Ib : np.array of float
The charge per frame times beta (unitless). This is a 2D array, shape (``ny``, ``nx``).
tslices : list of int
Length 3 list: the time slices used for the quadratic fit in determining beta.
reset_frame : int or float
Reset frame (ideally int, but float would be OK if you want to represent an
imperfect reset as we observe in Roman detectors).
is_good : np.array of int or bool
Mask array, shape (``ny``, ``nx``), True or 1 indicates good pixels,
False or 0 indicates bad.
Returns
-------
np.array of float
The ractional gain error, log( gain[full NL] / gain[est. quad.]),
caused by using a beta-model for the nonlinearity curve instead of the full curve.
This is 2D with all the superpixels, shape (``ny``, ``nx``).
See Also
--------
compute_gain_corr : Similar but for only one superpixel.
"""
out_array = np.zeros_like(fit_array[0, :, :])
ny = np.shape(fit_array)[1]
nx = np.shape(fit_array)[2]
for iy in range(ny):
for ix in range(nx):
if is_good[iy, ix] > 0.5:
out_array[iy, ix] = compute_gain_corr(
fit_array[:, iy, ix], deriv_array[:, iy, ix], Ib[iy, ix], tslices, reset_frame
)
return out_array
[docs]
def compute_xc_corr(fit_array, deriv_array, Ib, tslices, reset_frame):
"""
Gets the correction to the adjacent-pixel correlation from using the full model versus the beta model.
Parameters
----------
fit_array : np.array of float
Signal in DN for true curve (length ``tmax`` array, starting with frame 1).
deriv_array : np.array of float
Signal rate in DN/frame (length ``tmax`` array, starting with frame 1).
Ib : float
The charge per frame times beta (unitless).
tslices : list of int
Length 3 list: the time slices used for the quadratic fit in determining beta.
reset_frame : int or float
Reset frame (ideally int, but float would be OK if you want to represent an
imperfect reset as we observe in Roman detectors).
Returns
-------
float
The correction to the adjacent-pixel correlation,
(full correlation) / (beta model correlation)
caused by using a beta-model for the nonlinearity curve instead of the full curve.
See Also
--------
compute_xc_corr_many : Similar but for multiple superpixels at once.
"""
# unpack time information
ta = tslices[0] - reset_frame
tb = tslices[1] - reset_frame
# indices
ina = tslices[0] - 1
inb = tslices[1] - 1
# We want the correction
# f'(tb)^2 + ta/tab * (f'(tb)-f'(ta)) - (1 - 4 Ib tb)
return (
deriv_array[inb] ** 2 - ta / (tb - ta) * (deriv_array[inb] - deriv_array[ina]) ** 2
) / deriv_array[0] ** 2 - (1.0 - 4 * Ib * tb)
[docs]
def compute_xc_corr_many(fit_array, deriv_array, Ib, tslices, reset_frame, is_good):
"""
Gets the correction to the adjacent-pixel correlation from using the full model versus the beta model.
This version works for many superpixels.
Parameters
----------
fit_array : np.array of float
Signal in DN for true curve. Shape (``tmax``, ``ny``, ``nx``),
where the first axis corresponds to the time stamp and the others are superpixel
indices. Time stamps start at frame 1.
deriv_array : np.array of float
Signal rate in DN/frame, same shape as `fit_array`.
Ib : np.array of float
The charge per frame times beta (unitless). This is a 2D array, shape (``ny``, ``nx``).
tslices : list of int
Length 3 list: the time slices used for the quadratic fit in determining beta.
reset_frame : int or float
Reset frame (ideally int, but float would be OK if you want to represent an
imperfect reset as we observe in Roman detectors).
is_good : np.array of int or bool
Mask array, shape (``ny``, ``nx``), True or 1 indicates good pixels,
False or 0 indicates bad.
Returns
-------
np.array of float
The ractional gain error, log( gain[full NL] / gain[est. quad.]),
caused by using a beta-model for the nonlinearity curve instead of the full curve.
This is 2D with all the superpixels, shape (``ny``, ``nx``).
See Also
--------
compute_xc_corr_many : Similar but for only one superpixel.
"""
out_array = np.zeros_like(fit_array[0, :, :])
ny = np.shape(fit_array)[1]
nx = np.shape(fit_array)[2]
for iy in range(ny):
for ix in range(nx):
if is_good[iy, ix] > 0.5:
out_array[iy, ix] = compute_xc_corr(
fit_array[:, iy, ix], deriv_array[:, iy, ix], Ib[iy, ix], tslices, reset_frame
)
return out_array
[docs]
def gain_alphacorr(graw, CH, CV, signal):
"""
Routine to get IPC-corrected gain from properties of a difference image.
Parameters
----------
graw : float
Uncorrected gain (e/DN)
CH : float
Horizontal correlation (DN^2)
CV : float
Vertical correlation (DN^2)
signal : float
Signal in this ramp (DN)
Returns
-------
list of float
If successful, returns a list of [gain, alphaH, alphaV]
(with gain alpha-corrected and in e/DN).
Returns an empty list if failed.
"""
g = graw
for _ in range(100):
alphaH = CH * g / (2 * signal)
alphaV = CV * g / (2 * signal)
if alphaH + alphaV > 0.25:
return [] # FAIL!
g = graw * ((1 - 2 * (alphaH + alphaV)) ** 2 + 2 * (alphaH**2 + alphaV**2))
return [g, alphaH, alphaV]
# Routine to get IPC+NL-corrected gain
#
# Inputs:
# graw = uncorrected gain (e/DN)
# CH = horizontal correlation (DN^2)
# CV = vertical correlation (DN^2)
# signal = signal in this ramp (DN)
# frac_dslope = mean signal rate in (cd) / mean signal rate in (ab) - 1
# times = list of times [a,b,c,d] used, normalized to reference slice
#
# Output list:
# gain g (alpha corr), e/DN
# alphaH
# alphaV
# beta
# current I (electrons per time slice)
#
# returns [] if failed
[docs]
def gain_alphabetacorr(graw, CH, CV, signal, frac_dslope, times):
"""
Get IPC+NL-corrected gain.
Parameters
----------
graw : float
Uncorrected gain (e/DN)
CH : float
Horizontal correlation (DN^2)
CV : float
Vertical correlation (DN^2)
signal : float
Signal in this ramp (DN)
frac_dslope : float
Signal ratio for non-linearity, S_{cd}/S_{ab}-1 (unitless).
times : list of int
The time slices to use (length 4: [ta, tb, tc, td]).
Returns
-------
list of float
If successful, returns a list of [gain, alphaH, alphaV, beta, current]
(with gain corrected and in e/DN; beta in 1/e; and current in e/frame).
Returns an empty list if failed.
Notes
-----
This is solving the following set of equations
(see Hirata's brighter-fatter effect paper)::
# in pseudocode
graw = g * [ 1 + beta I (3tb+3td-4ta) ] / [ (1-4alpha)^2 + 2alphaH^2 + 2alphaV^2 ]
CH = (2 I tad alphaH / g^2) [ 1 - 4alpha - 4 beta I td ]
CV = (2 I tad alphaV / g^2) [ 1 - 4alpha - 4 beta I td ]
signal = I tad [ 1 - beta I (ta+td) ] / g
frac_dslope = - beta I (tc+td-ta-tb)
"""
# Initial guess
g = graw
alpha = alphaH = alphaV = beta = 0
I_ = signal * g / (times[3] - times[0])
# Iterate
# (100 iterations is overkill for this problem if alpha and beta are small)
for _ in range(100):
g = (
graw
* ((1 - 4 * alpha) ** 2 + 2 * (alphaH**2 + alphaV**2))
/ (1 + beta * I_ * (3 * (times[1] + times[3]) - 4 * times[0]))
)
if g < 1e-3:
print("Gain did not converge")
print("IN:", graw, CH, CV, signal, frac_dslope, times)
print("STATUS:", g, alphaH, alphaV, alpha, I_, beta)
raise RuntimeError("Gain did not converge")
temp = (1 - 4 * alpha - 4 * beta * I_ * times[3]) * 2 * I_ * (times[3] - times[0]) / g**2
alphaH = CH / temp
alphaV = CV / temp
if alphaH + alphaV > 0.25:
return [] # FAIL!
alpha = (alphaH + alphaV) / 2.0
I_ = signal * g / (times[3] - times[0]) / (1 - beta * I_ * (times[3] + times[0]))
beta = -frac_dslope / I_ / (times[2] + times[3] - times[0] - times[1])
if np.fabs(beta) * I_ * (times[3] + times[0]) > 0.5:
return [] # FAIL!
return [g, alphaH, alphaV, beta, I_]
[docs]
def basic(region_cube, dark_cube, tslices, lightref, darkref, ctrl_pars, verbose):
"""
Basic characterization of a data cube.
Parameters
----------
region_cube : np.array
4D array of the region of interest. shape: (num_files+1, nt, dy, dx),
where num_files is the number of files, nt is the number of time slices, and
(dy, dx) is the shape of the region on the SCA used. The last slice, ``region_cube[-1,:,:,:]``,
is the mask (1 for good, 0 for bad).
dark_cube : np.array
Like `region_cube`, but for the darks. It is optional whether there is a separate mask
(it isn't used, so it is OK if the first axis has length num_files or num_files+1).
tslices : list of int
List of the time slice numbers; length ``nt``.
lightref : np.array
Reference pixel table for correcting light exposures. shape = (num_files, 2*nt+1);
the way the time axis is managed is described in :func:`ref_corr`.
darkref : np.array
Similar to `lightref`, but for the dark exposures.
ctrl_pars : class
Contains the control parameters as attributes (see Notes).
verbose : bool
Whether to print lots of information.
Returns
-------
list
The basic calibration parameters. The return information depends on whether ``ctrl_pars.full_corr``
is True or False::
# True (default):
[number of good pixels, gain_raw, gain_acorr, gain_abcorr, aH, aV, beta, I, 0., tCH, tCV]
# False:
[number of good pixels, median, variance, tCH, tCV, tCD]
Returns the null list [] if failed.
Notes
-----
The `ctrl_pars` class contains the following attributes:
- ``epsilon`` : float
Fraction of data points to cut for computing correlations (default 0.01)
- ``subtr_corr`` : bool
Do mean subtraction for the IPC correlation? (default to True)
- ``noise_corr`` : bool
Do noise subtraction for the IPC correlation? (default to True)
- ``reset_frame`` : int
Reset frame (default to 0)
- ``subtr_href`` : bool
Horizontal reference pixel subtraction? (default to True)
- ``full_corr`` : bool
Which parameters to report? (default to True = standard basic pars; False = correlation data instead)
- ``leadtrailSub`` : bool
Perform lead-trail subtraction? (default to False)
- ``g_ptile`` : float
Percentile for inter-quantile range (default to 75)
This includes a test so this won't crash if tslices[1]>=tslices[-1] but returns meaningful
cross-correlation C_{abab} (everything else is nonsense in this case).
"""
# Settings:
newMeanSubMethod = True # use False only for test/debug
leadtrailSub = True # subtract leading & trailing (by +/-4 pix) from horiz & vert correlations
g_ptile = 75.0 # percentile use for inter-quantile range for variance (default: 75, giving standard IQR)
# Extract basic parameters
num_files = region_cube.shape[0] - 1
nt = region_cube.shape[1]
dy = region_cube.shape[2]
dx = region_cube.shape[3]
# npix = dx * dy
if nt != len(tslices):
print("Error in pyirc.basic: incompatible number of time slices")
exit()
if verbose:
print("nfiles = ", num_files, ", ntimes = ", nt, ", dx,dy=", dx, dy)
treset = 0
if hasattr(ctrl_pars, "reset_frame"):
treset = ctrl_pars.reset_frame
# First get correlation parameters
epsilon = 0.01
if hasattr(ctrl_pars, "epsilon"):
epsilon = ctrl_pars.epsilon
subtr_corr = True
if hasattr(ctrl_pars, "subtr_corr"):
subtr_corr = ctrl_pars.subtr_corr
noise_corr = True
if hasattr(ctrl_pars, "noise_corr"):
noise_corr = ctrl_pars.noise_corr
if verbose:
print("corr pars =", epsilon, subtr_corr, noise_corr)
#
# Reference pixel subtraction?
subtr_href = True
if hasattr(ctrl_pars, "subtr_href"):
subtr_href = ctrl_pars.subtr_href
# return full correlation information?
full_corr = True
if hasattr(ctrl_pars, "full_corr"):
full_corr = ctrl_pars.full_corr
# lead-trail subtraction for IPC correlations?
if hasattr(ctrl_pars, "leadtrailSub"):
leadtrailSub = ctrl_pars.leadtrailSub
# quantile for variance?
if hasattr(ctrl_pars, "g_ptile"):
g_ptile = ctrl_pars.g_ptile
# Get means and variances at the early and last slices
# (i.e. 1-point information)
gauss_iqr_in_sigmas = scipy.stats.norm.ppf(g_ptile / 100.0) * 2 # about 1.349 for g_ptile=75.
box1 = region_cube[0:num_files, 0, :, :] - region_cube[0:num_files, 1, :, :]
box2 = region_cube[0:num_files, 0, :, :] - region_cube[0:num_files, -1, :, :]
box2Noise = dark_cube[0:num_files, 0, :, :] - dark_cube[0:num_files, -1, :, :]
#
if subtr_href:
for f in range(num_files):
if verbose:
print(
"lightref.shape=",
lightref.shape,
"subtr ->",
lightref[f, nt + 1],
lightref[f, 2 * nt - 1],
darkref[f, 2 * nt - 1],
)
box1[f, :, :] -= lightref[f, nt + 1]
box2[f, :, :] -= lightref[f, 2 * nt - 1]
box2Noise[f, :, :] -= darkref[f, 2 * nt - 1]
mean1 = np.mean(box1, axis=0)
mean2 = np.mean(box2, axis=0)
med1 = np.median(mean1)
med2 = np.median(mean2)
var1 = 0
var2 = 0
corr_mask = region_cube[-1, 0, :, :]
for if1 in range(1, num_files):
for if2 in range(if1):
temp_box = box1[if1, :, :] - box1[if2, :, :]
iqr1 = pyIRC_percentile(temp_box, corr_mask, g_ptile) - pyIRC_percentile(
temp_box, corr_mask, 100 - g_ptile
)
temp_box = box2[if1, :, :] - box2[if2, :, :]
iqr2 = pyIRC_percentile(temp_box, corr_mask, g_ptile) - pyIRC_percentile(
temp_box, corr_mask, 100 - g_ptile
)
var1 += (iqr1 / gauss_iqr_in_sigmas) ** 2 / 2.0
var2 += (iqr2 / gauss_iqr_in_sigmas) ** 2 / 2.0
if verbose:
print("Inner loop,", if1, if2, temp_box.shape)
var1 /= num_files * (num_files - 1) / 2.0
var2 /= num_files * (num_files - 1) / 2.0
if var2 <= var1 and tslices[1] < tslices[-1]:
return [] # FAIL!
gain_raw = (med2 - med1) / (var2 - var1 + 1e-100) # in e/DN
# 1e-100 does nothing except to prevent an error when var1 and var2 are exactly the same
# Correlations of neighboring pixels, in DN^2
#
tCH = tCV = tCD = 0
for if1 in range(1, num_files):
for if2 in range(if1):
temp_box = box2[if1, :, :] - box2[if2, :, :]
# Run through twice if we have noise, otherwise once
nrun = 2 if noise_corr else 1
for icorr in range(nrun):
# clipping
cmin = pyIRC_percentile(temp_box, corr_mask, 100 * epsilon)
cmax = pyIRC_percentile(temp_box, corr_mask, 100 * (1 - epsilon))
this_mask = np.where(np.logical_and(temp_box > cmin, temp_box < cmax), 1, 0) * corr_mask
if np.sum(this_mask) < 1:
return [] # FAIL!
# mean subtraction
mean_of_temp_box = np.sum(temp_box * this_mask) / np.sum(this_mask)
if subtr_corr and newMeanSubMethod:
temp_box -= mean_of_temp_box
# Correlations in horizontal and vertical directions
maskCV = np.sum(this_mask[:-1, :] * this_mask[1:, :])
maskCH = np.sum(this_mask[:, :-1] * this_mask[:, 1:])
CV = np.sum(this_mask[:-1, :] * this_mask[1:, :] * temp_box[:-1, :] * temp_box[1:, :])
CH = np.sum(this_mask[:, :-1] * this_mask[:, 1:] * temp_box[:, :-1] * temp_box[:, 1:])
if maskCH < 1 or maskCV < 1:
return []
CH /= maskCH
CV /= maskCV
# diagonal directions
if not full_corr:
maskCD1 = np.sum(this_mask[:-1, :-1] * this_mask[1:, 1:])
maskCD2 = np.sum(this_mask[:-1, 1:] * this_mask[1:, :-1])
CD1 = np.sum(
this_mask[:-1, :-1] * this_mask[1:, 1:] * temp_box[:-1, :-1] * temp_box[1:, 1:]
)
CD2 = np.sum(
this_mask[:-1, 1:] * this_mask[1:, :-1] * temp_box[:-1, 1:] * temp_box[1:, :-1]
)
if maskCD1 < 1 or maskCD2 < 1:
return []
CD1 /= maskCD1
CD2 /= maskCD2
CD = (CD1 + CD2) / 2.0
if leadtrailSub:
maskCVx1 = np.sum(this_mask[:-1, :-4] * this_mask[1:, 4:])
maskCHx1 = np.sum(this_mask[:, :-5] * this_mask[:, 5:])
CVx1 = np.sum(
this_mask[:-1, :-4] * this_mask[1:, 4:] * temp_box[:-1, :-4] * temp_box[1:, 4:]
)
CHx1 = np.sum(this_mask[:, :-5] * this_mask[:, 5:] * temp_box[:, :-5] * temp_box[:, 5:])
if maskCHx1 < 1 or maskCVx1 < 1:
return []
CHx1 /= maskCHx1
CVx1 /= maskCVx1
maskCVx2 = np.sum(this_mask[:-1, 4:] * this_mask[1:, :-4])
maskCHx2 = np.sum(this_mask[:, :-3] * this_mask[:, 3:])
CVx2 = np.sum(
this_mask[:-1, 4:] * this_mask[1:, :-4] * temp_box[:-1, 4:] * temp_box[1:, :-4]
)
CHx2 = np.sum(this_mask[:, :-3] * this_mask[:, 3:] * temp_box[:, :-3] * temp_box[:, 3:])
if maskCHx2 < 1 or maskCVx2 < 1:
return []
CHx2 /= maskCHx2
CVx2 /= maskCVx2
CH -= (CHx1 + CHx2) / 2.0
CV -= (CVx1 + CVx2) / 2.0
#
# correction of the diagonal directions
if not full_corr:
maskCDx1 = np.sum(this_mask[:-1, :-5] * this_mask[1:, 5:])
maskCDx2 = np.sum(this_mask[:-1, :-3] * this_mask[1:, 3:])
maskCDx3 = np.sum(this_mask[1:, :-5] * this_mask[:-1, 5:])
maskCDx4 = np.sum(this_mask[1:, :-3] * this_mask[:-1, 3:])
CDx1 = np.sum(
this_mask[:-1, :-5] * this_mask[1:, 5:] * temp_box[:-1, :-5] * temp_box[1:, 5:]
)
CDx2 = np.sum(
this_mask[:-1, :-3] * this_mask[1:, 3:] * temp_box[:-1, :-3] * temp_box[1:, 3:]
)
CDx3 = np.sum(
this_mask[1:, :-5] * this_mask[:-1, 5:] * temp_box[1:, :-5] * temp_box[1:, 5:]
)
CDx4 = np.sum(
this_mask[1:, :-3] * this_mask[:-1, 3:] * temp_box[1:, :-3] * temp_box[1:, 3:]
)
if maskCDx1 < 1 or maskCDx2 < 1 or maskCDx3 < 1 or maskCDx4 < 1:
return []
CDx1 /= maskCDx1
CDx2 /= maskCDx2
CDx3 /= maskCDx3
CDx4 /= maskCDx4
CD -= (CDx1 + CDx2 + CDx3 + CDx4) / 4.0
if subtr_corr and not newMeanSubMethod and not leadtrailSub:
CH -= mean_of_temp_box**2
CV -= mean_of_temp_box**2
tCH += CH * (1 if icorr == 0 else -1)
tCV += CV * (1 if icorr == 0 else -1)
if not full_corr:
if subtr_corr and not newMeanSubMethod and not leadtrailSub:
CD -= mean_of_temp_box**2
tCD += CD * (1 if icorr == 0 else -1)
if verbose:
print("pos =", if1, if2, "iteration", icorr, "cmin,cmax =", cmin, cmax)
print("Mask size", np.sum(this_mask), "correlations =", maskCH, maskCV, "data:", CH, CV)
temp_box = box2Noise[if1, :, :] - box2Noise[if2, :, :]
# end nested for loop
#
# Normalize covariances. Note that taking the difference of 2 frames doubled the covariance
# matrix, so we have introduced cov_clip_corr
xi = scipy.stats.norm.ppf(1 - epsilon)
cov_clip_corr = (1.0 - np.sqrt(2.0 / np.pi) * xi * np.exp(-xi * xi / 2.0) / (1.0 - 2.0 * epsilon)) ** 2
tCH /= num_files * (num_files - 1) * cov_clip_corr
tCV /= num_files * (num_files - 1) * cov_clip_corr
if not full_corr:
tCD /= num_files * (num_files - 1) * cov_clip_corr
# if we don't need full correlations, exit now
if not full_corr:
return [np.sum(this_mask), med2, var2, tCH, tCV, tCD]
# Curvature information (for 2nd order NL coefficient)
if tslices[-1] != tslices[-2]:
if subtr_href:
for f in range(num_files):
box1[f, :, :] += lightref[f, nt + 1]
boxD = (
region_cube[0:num_files, -2, :, :]
- region_cube[0:num_files, -1, :, :]
- (tslices[-1] - tslices[-2]) / float(tslices[1] - tslices[0]) * box1
)
# difference map
if subtr_href:
for f in range(num_files):
box1[f, :, :] -= lightref[f, nt + 1]
boxD[f, :, :] -= (
(tslices[-1] - tslices[-2]) / float(tslices[1] - tslices[0]) * lightref[f, 2 * nt]
)
fac0 = fac1 = 0
for if1 in range(num_files):
box1R = box1[if1, :, :]
boxDR = boxD[if1, :, :]
c1min = pyIRC_percentile(box1R, corr_mask, 100 * epsilon)
if c1min <= 0.5:
c1min = 0.5 # should have no effect if successful, but prevents division by 0 if failure
c1max = pyIRC_percentile(box1R, corr_mask, 100 * (1 - epsilon))
cDmin = pyIRC_percentile(boxDR, corr_mask, 100 * epsilon)
cDmax = pyIRC_percentile(boxDR, corr_mask, 100 * (1 - epsilon))
this_file_mask = np.where(
np.logical_and(
box1R > c1min, np.logical_and(box1R < c1max, np.logical_and(boxDR > cDmin, boxDR < cDmax))
),
corr_mask,
0,
)
fac0 += np.sum(this_file_mask * boxDR)
fac1 += np.sum(this_file_mask * box1R)
if fac1 < 0.5:
return [] # FAIL!
frac_dslope = fac0 / fac1 / ((tslices[-1] - tslices[-2]) / float(tslices[1] - tslices[0]))
else:
frac_dslope = 0.0
if verbose:
print("frac_dslope =", frac_dslope)
if verbose:
print("Group 1 ->", med1, var1)
print("Group 2 ->", med2, var2)
print("correlations in Group 2:", tCH, tCV)
print("factors used: xi =", xi, ", cov_clip_corr =", cov_clip_corr)
# Get alpha-corrected gains
out = gain_alphacorr(gain_raw, tCH, tCV, med2)
if tslices[1] >= tslices[-1] and len(out) < 1:
return [
np.sum(this_mask),
gain_raw,
gain_raw,
gain_raw,
0.0,
0.0,
0.0,
med2 / gain_raw / (tslices[1] - tslices[0]),
0.0,
tCH,
tCV,
]
if len(out) < 1:
return [] # FAIL!
gain_acorr = out[0]
aH = out[1]
aV = out[2]
if tslices[1] >= tslices[-1]:
return [
np.sum(this_mask),
gain_raw,
gain_acorr,
gain_acorr,
aH,
aV,
0.0,
med2 / gain_acorr / (tslices[1] - tslices[0]),
0.0,
tCH,
tCV,
]
out = gain_alphabetacorr(gain_raw, tCH, tCV, med2, frac_dslope, [t - treset for t in tslices])
if len(out) < 1:
return [] # FAIL!
gain_abcorr = out[0]
aH = out[1]
aV = out[2]
beta = out[3]
I_ = out[4]
return [np.sum(this_mask), gain_raw, gain_acorr, gain_abcorr, aH, aV, beta, I_, 0.0, tCH, tCV]
[docs]
def corr_5x5(region_cube, dark_cube, tslices, lightref, darkref, ctrl_pars, verbose):
"""
Extracts 5x5 correlation matrix from light and dark data.
Parameters
----------
region_cube : np.array
4D array of the region of interest. shape: (num_files+1, nt, dy, dx),
where num_files is the number of files, nt is the number of time slices, and
(dy, dx) is the shape of the region on the SCA used. The last slice, ``region_cube[-1,:,:,:]``,
is the mask (1 for good, 0 for bad).
dark_cube : np.array
Like `region_cube`, but for the darks. It is optional whether there is a separate mask
(it isn't used, so it is OK if the first axis has length num_files or num_files+1).
tslices : list of int
List of the time slice numbers; length ``nt``.
lightref : np.array
Reference pixel table for correcting light exposures. shape = (num_files, 2*nt+1);
the way the time axis is managed is described in :func:`ref_corr`.
darkref : np.array
Similar to `lightref`, but for the dark exposures.
ctrl_pars : class
A class containing the control parameters as attributes. These are optional (but
recommended); if specified, they follow the same format as in :func:`basic`.
verbose : bool
Whether to print lots of information.
Returns
-------
list
The return list has 5 entries. In what follows, "a", "b", and "d" correspond
to time slices ``tslices[0]``, ``tslices[1]``, and ``tslices[-1]``:
- Number of good pixels
- Accumulated signal S_{bd} (in DN; median of mean method).
- Estimated robust variance (from inter-quantile range) of S_{ab} (DN^2).
- Estimated robust variance (from inter-quantile range) of S_{ad} (DN^2).
- Correlation function out to 2 pixels of S_{ad}, i.e., C_{adad}, shape = (5,5)
cenetered on (0,0).
"""
# Settings:
newMeanSubMethod = True # use False only for test/debug
leadtrailSub = True # subtract leading & trailing (by +/-4 pix) from horiz & vert correlations
g_ptile = 75.0 # percentile use for inter-quantile range for variance (default: 75, giving standard IQR)
# Extract basic parameters
num_files = region_cube.shape[0] - 1
nt = region_cube.shape[1]
dy = region_cube.shape[2]
dx = region_cube.shape[3]
# npix = dx * dy
if nt != len(tslices):
print("Error in pyirc.corr_5x5: incompatible number of time slices")
exit()
if verbose:
print("nfiles = ", num_files, ", ntimes = ", nt, ", dx,dy=", dx, dy)
# treset = 0
# if hasattr(ctrl_pars, "reset_frame"):
# treset = ctrl_pars.reset_frame
# First get correlation parameters
epsilon = 0.01
if hasattr(ctrl_pars, "epsilon"):
epsilon = ctrl_pars.epsilon
subtr_corr = True
if hasattr(ctrl_pars, "subtr_corr"):
subtr_corr = ctrl_pars.subtr_corr
noise_corr = True
if hasattr(ctrl_pars, "noise_corr"):
noise_corr = ctrl_pars.noise_corr
if verbose:
print("corr pars =", epsilon, subtr_corr, noise_corr)
#
# Reference pixel subtraction?
subtr_href = True
if hasattr(ctrl_pars, "subtr_href"):
subtr_href = ctrl_pars.subtr_href
# lead-trail subtraction for IPC correlations?
if hasattr(ctrl_pars, "leadtrailSub"):
leadtrailSub = ctrl_pars.leadtrailSub
# quantile for variance?
if hasattr(ctrl_pars, "g_ptile"):
g_ptile = ctrl_pars.g_ptile
# Get means and variances at the early and last slices
# (i.e. 1-point information)
gauss_iqr_in_sigmas = scipy.stats.norm.ppf(g_ptile / 100.0) * 2 # about 1.349 for g_ptile=75.
box1 = region_cube[0:num_files, 0, :, :] - region_cube[0:num_files, 1, :, :]
box2 = region_cube[0:num_files, 0, :, :] - region_cube[0:num_files, -1, :, :]
box2Noise = dark_cube[0:num_files, 0, :, :] - dark_cube[0:num_files, -1, :, :]
#
if subtr_href:
for f in range(num_files):
if verbose:
print(
"lightref.shape=",
lightref.shape,
"subtr ->",
lightref[f, nt + 1],
lightref[f, 2 * nt - 1],
darkref[f, 2 * nt - 1],
)
box1[f, :, :] -= lightref[f, nt + 1]
box2[f, :, :] -= lightref[f, 2 * nt - 1]
box2Noise[f, :, :] -= darkref[f, 2 * nt - 1]
mean1 = np.mean(box1, axis=0)
mean2 = np.mean(box2, axis=0)
# med1 = np.median(mean1)
# med2 = np.median(mean2)
med21 = np.median(mean2 - mean1)
var1 = 0
var2 = 0
corr_mask = region_cube[-1, 0, :, :]
C_shift_mean = np.zeros((dy, dx))
tC_all = np.zeros((dy, dx))
for if1 in range(1, num_files):
for if2 in range(if1):
temp_box = box1[if1, :, :] - box1[if2, :, :]
iqr1 = pyIRC_percentile(temp_box, corr_mask, g_ptile) - pyIRC_percentile(
temp_box, corr_mask, 100 - g_ptile
)
temp_box = box2[if1, :, :] - box2[if2, :, :]
iqr2 = pyIRC_percentile(temp_box, corr_mask, g_ptile) - pyIRC_percentile(
temp_box, corr_mask, 100 - g_ptile
)
var1 += (iqr1 / gauss_iqr_in_sigmas) ** 2 / 2.0
var2 += (iqr2 / gauss_iqr_in_sigmas) ** 2 / 2.0
if verbose:
print("Inner loop,", if1, if2, temp_box.shape)
var1 /= num_files * (num_files - 1) / 2.0
var2 /= num_files * (num_files - 1) / 2.0
if var2 <= var1 and tslices[1] < tslices[-1]:
return [] # FAIL!
# Correlations of neighboring pixels, in DN^2
#
for if1 in range(1, num_files):
for if2 in range(if1):
temp_box = box2[if1, :, :] - box2[if2, :, :]
# Run through twice if we have noise, otherwise once
nrun = 2 if noise_corr else 1
if verbose:
print("if1,if2=", if1, if2, " nrun: ", nrun)
for icorr in range(nrun):
# clipping
cmin = pyIRC_percentile(temp_box, corr_mask, 100 * epsilon)
cmax = pyIRC_percentile(temp_box, corr_mask, 100 * (1 - epsilon))
this_mask = np.where(np.logical_and(temp_box > cmin, temp_box < cmax), 1, 0) * corr_mask
if np.sum(this_mask) < 1:
return [] # FAIL!
# mean subtraction
mean_of_temp_box = np.sum(temp_box * this_mask) / np.sum(this_mask)
if subtr_corr and newMeanSubMethod:
temp_box -= mean_of_temp_box
# Correlations in all directions
# masktmp = correlate2d(this_mask, this_mask,mode='same')
# C_all = correlate2d(this_mask*temp_box, this_mask*temp_box, mode='same')
dy2 = dy // 2
dx2 = dx // 2
masktmp = fftconvolve(this_mask, np.flip(this_mask), mode="full")[
dy2 : -dy2 + 1, dx2 : -dx2 + 1
]
C_all = fftconvolve(this_mask * temp_box, np.flip(this_mask * temp_box), mode="full")[
dy2 : -dy2 + 1, dx2 : -dx2 + 1
]
if np.any(masktmp < 1):
return []
C_all /= masktmp
if leadtrailSub:
C_pos_shift = np.zeros_like(C_all)
C_neg_shift = np.zeros_like(C_all)
C_pos_shift[:, :-8] = C_all[
:, 8:
] # values of the correlation matrix 8 columns to the right
C_neg_shift[:, 8:] = C_all[
:, :-8
] # values of the correlation matrix 8 columns to the left
# The 8 columns at the right edge just take the negative shift values,
# the 8 columns at the left edge just take the positive shift values,
# and in the middle the mean of the two shifts is computed:
C_shift_mean[:, 8:-8] = np.mean([C_pos_shift[:, 8:-8], C_neg_shift[:, 8:-8]], axis=0)
C_shift_mean[:, :8] = C_pos_shift[:, :8]
C_shift_mean[:, -8:] = C_neg_shift[:, -8:]
C_all = C_all - C_shift_mean
# need to update the lines below to use C_all
if subtr_corr and not newMeanSubMethod and not leadtrailSub:
C_all -= mean_of_temp_box**2
tC_all += C_all * (1 if icorr == 0 else -1)
if verbose:
print("pos =", if1, if2, "iteration", icorr, "cmin,cmax =", cmin, cmax)
# Below needs to be adjusted
# print ('Mask size', np.sum(this_mask), 'correlations =', maskCH, maskCV,
# 'data:', CH, CV)
temp_box = box2Noise[if1, :, :] - box2Noise[if2, :, :]
# end nested for loop
#
# Normalize covariances. Note that taking the difference of 2 frames doubled the covariance
# matrix, so we have introduced cov_clip_corr
xi = scipy.stats.norm.ppf(1 - epsilon)
cov_clip_corr = (1.0 - np.sqrt(2.0 / np.pi) * xi * np.exp(-xi * xi / 2.0) / (1.0 - 2.0 * epsilon)) ** 2
tC_all /= num_files * (num_files - 1) * cov_clip_corr
# extract 5x5 matrix in the center of tC_all here:
# hard-coded to return only 5x5 arrays, we should add option to specify
# Find the "center" of this array
c_y = dy - dy // 2
c_x = dx - dx // 2
tC_all_5x5 = tC_all[c_y - 3 : c_y + 2, c_x - 3 : c_x + 2]
decenter_tC_all = decenter(tC_all_5x5) # Might come in handy
if verbose:
print("tCH, tCV: ", decenter_tC_all[0, 1], decenter_tC_all[1, 0])
# Return the correlations
return [np.sum(this_mask), med21, var1, var2, tC_all_5x5]
[docs]
def corrstats(lightfiles, darkfiles, formatpars, box, tslices, sensitivity_spread_cut, ctrl_pars):
"""
Routine to obtain statistical properties of a region of the detector across many time slices.
Parameters
----------
lightfiles : list
The list of "light" (flat field) files.
darkfiles : list
The list of dark files.
formatpars : int
The file format.
box : list of int
The box boundaries in the form [xmin, xmax, ymin, ymax], with the usual
convention that pixels are included if xmin<=x<xmax and ymin<=y<ymax.
tslices : list if int
The list of time slices to use, with length at least 2. The first two entries
are ``tmin`` and ``tmax`` (with tmin<=t<tmax). If the length is exactly 2, then
all time slice combinations are computed (see Returns). Otherwise, the additional
entries are "deltas". For example, [4,10,1,3] means that computations are done for
4<=ti<tj<10, but only with combinations that have tj-ti equal to 1 or 3.
sensitivity_spread_cut : float
What percentage response from median flat to cut for identifying good pixels (typically 0.1).
ctrl_pars : class
A class containing the control parameters as attributes. These are optional (but
recommended); if specified, they follow the same format as in :func:`basic`.
Returns
-------
data : np.array
Array of return information, shape = (nt, nt, 6), where nt is the number of time slices
(so data for time slice pair ti,tj is in ``data[ti-tmin,tj-tmin,:]``). The fields on the last
axis are:
- Number of good pixels
- Accumulated signal S_{ab} (in DN; median of mean method).
- Variance of S_{ab} (DN^2).
- Correlation function for horizontal pixels, C_{abab}(1,0) (DN^2).
- Correlation function for vertical pixels, C_{abab}(0,1) (DN^2).
- Correlation function for diagonal pixels, C_{abab}(+/-1,+/-1), averaged
over the two diagonal directions (DN^2).
"""
# make copy of ctrl_pars, but force 5th element to be False
ctrl_pars2 = copy.copy(ctrl_pars)
ctrl_pars2.full_corr = False
tmin = tslices[0]
tmax = tslices[1]
nt = tmax - tmin
# build cube of good pixels, medians, variances, correlations
data = np.zeros((nt, nt, 6))
# and get mask (last 'time' slice) -- only thing we are extracting from region_cube_X
region_cube_X = pixel_data(
lightfiles,
formatpars,
box[:4],
[tmin, tmax - 1, tmax - 1, tmax - 1],
[sensitivity_spread_cut, True],
False,
)
# Get list of (good pix, median, var, cov_H, cov_V)
for ti in range(nt - 1):
for tj in range(ti + 1, nt):
if tslices[2:] == [] or tj - ti in tslices[2:] or tj - ti == nt - 1:
t1 = tmin + ti
t2 = tmin + tj
tarray = [t1, t2, t2, t2]
lightref = ref_array_block(lightfiles, formatpars, box[2:4], tarray, False)
darkref = ref_array_block(darkfiles, formatpars, box[2:4], tarray, False)
if not ctrl_pars.subtr_href:
lightref[:, :] = 0.0
darkref[:, :] = 0.0
region_cube = pixel_data(
lightfiles, formatpars, box[:4], tarray, [sensitivity_spread_cut, False], False
)
dark_cube = pixel_data(
darkfiles, formatpars, box[:4], tarray, [sensitivity_spread_cut, False], False
)
# switch to the mask from above
region_cube[-1, :, :, :] = region_cube_X[-1, :, :, :]
dark_cube[-1, :, :, :] = region_cube_X[-1, :, :, :]
B = basic(region_cube, dark_cube, tarray, lightref, darkref, ctrl_pars2, False)
if len(B) == 6:
data[ti, tj, :] = np.asarray(B)
# print (t1, t2, data[ti,tj,:], len(B))
return data
[docs]
def polychar(
lightfiles,
darkfiles,
formatpars,
box,
tslices,
sensitivity_spread_cut,
ctrl_pars,
addInfo,
swi,
corrstats_data=None,
):
"""
Routine to characterize of a region of the detector across many time slices.
Parameters
----------
lightfiles : list
The list of "light" (flat field) files.
darkfiles : list
The list of dark files.
formatpars : int
The file format.
box : list of int
The box boundaries in the form [xmin, xmax, ymin, ymax], with the usual
convention that pixels are included if xmin<=x<xmax and ymin<=y<ymax.
tslices : list if int
The list of time slices to use, with length at least 2. The first two entries
are ``tmin`` and ``tmax`` (with tmin<=t<tmax). If the length is exactly 2, then
all time slice combinations are computed (see Returns). Otherwise, the additional
entries are "deltas". For example, [4,10,1,3] means that computations are done for
4<=ti<tj<10, but only with combinations that have tj-ti equal to 1 or 3.
sensitivity_spread_cut : float
What percentage response from median flat to cut for identifying good pixels (typically 0.1).
ctrl_pars : class
A class containing the control parameters as attributes. These are optional (but
recommended); see the Notes.
addInfo : list
Some additional information needed for IPNL corrections to the inferred gain and IPC data.
The length may be 0 (null, no corrections), 2, or 3. If there is a correction, then the entries
are:
- ``addInfo[0]`` : str
Either 'bfe' or 'nlipc' (which form of IPNL to assume dominant).
- ``addInfo[1]`` : np.array
BFE kernel, shape (2s+1, 2s+1), centered at 0, units in inverse electrons.
- ``addInfo[2]`` : np.array, optional
1D array of polynomial coefficients, needed if ``ctrl_pars.use_allorder`` is True.
This is in DN-based units, starting with the quadratic coefficient (unit: DN^-1).
swi : class
Column table.
corrstats_data : np.array, optional
If given, saved data from :func:`corrstats` (saves time if alraedy computed).
Returns
-------
list
The list entries are [isgood (1 = good, 0 = bad), gain (e/DN), alpha_H (IPC), alpha_V (IPC),
beta (1/e), Intensity (e/frame), alpha_D (IPC), change in alpha from previous iteration
(residual)]. Returns the empty list [] in the event of a failure.
Notes
-----
The `ctrl_pars` class contains the following attributes.
They follow the same format as in :func:`basic`, except that ``use_allorder`` is added:
- ``epsilon`` : float
Fraction of data points to cut for computing correlations (default 0.01)
- ``subtr_corr`` : bool
Do mean subtraction for the IPC correlation? (default to True)
- ``noise_corr`` : bool
Do noise subtraction for the IPC correlation? (default to True)
- ``reset_frame`` : int
Reset frame (default to 0)
- ``subtr_href`` : bool
Horizontal reference pixel subtraction? (default to True)
- ``full_corr`` : bool
Which parameters to report? (default to True = standard basic pars; False = correlation data instead)
- ``leadtrailSub`` : bool
Perform lead-trail subtraction? (default to False)
- ``g_ptile`` : float
Percentile for inter-quantile range (default to 75)
- ``use_allorder`` : bool
Whether to use the full polynomial expansion for the non-linearity correction? (default to False)
"""
# Check whether we have non-linearity information
if hasattr(ctrl_pars, "use_allorder") and ctrl_pars.use_allorder and len(addInfo) < 3:
print("Error: polychar: not enough fields in addInfo")
return []
# Check time range
if len(tslices) < 4:
print("Error: polychar: not enough data", tslices)
return []
if tslices[2] >= tslices[3] or tslices[3] >= tslices[1] - tslices[0] or tslices[1] - tslices[0] < 3:
print("Error: polychar: invalid slices range", tslices)
return []
# Get correlation function data (including adjacent steps))
if corrstats_data is None:
data = corrstats(
lightfiles, darkfiles, formatpars, box, tslices + [1], sensitivity_spread_cut, ctrl_pars
)
else:
data = np.copy(corrstats_data)
# check if this is good
nt = tslices[1] - tslices[0]
for ti in range(nt - 1):
for tj in range(ti + 1, nt):
if data[ti, tj, 0] == 0 and tj - ti in [1, tslices[2], tslices[3]]:
return [0, 0, 0, 0, 0, 0]
# Determine whether we are applying corrections
applyCorr = False
if len(addInfo) >= 2:
applyCorr = True
typeCorr = addInfo[0]
ipnl = addInfo[1]
sBFE = np.shape(ipnl)[0] // 2
# Fit of differences as a function of slice number
# slope = -2*beta*I^2/g
# intercept = (I - beta I^2)/g
npts = tslices[1] - tslices[0] - 1
diff_frames = np.zeros(npts)
for j in range(npts):
diff_frames[j] = data[j, j + 1, 1] # median from frame tslices[0]+j -> tslices[0]+j+1
slopemed, icpt = np.linalg.lstsq(
np.vstack([np.array(range(npts)) + tslices[0] - ctrl_pars.reset_frame, np.ones(npts)]).T,
diff_frames,
rcond=-1,
)[0]
# If using 'allorder', let's subtract out the higher-order terms:
if hasattr(ctrl_pars, "use_allorder") and ctrl_pars.use_allorder:
xr = np.array(range(npts)) + tslices[0] - ctrl_pars.reset_frame
i = 100
err = 10
etarget = 1e-9 * np.abs(icpt)
while i >= 0 and err > etarget:
I__g = icpt - 0.5 * slopemed # might use this in the future # noqa: F841
diff_frames_reduced = diff_frames.copy()
icpt_old = icpt
slopemed_old = slopemed
for j in range(3, swi.p + 1):
diff_frames_reduced -= (
addInfo[2][j - 2] * ((xr + 1) ** j - xr**j) * (icpt - slopemed * 0.5) ** j
)
slopemed, icpt = np.linalg.lstsq(np.vstack([xr, np.ones(npts)]).T, diff_frames_reduced, rcond=-1)[
0
]
err = np.sqrt((icpt - icpt_old) ** 2 + (slopemed - slopemed_old) ** 2)
if i == 0:
print("higher order loop failed to converge {:12.5E} vs {:12.5E} (target)", err, etarget)
return []
# Difference of correlation functions
#
# Cdiff = I/g^2 * ((1-4a)^2 + 2aH^2 + 2aV^2) * t_{bd}
# - 4(1-8a)beta I^2/g^2 * (t_{ad}t_d - t_{ab}t_b + (e-1)/2*t_{bd})
# where e = npts2 is number of bins averaged together
#
# and horizontal and vertical cross-correlations
# CH = 2 I t_{ab} / g^2 * ( 1-4a - 4 beta (I t_b + 1/2 + (e-1)/2*I) ) * aH
# CV = 2 I t_{ab} / g^2 * ( 1-4a - 4 beta (I t_b + 1/2 + (e-1)/2*I) ) * aV
#
npts2 = tslices[1] - tslices[0] - tslices[3]
Cdiff = CV = CH = CD = 0.0
for j in range(npts2):
Cdiff += data[j, j + tslices[3], 2] - data[j, j + tslices[2], 2]
CH += data[j, j + tslices[3], 3]
CV += data[j, j + tslices[3], 4]
CD += data[j, j + tslices[3], 5]
Cdiff /= npts2
CH /= npts2
CV /= npts2
CD /= npts2
# initialize with no IPC or NL
alphaH = alphaV = alphaD = alpha = beta = 0.0
da = 1.0
# dummy initializations; these get over-written before they are used
I_ = g = 1.0
Cdiffcorr = 0.0
iCycle = 0
nCycle = 100
while iCycle < nCycle:
alphaH_old = alphaH
alphaV_old = alphaV
alphaD_old = alphaD
g_old = g # to track convergence
# Get combination of I and gain from difference of correlation functions
tbrack = (
tslices[3] * (tslices[0] + tslices[3] - ctrl_pars.reset_frame)
- tslices[2] * (tslices[0] + tslices[2] - ctrl_pars.reset_frame)
+ (npts2 - 1) / 2.0 * (tslices[3] - tslices[2])
)
I__g2 = (
(Cdiff - Cdiffcorr + 4.0 * (1.0 - 8.0 * alpha) * beta * I_**2 / g**2 * tbrack)
/ (tslices[3] - tslices[2])
/ ((1.0 - 4 * alpha - 4 * alphaD) ** 2 + 2 * alphaH**2 + 2 * alphaV**2 + 4 * alphaD**2)
)
# Now use slopemed = -2 beta I^2/g, icpt = (I - beta I^2)/g, and I/g^2 to solve for I, beta, and g
g = (icpt - slopemed / 2.0) / I__g2
I_ = I__g2 * g**2
beta = -g * slopemed / 2.0 / I_**2
# Corrections to horiz. and vert. IPC
#
CHcorr = CVcorr = CDcorr = 0.0
if applyCorr:
if typeCorr.lower() == "bfe":
CHcorr = (ipnl[sBFE, sBFE + 1] + ipnl[sBFE, sBFE - 1]) / 2.0 * (I_ / g * tslices[3]) ** 2
CVcorr = (ipnl[sBFE + 1, sBFE] + ipnl[sBFE - 1, sBFE]) / 2.0 * (I_ / g * tslices[3]) ** 2
CDcorr = (
(
ipnl[sBFE + 1, sBFE + 1]
+ ipnl[sBFE + 1, sBFE - 1]
+ ipnl[sBFE - 1, sBFE + 1]
+ ipnl[sBFE - 1, sBFE - 1]
)
/ 4.0
* (I_ / g * tslices[3]) ** 2
)
Cdiffcorr = ipnl[sBFE, sBFE] * (I_ / g) ** 2 * (tslices[3] ** 2 - tslices[2] ** 2)
if typeCorr.lower() == "nlipc":
CHcorr = (
(ipnl[sBFE, sBFE + 1] + ipnl[sBFE, sBFE - 1])
/ 2.0
* (I_ / g) ** 2
* tslices[3]
* (tslices[0] + tslices[3] + (npts2 - 1) * 0.5)
* 2
)
CVcorr = (
(ipnl[sBFE + 1, sBFE] + ipnl[sBFE - 1, sBFE])
/ 2.0
* (I_ / g) ** 2
* tslices[3]
* (tslices[0] + tslices[3] + (npts2 - 1) * 0.5)
* 2
)
CDcorr = (
(
ipnl[sBFE + 1, sBFE + 1]
+ ipnl[sBFE + 1, sBFE - 1]
+ ipnl[sBFE - 1, sBFE + 1]
+ ipnl[sBFE - 1, sBFE - 1]
)
/ 4.0
* (I_ / g) ** 2
* tslices[3]
* (tslices[0] + tslices[3] + (npts2 - 1) * 0.5)
* 2
)
Cdiffcorr = (
ipnl[sBFE, sBFE]
* (I_ / g) ** 2
* (
(tslices[0] + tslices[3]) * tslices[3]
- (tslices[0] + tslices[2]) * tslices[2]
+ (tslices[3] - tslices[2]) * (npts2 - 1) * 0.5
)
)
# apply corrections from ftsolve
if ctrl_pars.fullnl and typeCorr.lower() == "bfe":
beta_cm = beta
if ctrl_pars.use_allorder:
beta_cm = -addInfo[2] / g ** np.linspace(1, swi.p - 1, num=swi.p - 1)
if Test_SubBeta:
beta_cm = beta
t0 = tslices[0] - ctrl_pars.reset_frame
CF_BigStep = solve_corr_many(
ipnl,
21,
I_,
g,
beta_cm,
0.0,
[t0, t0 + tslices[3], t0, t0 + tslices[3], npts2],
[alphaV, alphaH, alphaD],
[0.0, 0.0, 0.0],
sBFE,
)
CF_SmallStep = solve_corr_many(
ipnl,
21,
I_,
g,
beta_cm,
0.0,
[t0, t0 + tslices[2], t0, t0 + tslices[2], npts2],
[alphaV, alphaH, alphaD],
[0.0, 0.0, 0.0],
sBFE,
)
Cdiffcorr = (
CF_BigStep[sBFE, sBFE]
- CF_SmallStep[sBFE, sBFE]
- (
I_
/ g**2
* ((1 - 4 * alpha - 4 * alphaD) ** 2 + 2 * alphaH**2 + 2 * alphaV**2 + 4 * alphaD**2)
* (tslices[3] - tslices[2])
- 4 * (1 - 8 * alpha) * beta * I_**2 / g**2 * tbrack
)
)
ad3 = tslices[0] + tslices[3] - ctrl_pars.reset_frame
CHcorr = (CF_BigStep[sBFE, sBFE + 1] + CF_BigStep[sBFE, sBFE - 1]) / 2.0 - (
2.0
* I_
/ g**2
* tslices[3]
* (1.0 - 4 * alpha - 4 * alphaD - 4 * beta * (I_ * ad3 + (npts2 - 1) * 0.5 * I_ + 0.5))
* alphaH
+ 4.0 * I_ / g**2 * tslices[3] * alphaV * alphaD
)
CVcorr = (CF_BigStep[sBFE + 1, sBFE] + CF_BigStep[sBFE - 1, sBFE]) / 2.0 - (
2.0
* I_
/ g**2
* tslices[3]
* (1.0 - 4 * alpha - 4 * alphaD - 4 * beta * (I_ * ad3 + (npts2 - 1) * 0.5 * I_ + 0.5))
* alphaV
+ 4.0 * I_ / g**2 * tslices[3] * alphaH * alphaD
)
CDcorr = (
CF_BigStep[sBFE + 1, sBFE + 1]
+ CF_BigStep[sBFE - 1, sBFE + 1]
+ CF_BigStep[sBFE + 1, sBFE - 1]
+ CF_BigStep[sBFE - 1, sBFE - 1]
) / 4.0 - (
2.0 * I_ / g**2 * tslices[3] * (1.0 - 4 * alpha - 4 * alphaD) * alphaD
+ 2.0 * I_ / g**2 * tslices[3] * alphaH * alphaV
)
factor = (
2.0
* I__g2
* tslices[3]
* (
1.0
- 4.0 * alpha
- 4.0 * alphaD
- 4.0
* beta
* (I_ * (tslices[0] + tslices[3] - ctrl_pars.reset_frame + (npts2 - 1.0) / 2.0) + 0.5)
)
)
factor_raw = 2.0 * I__g2 * tslices[3]
alphaH = (CH - CHcorr - 2.0 * alphaV * alphaD * factor_raw) / factor
alphaV = (CV - CVcorr - 2.0 * alphaH * alphaD * factor_raw) / factor
alphaD = ((CD - CDcorr) / factor_raw - alphaH * alphaV) / (1.0 - 4.0 * alpha - 4.0 * alphaD)
alpha = (alphaH + alphaV) / 2.0
da = np.abs(alphaH_old - alphaH) + np.abs(alphaV_old - alphaV) + np.abs(alphaD_old - alphaD)
dg = np.abs(g_old - g)
iCycle += 1
if iCycle < nCycle - 2 and da < 1e-8 and dg < 1e-8:
iCycle = nCycle - 2 # fast exit from loop
return [1, g, alphaH, alphaV, beta, I_, alphaD, da]
[docs]
def bfe(region_cube, tslices, basicinfo, ctrl_pars_bfe, swi, verbose):
"""
Routines to compute the BFE coefficients.
Parameters
----------
region_cube : np.array
4D array of the region of interest. shape: (num_files+1, nt, dy, dx),
where num_files is the number of files, nt is the number of time slices, and
(dy, dx) is the shape of the region on the SCA used. The last slice, ``region_cube[-1,:,:,:]``,
is the mask (1 for good, 0 for bad).
tslices : list of int
A list of at least 4 time slices. The first two and last two ("a", "b", "c", and "d") are
used for the BFE determination.
basicinfo : list
Output from :func:`basic` (inclides gains, IPC, and non-linearity).
ctrl_pars_bfe : class
Parameters to control BFE determination; see Notes.
swi : class
Column table.
verbose : bool
Whether to talk a lot.
Returns
-------
np.array
The BFE kernel, shape (2s+1, 2s+1), Antilogus coefficients in inverse electrons.
Notes
-----
The (optional) attributes in `ctrl_pars_bfe` are::
- ``epsilon`` : float
Fraction of data to cut in computing correlation coefficients (default to 0.01).
- ``treset`` : int or float
Reset frame (default to 0). Fractional values are possible.
- ``BSub`` : bool
Perform baseline subtraction? (default to True)
- ``vis`` : bool
Does this have visible light information (for quantum yield)? (default to False)
- ``Phi`` : np.array
2D quantum yield + charge diffusion kernel (only used if `ctrl_pars_bfe`.vis is True).
This is omega/(1+omega)*p2, where 1+omega is the quantum yield (omega is the probability
of getting 2 charges from 1 photon) and p2[s+dy,s+dx] is the pairwise probability that two charges
generated at the same point land in the pixel (dx,dy). Here the kernel has shape (2s+1, 2s+1),
centered at zero (so it is symmetric).
"""
N = 21 # <-- size for ftsolve
# Extract parameters from basicinfo
gain = basicinfo[swi.g]
aH = basicinfo[swi.alphaH]
aV = basicinfo[swi.alphaV]
beta = basicinfo[swi.beta]
I_ = basicinfo[swi.I]
# Extract basic parameters
num_files = region_cube.shape[0] - 1
nt = region_cube.shape[1]
dy = region_cube.shape[2]
dx = region_cube.shape[3]
# npix = dx * dy
if nt != len(tslices):
print("Error in basic: incompatible number of time slices")
exit()
if verbose:
print("nfiles = ", num_files, ", ntimes = ", nt, ", dx,dy=", dx, dy)
treset = 0
if hasattr(ctrl_pars_bfe, "treset"):
treset = ctrl_pars_bfe.treset
# for visible flats
hasvis = False
if hasattr(ctrl_pars_bfe, "vis") and ctrl_pars_bfe.vis:
hasvis = True
normPhi = np.sum(ctrl_pars_bfe.Phi) # this is omega/(1+omega)
omega = normPhi / (1 - normPhi)
p2 = np.zeros_like(ctrl_pars_bfe.Phi)
if np.abs(normPhi) > 1e-49:
p2 = ctrl_pars_bfe.Phi / normPhi # this prevents an exception if omega=0
p2 = pad_to_N(p2, N) # still centered
# BFE kernel size:
# sBFE = range; fsBFE = full size
sBFE = swi.s
fsBFE = 2 * sBFE + 1
sBFE_out = sBFE
fsBFE_out = fsBFE
# replace beta with a scalar value if necessary
# note beta[0] is now 2nd order coef (in DN^-1) is to be converted to beta (in e^-1) and has opposite sign
if ctrl_pars_bfe.fullnl and ctrl_pars_bfe.use_allorder:
beta = -beta / gain ** np.linspace(1, swi.p - 1, num=swi.p - 1)
if ctrl_pars_bfe.fullnl and ctrl_pars_bfe.use_allorder and Test_SubBeta:
beta = beta[0]
# Baseline subtraction -- requires bigger box
BSub = True
if hasattr(ctrl_pars_bfe, "BSub"):
BSub = ctrl_pars_bfe.BSub
if BSub:
sBFE = max(sBFE_out, 10)
fsBFE = 2 * sBFE + 1
pad = 5 # Number of pixels in corr. fcn. to take for the baseline on each side in each row
# Cut fraction and correction
epsilon = 0.01
if hasattr(ctrl_pars_bfe, "epsilon"):
epsilon = ctrl_pars_bfe.epsilon
xi = scipy.stats.norm.ppf(1 - epsilon)
cov_clip_corr = (1.0 - np.sqrt(2.0 / np.pi) * xi * np.exp(-xi * xi / 2.0) / (1.0 - 2.0 * epsilon)) ** 2
# Build the two slices to correlate
box1 = region_cube[0:num_files, 0, :, :] - region_cube[0:num_files, 1, :, :]
box3 = region_cube[0:num_files, -2, :, :] - region_cube[0:num_files, -1, :, :]
corr_mask = region_cube[-1, 0, :, :]
# setup for BFE kernel
numBFE = np.zeros((fsBFE, fsBFE))
denBFE = np.zeros((fsBFE, fsBFE))
# Loop over the flat pairs we are going to use
for if1 in range(1, num_files):
for if2 in range(if1):
# Build slices and mask
slice_ab = box1[if1, :, :] - box1[if2, :, :]
slice_cd = box3[if1, :, :] - box3[if2, :, :]
ab_min = pyIRC_percentile(slice_ab, corr_mask, 100 * epsilon)
ab_max = pyIRC_percentile(slice_ab, corr_mask, 100 * (1 - epsilon))
cd_min = pyIRC_percentile(slice_cd, corr_mask, 100 * epsilon)
cd_max = pyIRC_percentile(slice_cd, corr_mask, 100 * (1 - epsilon))
this_file_mask_ab = np.where(np.logical_and(slice_ab > ab_min, slice_ab < ab_max), corr_mask, 0)
this_file_mask_cd = np.where(np.logical_and(slice_cd > cd_min, slice_cd < cd_max), corr_mask, 0)
if verbose:
print(
if1,
if2,
slice_ab.shape,
slice_cd.shape,
np.sum(this_file_mask_ab),
np.sum(this_file_mask_cd),
)
# Mean subtraction
slice_ab -= pyIRC_mean(slice_ab, this_file_mask_ab)
slice_cd -= pyIRC_mean(slice_cd, this_file_mask_cd)
# Set masked values to zero
slice_ab *= this_file_mask_ab
slice_cd *= this_file_mask_cd
# Now get the correlation function ...
# format is: numerator and denominator of C_{abcd}(2*sBFE-i,2*sBFE-j)
for j in range(fsBFE):
for i in range(fsBFE):
abminX = 0
abmaxX = dx
abminY = 0
abmaxY = dy
if i >= sBFE:
abmaxX += sBFE - i
else:
abminX += sBFE - i
if j >= sBFE:
abmaxY += sBFE - j
else:
abminY += sBFE - j
cdminX = abminX + i - sBFE
cdmaxX = abmaxX + i - sBFE
cdminY = abminY + j - sBFE
cdmaxY = abmaxY + j - sBFE
# Add up contributions to the correlation function
denBFE[j, i] += np.sum(
this_file_mask_ab[abminY:abmaxY, abminX:abmaxX]
* this_file_mask_cd[cdminY:cdmaxY, cdminX:cdmaxX]
)
numBFE[j, i] += (
np.sum(
slice_ab[abminY:abmaxY, abminX:abmaxX] * slice_cd[cdminY:cdmaxY, cdminX:cdmaxX]
)
/ 2.0
)
# division by 2 since differencing two images doubles the answer
BFEK = numBFE / (denBFE + 1e-99)
BFEK *= gain**2 / (I_**2 * (tslices[1] - tslices[0]) * (tslices[-1] - tslices[-2]) * cov_clip_corr)
# Baseline subtraction
if BSub:
for j in range(fsBFE):
rowBL = (np.mean(BFEK[j, 0:pad]) + np.mean(BFEK[j, -pad:])) / 2.0
BFEK[j, :] -= rowBL
# Implement cr_converge.
if ctrl_pars_bfe.fullnl:
avals = [basicinfo[swi.alphaV], basicinfo[swi.alphaH], basicinfo[swi.alphaD]]
avals_nl = [0, 0, 0]
sigma_a = 0
tol = 1.0e-11 # Pick a tolerance below which the two Crs are considered equal
fsBFE_out = 2 * sBFE_out + 1
observed_Cr = BFEK[sBFE - sBFE_out : sBFE + sBFE_out + 1, sBFE - sBFE_out : sBFE + sBFE_out + 1]
BFEK_model = np.zeros((fsBFE_out, fsBFE_out)) + 1e-15
element_diff = 10
iters = 0
while element_diff > tol and iters <= 100:
# Note: solve_corr takes centered things, decenters/calculates internally
if hasvis:
theory_Cr = solve_corr_vis(
BFEK_model,
N,
I_,
gain,
beta,
sigma_a,
[t - treset for t in tslices],
avals,
avals_nl,
sBFE_out,
omega,
p2,
) * ((gain**2) / (I_**2 * (tslices[1] - tslices[0]) * (tslices[-1] - tslices[-2])))
else:
theory_Cr = solve_corr(
BFEK_model, N, I_, gain, beta, sigma_a, [t - treset for t in tslices], avals, avals_nl
) * ((gain**2) / (I_**2 * (tslices[1] - tslices[0]) * (tslices[-1] - tslices[-2])))
if np.isnan(theory_Cr).any():
warnings.warn("BFE loop diverged, generated NaN")
return np.zeros((fsBFE_out, fsBFE_out)) + np.nan
difference = theory_Cr - observed_Cr
element_diff = np.amax(abs(difference))
BFEK_model -= difference[::-1, ::-1]
iters += 1
if verbose:
print(iter, BFEK_model)
if iters > 99:
warnings.warn("WARNING: NL loop has iterated 100 times")
return np.zeros((fsBFE_out, fsBFE_out)) + np.nan
return BFEK_model
else:
# Corrections for classical non-linearity
BFEK[sBFE, sBFE] += 2 * (1 - 4 * (aH + aV)) * beta
if sBFE >= 1:
BFEK[sBFE, sBFE + 1] += 4 * aH * beta
BFEK[sBFE, sBFE - 1] += 4 * aH * beta
BFEK[sBFE + 1, sBFE] += 4 * aV * beta
BFEK[sBFE - 1, sBFE] += 4 * aV * beta
return BFEK[sBFE - sBFE_out : sBFE + sBFE_out + 1, sBFE - sBFE_out : sBFE + sBFE_out + 1]
[docs]
def hotpix(darkfiles, formatpars, tslices, pars, verbose):
"""
Selects hot pixels.
Parameters
----------
darkfiles : list of str
A list of the filenames of the dark exposures.
formatpars : int
The format code.
tslices : list of int
The time slices to read (the first slice is 1).
pars : np.array or array-like
Parameters controlling the hot pixel selection. These should be
``[Smin, Smax, stability, f_isolation]`` (see Notes for detailed meaning).
verbose : bool
Whether to print lots of information.
Returns
-------
row, col : np.array of int
The row and column values of the selected hot pixels. The two arrays have the same
length; the ``i``th hot pixel is at ``[row[i],col[i]]``.
Notes
-----
The hot pixels in the array must meet the following criteria:
- The apparent brightness in time slices up through tslices[-1] is assessed.
Pixels are required to have a dark signal in DN between ``Smin`` and ``Smax``.
- Repeatable to within a top-to-bottom error of ``stability`` as a fraction of the
maximum signal (e.g. 0.1 for 10% repeatability).
- Isolation: if ``f_isolation``>0, rejects pixels with neighbors that are at least this many
times as bright as this pixel itself (e.g. 0.1 for 10% isolation).
"""
# Build array for the dark cube
ndarks = len(darkfiles)
N = get_nside(formatpars)
cube = np.zeros((ndarks, N, N))
for f in range(ndarks):
CDS = load_segment(darkfiles[f], formatpars, [0, N, 0, N], [1, tslices[-1]], False)
cube[f, :, :] = CDS[0, :, :] - CDS[1, :, :]
# Extract information on the pixels
this_hot = np.zeros((N, N))
ave_cube = np.mean(cube, axis=0)
d_cube = np.max(cube, axis=0) - np.min(cube, axis=0)
if verbose:
print("time slices for hot pixel analysis ->", tslices)
print(ave_cube)
print("->", ave_cube.shape)
print(d_cube)
print("->", d_cube.shape)
this_hot = np.where(np.logical_and(ave_cube >= pars[0], ave_cube <= pars[1]), 1, 0)
# Isolation cut
if verbose:
print("Start with", np.sum(this_hot), "pixels before isolation cut")
if pars[3] > 0:
C = 2
M = np.ones((2 * C + 1, 2 * C + 1))
M[C, C] = 0
isolation_mask = scipy.ndimage.maximum_filter(ave_cube, footprint=M, mode="constant", cval=0)
# Also avoid pixels that border on reference pixels
this_hot[: 4 + C, :] = 0
this_hot[-(4 + C) :, :] = 0
this_hot[:, : 4 + C] = 0
this_hot[:, -(4 + C) :] = 0
this_hot *= np.where(isolation_mask <= pars[3] * ave_cube, 1, 0)
if verbose:
print("Start with", np.sum(this_hot), "pixels")
for t in tslices[1:]:
for f in range(ndarks):
CDS = load_segment(darkfiles[f], formatpars, [0, N, 0, N], [1, t], False)
cube[f, :, :] = CDS[0, :, :] - CDS[1, :, :]
d_cube = np.max(cube, axis=0) - np.min(cube, axis=0)
this_hot *= np.where(d_cube <= pars[2] * ave_cube, 1, 0)
if verbose:
print(np.sum(this_hot))
return np.where(this_hot > 0)
[docs]
def hotpix_ipc(y, x, darkfiles, formatpars, tslices, pars, verbose):
"""
Return IPC data from a list of hot pixels.
Parameters
----------
y, x : np.array of int
Tables of hot pixel coordinates to use (probably selected from :func:`hotpix`).
darkfiles : list of str
List of dark files to use.
formatpars : int
Format code for dark files.
tslices : list of int
List of time slices to report.
pars : list, [np.array, bool]
Parameters to control data selection. If not empty, `pars`[0] is a map of the
non-linearity times gain (units: 1/DN) and `pars`[1] is a boolean for whether
the non-linearity should be referenced to a median stack of initial image.
verbose : bool
Whether to talk a lot.
Returns
-------
np.array
Data cube with the signal in DN from the hot pixels. The shape is (npix, nt, 10), where npix is the
number of hot pixels (length of `y` and `x`); and nt is the number of time stamps
(length of `tslices`). The last index indicates the position: 0 = that pixel; 1 = right; 2 = up;
3 = left; 4 = downl; 5 = upper-right; 6 = upper-left; 7 = lower-left; 8 = lower-right;
9 = background (from the next 5x5-3x3=13 pixels out).
"""
# Build array for the dark cube
ndarks = len(darkfiles)
N = get_nside(formatpars)
cube = np.zeros((ndarks, N, N))
nt = len(tslices)
npix = len(x)
data = np.zeros((npix, nt, 10))
# offset table
dx = [0, 1, 0, -1, 0, 1, -1, -1, 1]
dy = [0, 0, 1, 0, -1, 1, 1, -1, -1]
# Perform nonlinearity correction?
do_nonlin = False
if len(pars) >= 1 and (type(pars[0]) is np.ndarray):
do_nonlin = True
m = pars[0]
beta_gain = np.zeros((N, N))
(ny1, nx1) = np.shape(m)
kx1 = N // nx1
ky1 = N // ny1
for i in range(nx1):
for j in range(ny1):
beta_gain[ky1 * j : ky1 * (j + 1), kx1 * i : kx1 * (i + 1)] = m[j, i]
# now beta_gain is an NxN map of beta*gain
if verbose:
print("beta*gain =", beta_gain)
# baseline for NL correction is median image?
medbaseline_nonlin = False
if len(pars) >= 2:
medbaseline_nonlin = pars[1]
# background mask
bkmask = np.ones((5, 5))
bkmask[1:4, 1:4] = 0.0
fourmask = False
if fourmask:
bkmask[:, :] = 0.0
bkmask[2, 0] = bkmask[2, 4] = bkmask[0, 2] = bkmask[4, 2] = 1.0
if verbose:
print("bkmask =", bkmask)
# 16 ones and 9 zeros
# now make data cube
for jt in range(nt):
for f in range(ndarks):
CDS = load_segment(darkfiles[f], formatpars, [0, N, 0, N], [1, tslices[jt]], False)
cube[f, :, :] = CDS[0, :, :] - CDS[1, :, :]
if do_nonlin:
# non-linearity correction, if turned on
cube_corr = cube[f, :, :]
if medbaseline_nonlin:
cube_corr = (
2 * scipy.ndimage.median_filter(CDS[0, :, :], size=3) - CDS[0, :, :] - CDS[1, :, :]
)
cube[f, :, :] = cube[f, :, :] * (1.0 + beta_gain * cube_corr)
medframe = np.median(cube, axis=0)
if verbose:
print("med", np.shape(medframe), jt)
for jpix in range(npix):
for jpos in range(9):
x_ = x[jpix] + dx[jpos]
y_ = y[jpix] + dy[jpos]
data[jpix, jt, jpos] = medframe[y_, x_]
data[jpix, jt, 9] = (
25.0 / 16.0 * np.mean(bkmask * medframe[y[jpix] - 2 : y[jpix] + 3, x[jpix] - 2 : x[jpix] + 3])
)
if fourmask:
data[jpix, jt, 9] *= 16.0 / 4.0
return data
[docs]
def slidemed_percentile(x, y, p, mrange=[-1, 1], niter=64, pivot="pos"): # list OK # noqa: B006
"""
Sliding median function.
Takes in points specified by `x` and `y` arrays, and percentile `p`,
and returns slope m such that p% of the data are below the line y = m*x.
(If all values in `x` are positive, this is simply a percentile of `y`/`x`.
But this version is not biased when the noise causes a few values to fluctuate
negative.)
Parameters
----------
x, y : np.array of float
Vectors of the same length.
p : float
Desired percentile.
mrange : list, optional
Slope range to search, length 2 ([mmin, mmax]).
niter : int, optional
Number of bisections to perform.
pivot : str, optional
The pivot is not used but is here for forward compatibility; right now it assumes
that the pivot point of the distribution is at x>0 (hence ordering in the bisection).
In a future release if we need to change this the functionality is there.
Returns
-------
float
The slope of the line meeting that percentile criterion.
"""
m1 = mrange[0]
m2 = mrange[1]
for _ in range(niter):
m = (m1 + m2) / 2.0
if np.nanpercentile(y - m * x, p) > 0:
m1 = m
else:
m2 = m
return m
[docs]
def get_vmin_vmax(mydata, qext):
"""
Generates min and max range for a color bar based on inter-quartile range.
Parameters
----------
mydata : np.array
The data to consider
qext : float
Number of interquartile ranges to extend (should be >=0, with 0
corresponding to 25th through 75th percentile).
Returns
-------
float, float
The minimum and maximum of the scale.
"""
Q1 = np.nanpercentile(mydata, 25)
Q2 = np.nanpercentile(mydata, 75)
return Q1 - (Q2 - Q1) * qext, Q2 + (Q2 - Q1) * qext