Source code for solid_waffle.linearity_run

"""
Builds a linearity file.

Functions
---------
build_linearity_file
    Main routine to build the linearity file.

"""

import json
import re
import sys
from datetime import datetime, timezone

import asdf
import numpy
from astropy.io import fits
from scipy.special import eval_legendre

from . import pyirc


[docs] def build_linearity_file(infile): """ Main routine to build the linearity file. This is a script that reads a JSON configuration file. Parameters ---------- infile : str or dict The JSON configuration file (see Notes for details). If a string, then reads that file; if a dictionary, then takes as the script. Returns ------- None """ # get the parameters for this run if isinstance(infile, str): with open(infile, "r") as file: pars = json.load(file) script = file.read() else: pars = infile script = json.dumps(pars) # get sizes of things nside = pyirc.get_nside(pars["RAMPS"][0]["FORMAT"]) ncblock = 32 # split into blocks to speed things up -- must be a power of 2 dx = nside // ncblock nramps = len(pars["RAMPS"]) # global sign parameter -- are we fitting for *increasing* ramps or *decreasing*? # sign=1 for increasing sign = -1 if "SIGN" in pars: sign = int(pars["SIGN"]) print("Ramp direction:", sign) # we'll use a lot of these p_order = int(pars["P_ORDER"]) # is there a dark in the linearity runs? # If so, can specify which ramp number # (-1 for the last one) use_dark = None if "DARK" in pars: use_dark = int(pars["DARK"]) # timiing information tframe = 3.04 if "TFRAME" in pars: tframe = float(pars["TFRAME"]) # fraction of mean slope where we say something is saturated slopecut = 0.1 if "SLOPECUT" in pars: slopecut = float(pars["SLOPECUT"]) # definition for getting cube def get_median_cube(flist, fileformat=None, xmin=None, xmax=None, tstart=None, tend=None): # the flist is a list of files # tstart and tend are slices in Fortran convention # all the named arguments are required cube = numpy.zeros((tend - tstart + 1, nside, dx)) for t in range(tstart, tend + 1): temp = numpy.zeros((nside, dx, len(flist)), dtype=numpy.float32) for k in range(len(flist)): temp[:, :, k] = pyirc.load_segment(flist[k], fileformat, [xmin, xmax, 0, nside], [t], False) cube[t - tstart, :, :] = numpy.median(temp, axis=-1) if sign > 0: cube = 65535 - cube # flip to upward-going ramp return cube # output image cube: # 1st axis is Legendre polymonial coefficients, order 0..p_order # then: # p_order+1 -> Smin # p_order+2 -> Sref # p_order+3 -> Smax # p_order+4 -> maxerr # p_order+5 -> quality (0=OK) image_all = numpy.zeros((p_order + 6, nside, nside)) # full error array, indicating specific ramps err_cube = numpy.zeros((nramps, nside, nside), dtype=numpy.uint16) # pixel flats pflat = numpy.zeros((nramps, nside, nside), dtype=numpy.float32) # flag reference pixels # note only the first 24 flags should be used so that this can be # expressed exactly in IEEE 754 dq = numpy.zeros((nside, nside), dtype=numpy.uint32) dq[:4, :] |= 2**31 dq[-4:, :] |= 2**31 dq[:, :4] |= 2**31 dq[:, -4:] |= 2**31 # get bias information, from either ASDF bias reference file or FITS noise output from # solid_waffle.noise_run. infile = pars["BIAS"]["FILE"] if infile[-5:].lower() == ".asdf": with asdf.open(infile) as f: x = f for leaf in pars["BIAS"]["PATH"]: x = x[leaf] image_all[p_order + 2, :, :] = x[int(pars["BIAS"]["SLICE"]), :, :] elif infile[-5:].lower() == ".fits": with fits.open(infile) as f: image_all[p_order + 2, :, :] = f["NOISE"].data[int(f["NOISE"].header["BIAS"]), :, :] else: raise ValueError("Only ASDF or FITS bias input currently accepted. Can't read " + infile) # loop over strips. optional to stop somewhere ncblock_use = ncblock if "STOP" in pars: ncblock_use = min(ncblock, int(pars["STOP"])) for i in range(ncblock_use): xmin = dx * i xmax = xmin + dx # get file names: flists is going to be a list of lists flists = [] ntslice = numpy.zeros((nramps,), dtype=numpy.int16) for j in range(nramps): m = re.search(r"^(.+)/(\d\d\d\d\d\d\d\d)([^/]+)\_(\d+).fits", pars["RAMPS"][j]["FILE"]) if m: prefix = m.group(1) stamp = int(m.group(2)) body = m.group(3) else: print("Match failed.") exit() filelist = [] for k in range(pars["RAMPS"][j]["NRAMP"]): thisname = ( prefix + f"/{stamp:d}" + body + "_{:03d}.fits".format(k + pars["RAMPS"][j]["START"]) ) filelist.append(thisname) flists.append(filelist) ntslice[j] = pyirc.get_num_slices(pars["RAMPS"][j]["FORMAT"], filelist[0]) # now get the median cubes # also need to get the Smin and Smax for j in range(nramps): print("Strip", i, "files -->", j, ntslice[j], flists[j]) sys.stdout.flush() cube = get_median_cube( flists[j], fileformat=pars["RAMPS"][j]["FORMAT"], xmin=xmin, xmax=xmax, tstart=pars["RAMPS"][j]["TSTART"], tend=ntslice[j], ) print(numpy.median(cube, axis=(1, 2))) Smin_ = numpy.amin(cube, axis=0) Smax_ = numpy.amax(cube, axis=0) # mask later parts of the ramp if they saturate halfframe = (numpy.shape(cube)[0] + 1) // 2 diff = (cube[halfframe, :, :] - cube[0, :, :]) / halfframe for k in range(numpy.shape(cube)[0] - 2, p_order, -1): # diff = (Smax_-Smin_)/(numpy.shape(cube)[0]-1) # <-- old criterion if sign > 0: cuthere = cube[k + 1, :, :] - cube[k - 1, :, :] < 2 * slopecut * diff Smax_ = numpy.where(cuthere, cube[k, :, :], Smax_) else: cuthere = cube[k + 1, :, :] - cube[k - 1, :, :] > 2 * slopecut * diff Smin_ = numpy.where(cuthere, cube[k, :, :], Smin_) if j == 0: Smin = Smin_ Smax = Smax_ else: Smin = numpy.minimum(Smin, Smin_) Smax = numpy.maximum(Smax, Smax_) print( " Smin deciles:", [numpy.round(numpy.percentile(Smin, 10 * ip), 1) for ip in range(1, 10)] ) print( " Smax deciles:", [numpy.round(numpy.percentile(Smax, 10 * ip), 1) for ip in range(1, 10)] ) # set some more quality flags LowSignalRange = numpy.abs(Smax - Smin) < 1024 dq[:, xmin:xmax] |= numpy.where(LowSignalRange, 2**20, 0).astype(numpy.uint32) # now extend range to cover the bias level if sign > 0: Smin = numpy.minimum(Smin, image_all[p_order + 2, :, xmin:xmax] - float(pars["NEGATIVEPAD"])) else: Smax = numpy.maximum(Smax, image_all[p_order + 2, :, xmin:xmax] + float(pars["NEGATIVEPAD"])) # clip Smin = numpy.clip(Smin, 0, 65534) Smax = numpy.clip(Smax, 0, 65534) # <-- update to avoid accepting 65535 as unsaturated # pad to avoid singularities Smin -= 0.5 Smax += 0.5 # now get the coefficients in the cost function # cost = sum [phi(S) - a f(S) - c]^2 # where f(S) = timestamp # phi(S) = sum_L=0^p g_L P_L(z) # where z = S rescaled into -1 .. +1, z = (S-16383.5)/32768. # so for each pixel, there is a (p+1+2*nramps) x (p+1+2*nramps) matrix Amat = numpy.zeros((p_order + 1 + 2 * nramps, p_order + 1 + 2 * nramps, nside, dx)) # build matrices for j in range(nramps): offset_j = p_order + 1 + 2 * j # starting index for this ramp cube = get_median_cube( flists[j], fileformat=pars["RAMPS"][j]["FORMAT"], xmin=xmin, xmax=xmax, tstart=pars["RAMPS"][j]["TSTART"], tend=ntslice[j], ) for tt in range(numpy.shape(cube)[0]): z = -1.0 + 2.0 * (cube[tt, :, :] - Smin) / (Smax - Smin) # put in -1 .. +1 mask = numpy.where(numpy.abs(z) < 1, 1, 0) # if outside the range, mask that pixel u = numpy.zeros((p_order + 1, nside, dx)) for L in range(p_order + 1): u[L, :, :] = eval_legendre(L, z) for L in range(p_order + 1): for Lp in range(p_order + 1): Amat[L, Lp, :, :] += mask * u[L, :, :] * u[Lp, :, :] Amat[offset_j, L, :, :] -= mask * u[L, :, :] * tt Amat[L, offset_j, :, :] -= mask * u[L, :, :] * tt Amat[offset_j + 1, L, :, :] -= mask * u[L, :, :] Amat[L, offset_j + 1, :, :] -= mask * u[L, :, :] Amat[offset_j, offset_j, :, :] += mask * tt**2 Amat[offset_j, offset_j + 1, :, :] += mask * tt Amat[offset_j + 1, offset_j, :, :] += mask * tt Amat[offset_j + 1, offset_j + 1, :, :] += mask # check invertibility D = numpy.abs(numpy.linalg.det(numpy.transpose(Amat[2:, 2:, :, :], axes=(2, 3, 0, 1)))) Dmed = numpy.median(D) NearSingularMatrix = 1e-12 * Dmed > D dq[:, xmin:xmax] |= numpy.where(NearSingularMatrix, 2**20, 0).astype(numpy.uint32) # to prevent instabilities, effectively pin the higher-order coefficients in the low signal cases for l_ in range(2, p_order + 1 + 2 * nramps): Amat[l_, l_, :, :] += numpy.where(NearSingularMatrix, 1e24, 0) # now get the coefficients assuming we start with 0, 1 coef = numpy.zeros((p_order + 1, nside, dx)) coef[1, :, :] = 1.0 v = -numpy.linalg.solve( numpy.transpose(Amat[2:, 2:, :, :], axes=(2, 3, 0, 1)), numpy.transpose(Amat[1, 2:, :, :], axes=(1, 2, 0)), ) coef[2:, :, :] = numpy.transpose(v[:, :, : p_order - 1], axes=(2, 0, 1)) # get linearization error err = numpy.zeros((nside, dx)) err2 = numpy.zeros((nramps, nside, dx)) for j in range(nramps): cube = get_median_cube( flists[j], fileformat=pars["RAMPS"][j]["FORMAT"], xmin=xmin, xmax=xmax, tstart=pars["RAMPS"][j]["TSTART"], tend=ntslice[j], ) for tt in range(numpy.shape(cube)[0]): z = -1.0 + 2.0 * (cube[tt, :, :] - Smin) / (Smax - Smin) # put in -1 .. +1 pred = numpy.copy(z) for L in range(2, p_order + 1): pred += v[:, :, L - 2] * eval_legendre(L, z) this_err = pred - (v[:, :, p_order - 1 + 2 * j] * tt + v[:, :, p_order + 2 * j]) this_err = numpy.where(numpy.abs(z) < 1, this_err, 0.0) err = numpy.maximum(err, numpy.abs(this_err)) err2[j, :, :] = numpy.maximum(err2[j, :, :], numpy.abs(this_err)) # now do the rescaling to slope 1 at the bias point zbias = (image_all[p_order + 2, :, xmin:xmax] - Smin) / ( Smax - Smin ) * 2.0 - 1.0 # z value at the bias slope = numpy.zeros((nside, dx)) for L in range(1, p_order + 1): # this is to get dP_L(z)/dz, which is equal to sum_{L'<L, L-L' odd} (2L'+1) P_L'(z) dPl = numpy.zeros((nside, dx)) for Lp in range((L - 1) % 2, L, 2): dPl += (2 * Lp + 1) * eval_legendre(Lp, zbias) slope += coef[L, :, :] * dPl del dPl slope *= 2.0 / (Smax - Smin) # this is dz/dS coef = coef / slope[None, :, :] err = err / slope # also need to re-scale the fit error - this is now in linearized DN err2 = err2 / slope[None, :, :] # also need to re-scale the fit error - this is now in linearized DN print(" err deciles:", [numpy.round(numpy.percentile(err, 10 * ip), 1) for ip in range(1, 10)]) # get the P-flat fields for j in range(nramps): pflat[j, :, xmin:xmax] = (v[:, :, p_order - 1 + 2 * j] / slope / tframe).astype(numpy.float32) del slope # and now the intercept icpt = numpy.zeros((nside, dx)) for L in range(1, p_order + 1): icpt += coef[L, :, :] * eval_legendre(L, zbias) coef[0, :, :] = -icpt del icpt # monotonicity checks nzstep = 512 # number of grid points to check monotonicity for zstep in range(nzstep + 1): z = -1.0 + 2 * zstep / nzstep Sprime = numpy.zeros((nside, dx)) for L in range(1, p_order + 1): # this is to get dP_L(z)/dz, which is equal to sum_{L'<L, L-L' odd} (2L'+1) P_L'(z) dPl = numpy.zeros((nside, dx)) for Lp in range((L - 1) % 2, L, 2): dPl += (2 * Lp + 1) * eval_legendre(Lp, zbias) Sprime += coef[L, :, :] * dPl del dPl dq[:, xmin:xmax] |= numpy.where(Sprime <= 0, 2**20, 0).astype(numpy.uint32) image_all[: p_order + 1, :, xmin:xmax] = coef image_all[p_order + 1, :, xmin:xmax] = Smin image_all[p_order + 3, :, xmin:xmax] = Smax image_all[p_order + 4, :, xmin:xmax] = err err_cube[:, :, xmin:xmax] = numpy.clip(numpy.round(err2), 0, 65535).astype(numpy.uint16) # 4D or 3D to 2D projection, displaying lots of strips left to right for visualization # (not actually needed in the final script) def to2D(arr): d = numpy.shape(arr) if len(d) == 4: return numpy.transpose(arr, axes=(2, 0, 1, 3)).reshape(d[2], -1) else: return numpy.transpose(arr, axes=(1, 0, 2)).reshape(d[1], -1) # now OK to convert to float32 image_all = image_all.astype(numpy.float32) # quality flags store exactly in IEEE754 32-bit arithmetic image_all[p_order + 5, :, :] = dq.astype(numpy.float32) # save the data cube Im = fits.PrimaryHDU(image_all) Im.header["P_ORDER"] = p_order Im.header["NSIDE_Y"] = nside Im.header["NSIDE_X"] = nside for L in range(p_order): Im.header[f"SLICE{L+1:03d}"] = f"Legendre coef L={L:d}" Im.header[f"SLICE{p_order+1:03d}"] = "min signal in fit" Im.header[f"SLICE{p_order+2:03d}"] = "ref signal in fit (usually bias level; slope=1)" Im.header[f"SLICE{p_order+3:03d}"] = "max signal in fit" Im.header[f"SLICE{p_order+4:03d}"] = "max error" Im.header[f"SLICE{p_order+5:03d}"] = "quality" Im.header["QBIT20"] = "Bit 20: failed or unstable nonlinearity solution" Im.header["QBIT31"] = "Bit 31: ref pixel" # do the dark subtraction dark = numpy.zeros((nside, nside), dtype=numpy.float32) if use_dark is not None: dark = pflat[use_dark, :, :] pflat = pflat - dark[None, :, :] # quality flag information for j in range(32): print(f"flag {j:2d}, count {numpy.count_nonzero(dq&(2**j)):7d}") fits.HDUList([Im, fits.ImageHDU(pflat, name="PFLAT")]).writeto( pars["OUTPUT"][:-5] + ".fits", overwrite=True ) # where the input data came from ... pedigree = "DUMMY" if "PEDIGREE" in pars: pedigree = pars["PEDIGREE"] # ASDF output tree = { "roman": { "meta": { "author": "linearity_run.py", "description": "linearity_run.py", "instrument": {"detector": "WFI{:02d}".format(int(pars["SCA"])), "name": "WFI"}, "origin": "PIT - solid-waffle", "date": datetime.now(timezone.utc).isoformat(), "pedigree": pedigree, "reftype": "LINEARITYLEGENDRE", "telescope": "ROMAN", "useafter": "!time/time-1.2.0 2020-01-01T00:00:00.000", }, "data": image_all[: p_order + 1, :, :], "pflat": pflat, "dark": dark, "dq": dq, "Smin": image_all[p_order + 1, :, :], "Sref": image_all[p_order + 2, :, :], "Smax": image_all[p_order + 3, :, :], "maxerr": numpy.clip(numpy.round(image_all[p_order + 4, :, :]), 0, 65535).astype(numpy.uint16), "ramperr": err_cube, }, "notes": { "script": script, "units": "data, maxerr, ramperr: DN_lin; pflat, dark: DN_lin/s; Smin, Sref, Smax: DN_raw", }, } asdf.AsdfFile(tree).write_to(pars["OUTPUT"][:-5] + ".asdf")
if __name__ == "__main__": build_linearity_file(sys.argv[1])