Source code for solid_waffle.flat_simulator.simulate_flat

"""
Flat simulator.

Notes
-----
Long description::

    Start with a realization of a perfect detector plus some BFE effect to
    the charge in a given pixel
    1.  The mean charge <Q_a(i,j)> = It_a + 0.5*Sigma_a*I^2*t_a^2 (Eqn 36)
    2.  Realization of a 4096 x 4096 pixel^2 grid with 66 time samples
    This final result data cube will then be [4k, 4k, 66] in dimensions
    and will require some kind of identifying header, currently taken
    from one of the DCL flats.  This is charge.
    3.  Convert to ADU/DN via gain ~ number(e-)/counts;  set ~ FWD/2^16

    Run tests with 128x128 or something small...
    current I is per pixel (units: e/s)
    time t (units: s)
    time step for read from t_a to t_a+1 (will need to check # for convergence)

    NOTE: to run some modes, one needs:
     ** a copy of a DCL flat file with the name set below

"""

import re
import sys

import fitsio
import numpy as np
from fitsio import FITSHDR

from ..ftsolve import p2kernel
from ..pyirc import get_nside
from .detector_functions import TestKernels, calc_area_defect, calculate_ipc


[docs] def PairPoisson(myMean, myShape, QY_offset, QY_p2, RNG): """ Build distribution of pairs of points. Parameters ---------- myMean : float mean number of *photons* per pixel (number of e is 2x larger). myShape : (int, int) Shape of numpy array to generate. QY_offset : int Maximum offset of the "other" electron to consider (usually 2 is enough). QY_p2 : np.array The pairwise probability array. RNG : numpy.random.Generator or numpy.random.RandomState The random number generator. Returns ------- np.array Pair-Poisson random array, including both Poisson photon generation and then the electron pairs being collected into pixels; shape = `myShape`.. """ Ptot = np.zeros(myShape) (yg, xg) = myShape for j in range(2 * QY_offset + 1): for i in range(2 * QY_offset + 1): dP = RNG.poisson(myMean * QY_p2[j, i], (yg + 2 * QY_offset, xg + 2 * QY_offset)) Ptot += dP[QY_offset : yg + QY_offset, QY_offset : xg + QY_offset] Ptot += dP[j : yg + j, i : xg + i] return Ptot
[docs] class Simulation: """ Run a flat field simulation. Parameters ---------- cfg : str The configuration as a multiline string. verbose : bool, optional Whether to talk a lot. Attributes ---------- pars : dict Parameter dictionary for this simulation. Methods ------- __init__ Constructor. run Runs the simulation. """
[docs] def __init__(self, cfg, verbose=True): # Defaults formatpars = 1 tsamp = 66 substep = 2 I_ = 10.0 QE = 0.8 delta_tsamp = 3.0 # arbitrary for now (s) gain = 1.5 # arbitrary scalar e-/DN outfile = "DefaultOutput.fits" wavemode = "ir" # options are 'ir' or 'vis'; used only for BFE kernel choice rngseed = 1000 noisemode = "none" noisefile = None bfemode = "true" lipcmode = "false" lipc_alpha = [0.01] nlmode = "false" nlbeta = 1.5 # (ppm/e-) # nlcoeffs_arr are [c_2,c_3,c_4] with c_j in units of ppm electrons^(1-j) nlcoeffs_arr = [-1.5725, 1.9307e-5, -1.4099e-10] reset_frames = [0] resetlevel = 0.0 # quantum yield defaults QY_omega = 0.0 QY_offset = 2 QY_p2 = np.zeros((5, 5)) QY_p2[2, 2] = 1 # RNG choice legacy = False # Read in information for line in cfg.splitlines(): # Format m = re.search(r"^FORMAT:\s*(\d+)", line) if m: formatpars = int(m.group(1)) # Number of reads m = re.search(r"^NREADS:\s*(\d+)", line) if m: tsamp = int(m.group(1)) # substeps m = re.search(r"^SUBSTEPS:\s*(\d+)", line) if m: substep = int(m.group(1)) # Time step (s) m = re.search(r"^DT:\s*(\S+)", line) if m: delta_tsamp = float(m.group(1)) # Gain (e/DN) m = re.search(r"^GAIN:\s*(\S+)", line) if m: gain = float(m.group(1)) # Quantum yield m = re.search(r"^QY:\s*(\d.*)", line) if m: QY_pars_str = m.group(1).split(" ") QY_omega = float(QY_pars_str[0]) # sig = float(m.group(2)) QY_cov = [float(QY_pars_str[x]) for x in range(1, len(QY_pars_str))] # QY_cxx = float(m.group(2)) # QY_cxy = float(m.group(3)) # QY_cyy = float(m.group(4)) QY_offset = 2 # QY_p2 = p2kernel(np.asarray([sig**2,0,sig**2]), QY_offset, 256) QY_p2 = p2kernel(np.asarray(QY_cov), QY_offset, 256) # Illumination (photons/s/pixel) m = re.search(r"^ILLUMINATION:\s*(\S+)", line) if m: I_ = float(m.group(1)) # QE (Illumination * QE = current, e/s/pixel) m = re.search(r"^QE:\s*(\S+)", line) if m: QE = float(m.group(1)) # RNG seed m = re.search(r"^RNGSEED:\s*(\d+)", line) if m: rngseed = int(m.group(1)) # Noise m = re.search(r"^NOISE:\s*(\S+)", line) if m: noisemode = m.group(1) m = re.search(r"^NOISE:\s*(\S+)\s+(\S+)", line) if m: noisemode = m.group(1) if noisemode != "none": noisefile = m.group(2) # Wavelength mode, affects BFE choice m = re.search(r"^WAVEMODE:\s*(\S+)", line) if m: wavemode = m.group(1) # BFE m = re.search(r"^BFE:\s*(\S+)", line) if m: bfemode = m.group(1) # linear IPC m = re.search(r"^L_IPC:\s*(\S+)\s+(\S.*)", line) if m: lipcmode = m.group(1) if lipcmode == "true": lipc_alpha_str = m.group(2).split(" ") lipc_alpha = [float(lipc_alpha_str[x]) for x in range(len(lipc_alpha_str))] # non-linearity beta m = re.search(r"^NL:\s*(\S+)\s+(\S.*)", line) if m: nlmode = m.group(1) if nlmode == "quadratic": nlbeta = float(m.group(2)) elif nlmode == "quartic": nlcoeffs_str = m.group(2).split(" ") nlcoeffs_arr = [float(nlcoeffs_str[x]) for x in range(len(nlcoeffs_str))] # Reset level (in e) m = re.search(r"^RESET_E:\s*(\S+)", line) if m: resetlevel = float(m.group(1)) # Output file m = re.search(r"^OUTPUT:\s*(\S+)", line) if m: outfile = m.group(1) # Use legacy random number generator? m = re.search(r"^LEGACY\s*$", line) if m: legacy = True # Stuff to save save_pars = [ "formatpars", "tsamp", "substep", "I_", "QE", "delta_tsamp", "gain", "outfile", "wavemode", "rngseed", "noisemode", "noisefile", "bfemode", "lipcmode", "lipc_alpha", "nlmode", "nlbeta", "nlcoeffs_arr", "reset_frames", "resetlevel", "QY_omega", "QY_offset", "QY_p2", "legacy", ] # now save the dictionary
[docs] self.pars = {}
for i in save_pars: self.pars[i] = locals()[i]
[docs] def run(self, verbose=False): """ Runs the simulation. Parameters ---------- verbose : bool, optional Whether to print lots of status updates. Returns ------- None """ # data cube attributes N = nx = ny = get_nside(self.pars["formatpars"]) # possibly redundant with nx,ny # Reference pixels hard-coded to 4 rows/cols around border, true for # all except WFC3 which has 5 xmin, xmax, ymin, ymax = 4, N - 4, 4, N - 4 # Extent of non-reference pixels nt_step = ( self.pars["tsamp"] * self.pars["substep"] ) # number of tot timesteps depending on convergence needs delta_t = (self.pars["delta_tsamp"] * self.pars["tsamp"]) / nt_step # time between timesteps allQ = np.zeros((self.pars["substep"], nx, ny)) data_cube_Q = np.zeros((self.pars["tsamp"], nx, ny)) data_cube_S = np.zeros_like(data_cube_Q) offset_frame = np.zeros((1, nx, ny)) count = 1 if verbose: print("side length =", N) print( "samples:", self.pars["tsamp"], "x", self.pars["delta_tsamp"], "s; # substep =", self.pars["substep"], ) print("Illumination:", self.pars["I_"], "ph/s/pix; QE =", self.pars["QE"]) print("RNG seed ->", self.pars["rngseed"]) RNG = ( np.random.RandomState(seed=self.pars["rngseed"]) if self.pars["legacy"] else np.random.Generator(seed=self.pars["rngseed"]) ) # Reset first frame if needed offset_frame[:, :, :] = self.pars["resetlevel"] if 0 in self.pars["reset_frames"]: allQ[0, :, :] = data_cube_Q[0, :, :] = offset_frame # Start with 0 charge in the first frame (t=0) mean = self.pars["I_"] * delta_t for tdx in range(1, nt_step): # Create realization of charge # This version uses less memory, but probably still sub-optimal idx = tdx % self.pars["substep"] # Charge accumulates, dependent on quantum efficiency of the pixel # and the brighter-fatter effect. First timestep is Poisson realization if tdx == 1: allQ[idx, :, :] = allQ[idx - 1, :, :] allQ[idx, xmin:xmax, ymin:ymax] += RNG.poisson( self.pars["QE"] * mean * (1.0 - self.pars["QY_omega"]) / (1.0 + self.pars["QY_omega"]), allQ[idx, xmin:xmax, xmin:xmax].shape, ) allQ[idx, xmin:xmax, ymin:ymax] += PairPoisson( self.pars["QE"] * mean * self.pars["QY_omega"] / (1.0 + self.pars["QY_omega"]), allQ[idx, xmin:xmax, xmin:xmax].shape, self.pars["QY_offset"], self.pars["QY_p2"], RNG, ) else: # If not the first step, and the brighter-fatter effect is turned # on, then now loop through all pixels if self.pars["bfemode"] == "true": # Calculate the area defect by taking a convolution of the bfe # kernel (flipped in the calc_area_defect function) and the charge Q # Area defects are magnified by (1+QY_omega)/(1-QY_omega) since I am only applying them # to the 1-electron events if self.pars["wavemode"] == "ir": a_coeff = TestKernels.get_bfe_kernel_5x5_ir() elif self.pars["wavemode"] == "vis": a_coeff = TestKernels.get_bfe_kernel_5x5_vis() else: if verbose: print("wavemode set to unknown value, defaulting BFE to IR") a_coeff = TestKernels.get_bfe_kernel_5x5() area_defect = calc_area_defect( (1.0 + self.pars["QY_omega"]) / (1.0 - self.pars["QY_omega"]) * a_coeff, allQ[idx - 1, xmin:xmax, ymin:ymax], ) meanQ = ( area_defect * mean * self.pars["QE"] * (1.0 - self.pars["QY_omega"]) / (1.0 + self.pars["QY_omega"]) ) allQ[idx, xmin:xmax, ymin:ymax] = allQ[idx - 1, xmin:xmax, ymin:ymax] + RNG.poisson(meanQ) allQ[idx, xmin:xmax, ymin:ymax] += PairPoisson( self.pars["QE"] * mean * self.pars["QY_omega"] / (1.0 + self.pars["QY_omega"]), allQ[idx, xmin:xmax, xmin:xmax].shape, self.pars["QY_offset"], self.pars["QY_p2"], RNG, ) else: # Otherwise Poisson draw the charge as before allQ[idx, :, :] = allQ[idx - 1, :, :] allQ[idx, xmin:xmax, ymin:ymax] += RNG.poisson( self.pars["QE"] * mean * (1.0 - self.pars["QY_omega"]) / (1.0 + self.pars["QY_omega"]), allQ[idx, xmin:xmax, ymin:ymax].shape, ) allQ[idx, xmin:xmax, ymin:ymax] += PairPoisson( self.pars["QE"] * mean * self.pars["QY_omega"] / (1.0 + self.pars["QY_omega"]), allQ[idx, xmin:xmax, xmin:xmax].shape, self.pars["QY_offset"], self.pars["QY_p2"], RNG, ) if idx == 0: data_cube_Q[count, :, :] = allQ[idx, :, :] allQ = np.zeros((self.pars["substep"], nx, ny)) # if this is a reset frame set start to offset if count in self.pars["reset_frames"]: allQ[0, :, :] = offset_frame else: allQ[0, :, :] = data_cube_Q[count, :, :] count += 1 if verbose: print("time: %d" % count) # Add in IPC before the noise if the mode is turned on if self.pars["lipcmode"] == "true": data_cube_Q[:, xmin:xmax, ymin:ymax] = calculate_ipc( data_cube_Q[:, xmin:xmax, ymin:ymax], self.pars["lipc_alpha"] ) else: pass # Apply non-linearity if mode turned on; assumed to act after IPC if self.pars["nlmode"] == "quadratic": data_cube_Q[:, xmin:xmax, ymin:ymax] -= (1.0e-6 * self.pars["nlbeta"]) * data_cube_Q[ :, xmin:xmax, ymin:ymax ] ** 2 if verbose: print("Applying non-linearity at leading order coefficient (quadratic term)") elif self.pars["nlmode"] == "quartic": data_cube_Q[:, xmin:xmax, ymin:ymax] += ( 1.0e-6 * self.pars["nlcoeffs_arr"][0] * data_cube_Q[:, xmin:xmax, ymin:ymax] ** 2 + 1.0e-6 * self.pars["nlcoeffs_arr"][1] * data_cube_Q[:, xmin:xmax, ymin:ymax] ** 3 + 1.0e-6 * self.pars["nlcoeffs_arr"][2] * data_cube_Q[:, xmin:xmax, ymin:ymax] ** 4 ) if verbose: print("Applying non-linearity polynomial to quartic term") else: if verbose: print("No additional non-linearity (Beta) applied") # Read in the read noise from a fits file generated with Bernie's ngxhrg # noisemode 'last' uses one realization because the full one takes # a long time to create if self.pars["noisemode"].lower() == "last": noise = fitsio.read(self.pars["noisefile"]) data_cube_Q[-1, :, :] += noise # Adding to only the final time elif self.pars["noisemode"].lower() == "full": noise = fitsio.read(self.pars["noisefile"]) data_cube_Q += noise # Adding the noise at all reads elif self.pars["noisemode"].lower() == "none": pass else: # a small amount of Gaussian noise, 12 e data_cube_Q += 10 * RNG.normal(size=(self.pars["tsamp"], N, N)) # Convert charge to signal, clipping values<0 and >2**16 data_cube_S = np.array(np.clip(data_cube_Q / self.pars["gain"], 0, 65535), dtype=np.uint16) # Write simple header, todo: add more thorough comments hdr = FITSHDR() hdr["GAIN"] = self.pars["gain"] hdr["ILLUMIN"] = self.pars["I_"] hdr["QE"] = self.pars["QE"] hdr["LEGACY"] = self.pars["legacy"] hdr["RNGSEED"] = self.pars["rngseed"] if self.pars["lipcmode"] == "true": hdr["LINIPC"] = self.pars["lipc_alpha"][0] if self.pars["nlmode"] == "quadratic": hdr["BETA"] = self.pars["nlbeta"] if self.pars["nlmode"] == "quartic": hdr["NLCOEFFS_c2"] = self.pars["nlcoeffs_arr"][0] hdr["NLCOEFFS_c3"] = self.pars["nlcoeffs_arr"][1] hdr["NLCOEFFS_c4"] = self.pars["nlcoeffs_arr"][2] if self.pars["bfemode"] == "true": hdr["BFE_A00"] = a_coeff[2][2] # Hard-coded to expect 5x5 a coeffs # Open up an example DCL flat file and save the data cube # dclfile = 'Set_001_Test_0002.fits' fitsio.write(self.pars["outfile"], data_cube_S, header=hdr, clobber=True)
# Try compression of data cube into file # End of script
[docs] def run_config(filename): """ Runs a configuration from a file name. Parameters ---------- filename : str The name of the file. Returns ------- None """ with open(filename) as myf: sim = Simulation(myf.read(), verbose=True) print(sim.pars) sim.run(verbose=True)
if __name__ == "__main__": run_config(sys.argv[1])