"""Code to load different detector responses. """
from dataclasses import dataclass
import logging
import os
import pathlib
import sys
from astropy.io import fits
import astropy.units as u
from matplotlib.colors import LogNorm, Normalize
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
import numpy as np
import response_tools
from response_tools.util import BaseOutput
FILE_PATH = response_tools.responseFilePath
RESPONSE_INFO_TYPE = response_tools.contextResponseInfo["files"]["detectors"]
ASSETS_PATH = os.path.join(pathlib.Path(__file__).parent, "assets", "response-tools-figs", "det-resp-figs")
[docs]
@dataclass
class DetectorResponseOutput(BaseOutput):
"""Class for keeping track of effective area response values."""
# numbers
input_energy_edges: u.Quantity # photon axis
output_energy_edges: u.Quantity # count axis
detector_response: u.Quantity # rmf
# bookkeeping
detector: str
# any other fields needed can be added here
# can even add with a default so the input is not required for every other instance
[docs]
def cdte_det_resp_rmf(file):
"""Return the redistribution matrix for CdTe from a given file.
Parameters
----------
file : `str`
The file for the RMF.
Returns
-------
: `DetectorResponseOutput`
An object containing all the redistribution matrix information.
See accessible information using `.contents` on the output.
"""
e_lo, e_hi, ngrp, fchan, nchan, matrix = _read_rmf(file)
fchan_array = col2arr_py(fchan)
nchan_array = col2arr_py(nchan)
energies = np.append(e_lo, e_hi[-1])<<u.keV
return DetectorResponseOutput(filename=file,
function_path=f"{sys._getframe().f_code.co_name}",
input_energy_edges=energies,
output_energy_edges=energies,
detector_response=vrmf2arr_py(data=matrix,
n_grp_list=ngrp,
f_chan_array=fchan_array,
n_chan_array=nchan_array)<<(u.ct/u.ph),
detector="CdTe-General-Detector-Response"
)
[docs]
@u.quantity_input(pitch=u.um)
def cdte_det_resp(cdte:int=None, region:int=None, pitch=None, side:str="merged", event_type:str="all", file=None):
"""Return the redistribution matrix for CdTe from a given file.
Requires the `cdte` input and _either_ `region` _xor_ `pitch`.
Wrapper for `cdte_det_resp_rmf` with bookkeeping.
Parameters
----------
cdte : `int`
The CdTe detector number. Must be in [1,2,3,4].
region : `int`
The region of the CdTe detector required. Either provide
`region` _xor_ `pitch`. The `region` maps onto the pitches used
across the detector.
- Region 0 -> 60<<astropy.units.um
- Region 1 -> 80<<astropy.units.um
- Region 2 -> 100<<astropy.units.um
pitch : `astropy.units.quantity.Quantity`
Instead of `region`, it might be more usefule to specify the
pitch in physical units (must b convertable to
`astropy.units.um`). Either provide `region` _xor_ `pitch`.
The pitches map onto the `region` input.
- 60<<astropy.units.um -> Region 0
- 80<<astropy.units.um -> Region 1
- 100<<astropy.units.um -> Region 2
side : `str`
Define the side on the detector the user requires the response
from. Must be in ["pt", "merged"].
Default: "merged"
event_type : `str`
Define the type of event trigger being considered in the
response. Must be in ["1hit", "2hit", ("all", "mix")].
Note: \"all\" and \"mix\" are the same but some from different
naming conventions on the merged and individual detector sides.
This will be fixed at some point in the future.
Default: "all"
file : `str`
The file for the RMF.
Returns
-------
: `DetectorResponseOutput`
An object containing all the redistribution matrix information.
See accessible information using `.contents` on the output.
"""
have_region, have_pitch = (region is not None), (pitch is not None)
pitch2region = {60<<u.um:0, 80<<u.um:1, 100<<u.um:2}
valid_region, valid_pitch = (region in list(pitch2region.values())), (pitch in list(pitch2region.keys()))
if (cdte is None) or (cdte not in [1,2,3,4]):
logging.warning(f"In {sys._getframe().f_code.co_name}, `cdte` must be given and in [1,2,3,4].")
return
if ((not have_region) and (not have_pitch)) \
or (have_region and have_pitch):
logging.warning(f"In {sys._getframe().f_code.co_name}, either `region` [0,1,2] _xor_ `pitch` [60<<u.um,80<<u.um,100<<u.um] must be given.")
return
if have_region and (not valid_region):
logging.warning(f"In {sys._getframe().f_code.co_name}, the `region` must be in [0,1,2].")
return
if have_pitch and (not valid_pitch):
logging.warning(f"In {sys._getframe().f_code.co_name}, the `pitch` must be in [60<<u.um,80<<u.um,100<<u.um].")
return
if side not in ["pt", "merged"]:
logging.warning(f"In {sys._getframe().f_code.co_name}, the `side` must be in [\"pt\", \"merged\"].")
return
if event_type not in ["1hit", "2hit", "all", "mix"]:
logging.warning(f"In {sys._getframe().f_code.co_name}, the `event_type` must be in [\"1hit\", \"2hit\", (\"all\", \"mix\")].")
return
if side=="merged" and event_type=="mix":
event_type="all"
elif side in ["pt"] and event_type=="all":
event_type="mix"
region = pitch2region[pitch] if have_pitch else region
_f = os.path.join(FILE_PATH,
RESPONSE_INFO_TYPE[f"cdte_det_{side}_resp"],
f"Resp_3keVto30keV_CdTe{cdte}_reg{region}_{event_type}.rmf") if file is None else file
r = cdte_det_resp_rmf(_f)
r.update_function_path(sys._getframe().f_code.co_name)
r.detector = f"CdTe{cdte}-Detector-Response"
return r
[docs]
def cmos_det_resp(file=None, telescope=None):
"""Return the redistribution matrix for CMOS from a given file.
Parameters
----------
file : `str`
The file for the RMF.
telescope : `int`
Either 0 or 1.
Returns
-------
: `DetectorResponseOutput`
An object containing all the redistribution matrix information.
See accessible information using `.contents` on the output.
"""
if (telescope is None) or (telescope not in [0,1]):
logging.warning(f"The `telescope` input in {sys._getframe().f_code.co_name} must be 0 or 1.")
return
_f = os.path.join(FILE_PATH, RESPONSE_INFO_TYPE[f"cmos_det_telescope-{telescope}_resp"]) if file is None else file
with fits.open(_f) as hdul:
matrix, counts, energy = hdul[1].data<<(u.DN/u.ph), hdul[2].data<<u.DN, hdul[3].data<<u.keV # units?
# counts & energy come as the bin centers, not edges
# check for uniform binning and convert to bin edges...
# round to avoid differences in float precision
diff_counts, diff_energy = round(np.diff(counts), 3), round(np.diff(energy), 3)
assert np.min(diff_counts)==np.max(diff_counts), f"In {sys._getframe().f_code.co_name}, the counts axis values do not appear to follow uniform binning and so cannot be automatically converted from bin centers to edges:\n{diff_counts}"
assert np.min(diff_energy)==np.max(diff_energy), f"In {sys._getframe().f_code.co_name}, the energy axis values do not appear to follow uniform binning and so cannot be automatically converted from bin centers to edges:\n{diff_energy}"
counts_edges = np.append(counts-(0.5*diff_counts[0]), counts[-1]+(0.5*diff_counts[0]))
energy_edges = np.append(energy-(0.5*diff_energy[0]), energy[-1]+(0.5*diff_energy[0]))
return DetectorResponseOutput(filename=file,
function_path=f"{sys._getframe().f_code.co_name}",
input_energy_edges=energy_edges,
output_energy_edges=counts_edges,
detector_response=matrix,
detector=f"CMOS{telescope}-Detector-Response"
)
def _read_rmf(file):
"""
Read a .rmf file and extract useful information from it.
Parameters
----------
file : `str`, `file-like` or `pathlib.Path`
A .rmf file (see `~astropy.fits.io.open` for details).
Returns
-------
`tuple`
The low and high boundary of energy bins (data['energ_lo'],
data['energ_hi']), number of sub-set channels in the energybin
(data['n_grp']), starting index of each sub-set of channels
(data['f_chan']),number of channels in each sub-set
(data['n_chan']), redistribution matrix [counts per photon]
(data['matrix']).
"""
with fits.open(file) as hdul:
data = hdul[2].data
return data["energ_lo"], data["energ_hi"], data["n_grp"], data["f_chan"], data["n_chan"], data["matrix"]
[docs]
def col2arr_py(data):
"""Takes a list of parameters for each energy channel from a .rmf
file and returns it in the correct format [1].
This original function is taken from Sunkit-spex [2]
- Tweaked to change `r` below to `[r]` since the values in data
aren't lists (e.g., NuSTAR)
[1] https://lost-contact.mit.edu/afs/physics.wisc.edu/home/craigm/lib/idl/util/vcol2arr.pro
[2] https://github.com/sunpy/sunkit-spex
Parameters
----------
data : array/list-like object
One parameter's array/list from the .rmf file.
Returns
-------
A 2D numpy array of the correctly ordered input data.
Example
-------
>>> data = FITS_rec([( 1.6 , 1.64, 1, [0] , [18] , [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]),
( 1.64, 1.68, 1, [0] , [20] , [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]),
( 1.68, 1.72, 2, [0,22], [20,1], [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]),
dtype=(numpy.record, [('ENERG_LO', '>f4'), ('ENERG_HI', '>f4'), ('N_GRP', '>i2'),
('F_CHAN', '>i4', (2,)), ('N_CHAN', '>i4', (2,)), ('MATRIX', '>i4', (2,))]))
# max row length of 2 so 2 columns, each row is an energy channel.
>>> col2arr_py(data['F_CHAN'])
array([[ 0., 0.],
[ 0., 0.],
[ 0., 22.]])
"""
# this is the quicker way I have chosen to do in Python (this may be revised later but is ~30x faster than way below in Python)
max_len = np.max([len([r]) for r in data]) # find max row length
chan_array_py = np.array(
[[*[r], *(max_len - len([r])) * [0]] for r in data]
) # make each row that length (padding with 0)
return chan_array_py
[docs]
def vrmf2arr_py(data=None, n_grp_list=None, f_chan_array=None, n_chan_array=None):
"""Takes redistribution parameters for each energy channel from a
.rmf file and returns it in the correct format [1].
This original function is taken from Sunkit-spex [2].
[1] https://lost-contact.mit.edu/afs/physics.wisc.edu/home/craigm/lib/idl/spectral/vrmf2arr.pro
[2] https://github.com/sunpy/sunkit-spex
Parameters
----------
data : array/list-like object
Redistribution matrix parameter array/list from the .rmf
file. Units are counts per photon.
Default : None
no_of_channels : int
Number of entries is the total number of photon channels,
the entries themselves show the total number
of count channels to which that photon channel contributes.
Default : None
f_chan_array : numpy.array
The index of each sub-set channel from each energy bin from
the .rmf file run through col2arr_py().
Default : None
n_chan_array : numpy.array
The number of sub-set channels in each index for each energy
bin from the .rmf file run through col2arr_py().
Default : None
Returns
-------
A 2D numpy array of the correctly ordered input data with dimensions
of energy in the rows and channels in the columns.
Example
-------
>>> d_rmf = 'directory/'
>>> f_rmf = 'file.rmf'
>>> e_lo, e_hi, ngrp, fchan, nchan, matrix = det_io._read_rmf(d_rmf+f_rmf)
>>> fchan_array = nu_spec.col2arr_py(fchan)
>>> nchan_array = nu_spec.col2arr_py(nchan)
>>> rmf = nu_spec.vrmf2arr_py(data=matrix,
n_grp_list=ngrp,
f_chan_array=fchan_array,
n_chan_array=nchan_array)
# rows = photon/energy channels, columns = counts channels
>>> rmf
array([[0.00033627, 0.0007369 , 0.00113175, ..., 0. , 0. , 0. ],
[0.00039195, 0.00079259, 0.00138341, ..., 0. , 0. , 0. ],
[0.00042811, 0.00083381, 0.00157794, ..., 0. , 0. , 0. ],
...,
[0. , 0. , 0. , ..., 0.00408081, 0.00409889, 0.00403308],
[0. , 0. , 0. , ..., 0.00405333, 0.00413722, 0.00413216],
[0. , 0. , 0. , ..., 0. , 0. , 0. ]])
What's Going On?
----------------
The RMF file has the photon-to-counts conversion information in it.
The martix has the photon-to-count conversion value for each count
channel (column) that is involved with theach photon channel (rows).
E.g., matrix = [ [a, b, c, d, e, f, ...] , ... ]
F_chan is the starting index of contiguous counts channels that are
involved with the photon channel.
E.g., f_chan = [ [0, 5, 0, 0, 0, ...] , ... ]
For the first photon channel, there are rows of counts channels
starting at index 0 and 5 N_chan is the corresponding number of
counts channels from each index in the f_chan array.
E.g., n_chan = [ [2, 3, 0, 0, 0, ...] , ... ]
Starting at index 0 for the first photon channel we have the first 2
matrix values, then at index 5 we have the next 3.
The total of each row is the same as the n_grp_list and the number
of entries in each row of the matrix entry.
Putting all this together, the rmf matrix is:
rmf_matrix = [ [a, b, 0, 0, 0, c , d , e, 0 , 0 , ...] , ... ]
Index 0 (f_chan) with 2 entries (n_chan) with photon-to-counts
conversion (matrix).
"""
# this was is about >6x quicker in than the IDL code written in Python
n_grp_list = n_grp_list.astype("<i2") # change from ‘big-endian’ (">i2") to ‘little-endian’ ("<i2")
# find the non-zero entries in Nchan, this is the number to counts channels
# in a row that contribute so will have a value if it is useful
b = np.nonzero(n_chan_array)
# now only want the useful entries from the pre-formatted Nchan and Fchan arrays
c = f_chan_array[b]
d = n_chan_array[b]
# to help with indexing, this provides a running sum of the number of counts
# channels that a single photon channel contributes to
e = np.cumsum(n_chan_array, axis=1)
# these entries will give the final indices in the row on counts channels
final_inds = e[b]
# need to find the starting index so -1, but that means any entry that is
# -1 will be where a zero is needed
starting_inds = b[1] - 1
# get the starting indices but the ones that should be 0 are replaced with
# the final one in the list at the minute (-1 in starting_inds)
start_inds = np.cumsum(n_chan_array, axis=1)[(b[0], starting_inds)]
# where starting_inds==-1 that value should be 0, i.e. starting from the first
# value in the rmf matrix
new_e = np.where(starting_inds != -1, start_inds, 0)
# initialise the rmf matrix
mat_array_py = np.zeros((len(data), len(n_grp_list)))
# now go through row by row (this is the slowest part and needs to be made faster).
# Here we go through each photon channel's number of discrete rows of counts channels.
for r in range(len(c)):
mat_array_py[b[0][r], c[r] : c[r] + d[r]] = data[b[0][r]][new_e[r] : final_inds[r]]
return mat_array_py
[docs]
def asset_cmos_resp(save_location=None):
"""Plot the CMOS responses."""
# CMOS
fig = plt.figure(figsize=(12,7))
gs = gridspec.GridSpec(1, 2)
gs_ax0 = fig.add_subplot(gs[0, 0])
telescope = 0
cmos0_resp = cmos_det_resp(file=None, telescope=telescope)
extent = [np.min(cmos0_resp.output_energy_edges.value),
np.max(cmos0_resp.output_energy_edges.value),
np.min(cmos0_resp.input_energy_edges.value),
np.max(cmos0_resp.input_energy_edges.value)]
i = gs_ax0.imshow(cmos0_resp.detector_response.value, origin="lower", aspect=(extent[1]-extent[0])/(extent[3]-extent[2]), extent=extent, norm=LogNorm())
gs_ax0.set_ylim([0,10])
gs_ax0.set_xlabel(f"Counts [{cmos0_resp.output_energy_edges.unit:latex}]")
gs_ax0.set_ylabel(f"Energy [{cmos0_resp.input_energy_edges.unit:latex}]")
gs_ax0.set_title(f"CMOS{telescope}")
cax = gs_ax0.inset_axes([0.1, 0.08, 0.8, 0.05],)
cbar = fig.colorbar(i, cax=cax, orientation='horizontal')
cbar.set_label(f"{cmos0_resp.detector_response.unit:latex}", size=8, labelpad=0)
cax.tick_params(axis='both', which='major', labelsize=6)
gs_ax1 = fig.add_subplot(gs[0, 1])
telescope = 1
cmos1_resp = cmos_det_resp(file=None, telescope=telescope)
extent = [np.min(cmos1_resp.output_energy_edges.value),
np.max(cmos1_resp.output_energy_edges.value),
np.min(cmos1_resp.input_energy_edges.value),
np.max(cmos1_resp.input_energy_edges.value)]
i = gs_ax1.imshow(cmos1_resp.detector_response.value, origin="lower", aspect=(extent[1]-extent[0])/(extent[3]-extent[2]), extent=extent, norm=LogNorm())
gs_ax1.set_ylim([0,10])
gs_ax1.set_xlabel(f"Counts [{cmos1_resp.output_energy_edges.unit:latex}]")
gs_ax1.set_ylabel(f"Energy [{cmos1_resp.input_energy_edges.unit:latex}]")
gs_ax1.set_title(f"CMOS{telescope}")
cax = gs_ax1.inset_axes([0.1, 0.08, 0.8, 0.05],)
cbar = fig.colorbar(i, cax=cax, orientation='horizontal')
cbar.set_label(f"{cmos1_resp.detector_response.unit:latex}", size=8, labelpad=0)
cax.tick_params(axis='both', which='major', labelsize=6)
plt.tight_layout()
if save_location is not None:
pathlib.Path(save_location).mkdir(parents=True, exist_ok=True)
plt.savefig(os.path.join(save_location,"cmos-response-matrices.png"), dpi=200, bbox_inches="tight")
plt.show()
[docs]
def asset_cdte_resp(save_location=None):
"""Plot the CdTe1, region 0, 1-hit response."""
# CdTe
d_rmf = os.path.join(FILE_PATH, RESPONSE_INFO_TYPE[f"cdte_det_pt_resp"])
f_rmf = "Resp_3keVto30keV_CdTe1_reg0_1hit.rmf"
cdte_resp = cdte_det_resp_rmf(os.path.join(pathlib.Path(__file__).parent, d_rmf, f_rmf))
fig = plt.figure(figsize=(12, 5))
gs = gridspec.GridSpec(1, 2)
gs_ax0 = fig.add_subplot(gs[0, 0])
r = gs_ax0.imshow(cdte_resp.detector_response.value,
origin="lower",
norm=Normalize(vmin=0.001,
vmax=np.max(cdte_resp.detector_response.value)*0.9),
extent=[np.min(cdte_resp.output_energy_edges.value),
np.max(cdte_resp.output_energy_edges.value),
np.min(cdte_resp.input_energy_edges.value),
np.max(cdte_resp.input_energy_edges.value)]
)
cbar = plt.colorbar(r)
cbar.ax.set_ylabel(f"Response [{cdte_resp.detector_response.unit:latex}]")
gs_ax0.set_xlabel(f"Count Energy [{cdte_resp.output_energy_edges.unit:latex}]")
gs_ax0.set_ylabel(f"Photon Energy [{cdte_resp.input_energy_edges.unit:latex}]")
gs_ax0.set_title("Linear Scale")
gs_ax1 = fig.add_subplot(gs[0, 1])
r = gs_ax1.imshow(cdte_resp.detector_response.value,
origin="lower",
norm=LogNorm(vmin=0.001,
vmax=np.max(cdte_resp.detector_response.value)*0.9),
extent=[np.min(cdte_resp.output_energy_edges.value),
np.max(cdte_resp.output_energy_edges.value),
np.min(cdte_resp.input_energy_edges.value),
np.max(cdte_resp.input_energy_edges.value)]
)
cbar = plt.colorbar(r)
cbar.ax.set_ylabel(f"Response [{cdte_resp.detector_response.unit:latex}]")
gs_ax1.set_xlabel(f"Count Energy [{cdte_resp.output_energy_edges.unit:latex}]")
gs_ax1.set_ylabel(f"Photon Energy [{cdte_resp.input_energy_edges.unit:latex}]")
gs_ax1.set_title("Log Scale")
fig.suptitle(f_rmf)
if save_location is not None:
pathlib.Path(save_location).mkdir(parents=True, exist_ok=True)
plt.savefig(os.path.join(save_location,"cdte-response-matrix.png"), dpi=200, bbox_inches="tight")
plt.show()
if __name__=="__main__":
save_location = None # ASSETS_PATH
asset_cdte_resp(save_location=save_location)
asset_cmos_resp(save_location=save_location)