"""
Module for guiding Arc/Sky line tracing
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import os
import copy
import inspect
from IPython import embed
from pathlib import Path
import numpy as np
from matplotlib import pyplot as plt
from matplotlib.lines import Line2D
from astropy import stats, visualization
from astropy import table
from pypeit import msgs, datamodel, utils
from pypeit import calibframe
from pypeit import slittrace, wavecalib
from pypeit.display import display
from pypeit.core import arc
from pypeit.core import tracewave
from pypeit.images import buildimage
[docs]
class WaveTilts(calibframe.CalibFrame):
"""
Calibration frame containing the wavelength tilt calibration.
All of the items in the datamodel are required for instantiation, although
they can be None (but shouldn't be)
The datamodel attributes are:
.. include:: ../include/class_datamodel_wavetilts.rst
When written to an output-file HDU, all `numpy.ndarray`_ elements are
bundled into an `astropy.io.fits.BinTableHDU`_, and the other elements are
written as header keywords. Any datamodel elements that are None are *not*
included in the output.
"""
version = '1.2.0'
# Calibration frame attributes
calib_type = 'Tilts'
calib_file_format = 'fits'
# NOTE:
# - Internals are identical to the base class
# - Datamodel already contains CalibFrame base elements, so no need to
# include it here.
datamodel = {'PYP_SPEC': dict(otype=str, descr='PypeIt spectrograph name'),
'coeffs': dict(otype=np.ndarray, atype=np.floating,
descr='2D coefficents for the fit on the initial slits. One '
'set per slit/order (3D array).'),
'bpmtilts': dict(otype=np.ndarray, atype=np.integer,
descr='Bad pixel mask for tilt solutions. Keys are taken from '
'SlitTraceSetBitmask'),
'nslit': dict(otype=int,
descr='Total number of slits. This can include masked slits'),
'spat_id': dict(otype=np.ndarray, atype=np.integer, descr='Slit spat_id '),
'spat_order': dict(otype=np.ndarray, atype=np.integer,
descr='Order for spatial fit (nslit)'),
'spec_order': dict(otype=np.ndarray, atype=np.integer,
descr='Order for spectral fit (nslit)'),
'func2d': dict(otype=str, descr='Function used for the 2D fit'),
'spat_flexure': dict(otype=float, descr='Flexure shift from the input TiltImage'),
'slits_filename': dict(otype=str, descr='Path to SlitTraceSet file. This helps to '
'find the Slits calibration file when running '
'pypeit_chk_tilts()'),
'tiltimg_filename': dict(otype=str, descr='Path to Tiltimg file. This helps to '
'find Tiltimg file when running pypeit_chk_tilts()'),
'tilt_traces': dict(otype=table.Table, descr='Table with the positions of the '
'traced and fitted tilts for all the slits. '
'see :func:`~pypeit.wavetilts.BuildWaveTilts.make_tbl_tilt_traces` for more details. ')
}
def __init__(self, coeffs, nslit, spat_id, spat_order, spec_order, func2d, bpmtilts=None,
spat_flexure=None, PYP_SPEC=None, slits_filename=None, tiltimg_filename=None,
tilt_traces=None):
# Parse
args, _, _, values = inspect.getargvalues(inspect.currentframe())
d = dict([(k,values[k]) for k in args[1:]])
# Setup the DataContainer
datamodel.DataContainer.__init__(self, d=d)
[docs]
def _bundle(self):
"""
Bundle the data in preparation for writing to a fits file.
See :func:`~pypeit.datamodel.DataContainer._bundle`. Data is
always written to a 'TILTS' extension.
"""
_d = super(WaveTilts, self)._bundle(ext='TILTS')
# similarly to SlitTraceSet
# save the table
tab_detached = _d[0]['TILTS']['tilt_traces']
# remove `tab_detached` from the dict
_d[0]['TILTS'].pop('tilt_traces')
# add the table as a separate extension
return [_d[0], {'tilt_traces': tab_detached}]
[docs]
def is_synced(self, slits):
"""
Confirm the slits in WaveTilts are aligned to that in SlitTraceSet
Barfs if not
Args:
slits (:class:`~pypeit.slittrace.SlitTraceSet`):
"""
if not np.array_equal(self.spat_id, slits.spat_id):
msgs.error('Your tilt solutions are out of sync with your slits. Remove calibrations '
'and restart from scratch.')
[docs]
def fit2tiltimg(self, slitmask, flexure=None):
"""
Generate a tilt image from the fit parameters
Mainly to allow for flexure
Args:
slitmask (`numpy.ndarray`_):
??
flexure (float, optional):
Spatial shift of the tilt image onto the desired frame
(typically a science image)
Returns:
`numpy.ndarray`_: New tilt image
"""
msgs.info("Generating a tilts image from the fit parameters")
_flexure = 0. if flexure is None else flexure
final_tilts = np.zeros_like(slitmask).astype(float)
gdslit_spat = np.unique(slitmask[slitmask >= 0]).astype(int)
# Loop
for slit_spat in gdslit_spat:
slit_idx = self.spatid_to_zero(slit_spat)
# Calculate
coeff_out = self.coeffs[:self.spec_order[slit_idx]+1,:self.spat_order[slit_idx]+1,slit_idx]
_tilts = tracewave.fit2tilts(final_tilts.shape, coeff_out, self.func2d, spat_shift=-1*_flexure)
# Fill
thismask_science = slitmask == slit_spat
final_tilts[thismask_science] = _tilts[thismask_science]
# Return
return final_tilts
[docs]
def spatid_to_zero(self, spat_id):
"""
Convert slit spat_id to zero-based
Mainly for coeffs
Args:
spat_id (int):
Returns:
int:
"""
mtch = self.spat_id == spat_id
return np.where(mtch)[0][0]
[docs]
def show(self, waveimg=None, wcs_match=True, in_ginga=True, show_traces=False,
chk_version=True):
"""
Show in ginga or mpl Tiltimg with the tilts traced and fitted overlaid
Args:
waveimg (`numpy.ndarray`_, optional):
Image with the wavelength solution.
wcs_match (bool, optional):
If True, use this image as a reference image for the WCS and match all
image in other channels to it.
in_ginga (bool, optional):
If True, show the image in ginga. Otherwise, use matplotlib.
show_traces (bool, optional):
If True, show the traces of the tilts on the image.
chk_version (:obj:`bool`, optional):
When reading in existing files written by PypeIt, perform strict
version checking to ensure a valid file. If False, the code
will try to keep going, but this may lead to faults and quiet
failures. User beware!
"""
# get tilt_img_dict
cal_file = Path(self.calib_dir).absolute() / self.tiltimg_filename
if cal_file.exists():
tilt_img_dict = buildimage.TiltImage.from_file(cal_file, chk_version=chk_version)
else:
msgs.error(f'Tilt image {str(cal_file)} NOT FOUND.')
# get slits
slitmask = None
cal_file = Path(self.calib_dir).absolute() / self.slits_filename
if cal_file.exists():
slits = slittrace.SlitTraceSet.from_file(cal_file, chk_version=chk_version)
_slitmask = slits.slit_img(initial=True, flexure=self.spat_flexure)
_left, _right, _mask = slits.select_edges(flexure=self.spat_flexure)
gpm = _mask == 0
# resize
slitmask = arc.resize_mask2arc(tilt_img_dict.image.shape, _slitmask)
left = arc.resize_slits2arc(tilt_img_dict.image.shape, _slitmask.shape, _left)
right = arc.resize_slits2arc(tilt_img_dict.image.shape, _slitmask.shape, _right)
else:
slits = None
msgs.warn(f'Slits file {str(cal_file)} NOT FOUND.')
# get waveimg
same_size = (slits.nspec, slits.nspat) == tilt_img_dict.image.shape
if waveimg is None and slits is not None and same_size and in_ginga:
wv_calib_name = wavecalib.WaveCalib.construct_file_name(self.calib_key, calib_dir=self.calib_dir)
if Path(wv_calib_name).absolute().exists():
wv_calib = wavecalib.WaveCalib.from_file(wv_calib_name, chk_version=chk_version)
tilts = self.fit2tiltimg(slitmask, flexure=self.spat_flexure)
waveimg = wv_calib.build_waveimg(tilts, slits, spat_flexure=self.spat_flexure)
else:
msgs.warn('Could not load Wave image to show with tilts image.')
# Show
# tilt image
tilt_img = tilt_img_dict.image * (slitmask > -1) if slitmask is not None else tilt_img_dict.image
# set cuts
zmax = stats.sigma_clip(tilt_img, sigma=10, return_bounds=True)[2]
zmin = stats.sigma_clip(tilt_img, sigma=5, return_bounds=True)[1] * 2
cut = (zmin, zmax)
# show in ginga
if in_ginga:
# connect to ginga
display.connect_to_ginga(raise_err=True, allow_new=True)
# display image
viewer, ch = display.show_image(tilt_img, chname='Tilts', cuts=cut,
wcs_match=wcs_match, waveimg=waveimg, clear=True)
# show tilts
display.show_tilts(viewer, ch, self.tilt_traces, points=show_traces, nspec=tilt_img.shape[0])
# show slit edges
if slits is not None:
display.show_slits(viewer, ch, left[:, gpm], right[:, gpm], slit_ids=slits.spat_id[gpm])
# show in matplotlib
else:
if slits is not None:
show_tilts_mpl(tilt_img, self.tilt_traces, show_traces=show_traces, left_edges=left[:, gpm],
right_edges=right[:, gpm], slit_ids=slits.spat_id[gpm], cut=cut)
else:
show_tilts_mpl(tilt_img, self.tilt_traces, show_traces=show_traces, cut=cut)
[docs]
class BuildWaveTilts:
"""
Class to guide arc/sky tracing
Args:
mstilt (:class:`~pypeit.images.buildimage.TiltImage`):
Tilt image. QA file naming inherits the calibration key
(``calib_key``) from this object.
slits (:class:`~pypeit.slittrace.SlitTraceSet`):
Slit edges
spectrograph (:class:`~pypeit.spectrographs.spectrograph.Spectrograph`):
Spectrograph object
par (:class:`~pypeit.par.pypeitpar.WaveTiltsPar` or None):
The parameters used to fuss with the tilts
wavepar (:class:`~pypeit.par.pypeitpar.WavelengthSolutionPar` or None):
The parameters used for the wavelength solution
det (int): Detector index
qa_path (:obj:`str`, optional):
Directory for QA output.
spat_flexure (float, optional):
If input, the slitmask and slit edges are shifted prior
to tilt analysis.
Attributes:
spectrograph (:class:`~pypeit.spectrographs.spectrograph.Spectrograph`):
??
steps (list):
??
mask (`numpy.ndarray`_):
boolean array; True = Ignore this slit
all_trcdict (list):
All trace dict's
tilts (`numpy.ndarray`_):
Tilts for a single slit/order
all_tilts (list):
Tuple of tilts `numpy.ndarray`_ objects
final_tilts (`numpy.ndarray`_):
Final tilts image
gpm (`numpy.ndarray`_):
Good pixel mask. Eventually, we might attach this to self.mstilt
although that would then require that we write it to disk with
self.mstilt.image
"""
# TODO This needs to be modified to take an inmask
def __init__(self, mstilt, slits, spectrograph, par, wavepar, det=1, qa_path=None,
spat_flexure=None):
# TODO: Perform type checking
self.spectrograph = spectrograph
self.par = par
self.wavepar = wavepar
self.mstilt = mstilt
self.slits = slits
self.det = det
self.qa_path = qa_path
self.spat_flexure = spat_flexure
# Get the non-linear count level
# TODO: This is currently hacked to deal with Mosaics
try:
self.nonlinear_counts = self.mstilt.detector.nonlinear_counts()
except:
self.nonlinear_counts = 1e10
# Set the slitmask and slit boundary related attributes that the
# code needs for execution. This also deals with arcimages that
# have a different binning then the trace images used to defined
# the slits
# TODO -- Tidy this up into one or two methods?
# Load up all slits
# TODO -- Discuss further with JFH
all_left, all_right, mask = self.slits.select_edges(initial=True, flexure=self.spat_flexure) # Grabs all, initial slits
# self.tilt_bpm = np.invert(mask == 0)
# At this point of the reduction the only bitmask flags that may have been generated are 'USERIGNORE',
# 'SHORTSLIT', 'BOXSLIT' and 'BADWVCALIB'. Here we use only 'USERIGNORE' and 'SHORTSLIT' to create the bpm mask
self.tilt_bpm = self.slits.bitmask.flagged(mask, flag=['SHORTSLIT', 'USERIGNORE'])
self.tilt_bpm_init = self.tilt_bpm.copy()
# Slitmask
# TODO -- Discuss further with JFH
self.slitmask_science = self.slits.slit_img(initial=True, flexure=self.spat_flexure, exclude_flag=['BOXSLIT']) # All unmasked slits
# Resize
# TODO: Should this be the bpm or *any* flag?
gpm = self.mstilt.select_flag(flag='BPM', invert=True) if self.mstilt is not None \
else np.ones_like(self.slitmask_science, dtype=bool)
self.shape_science = self.slitmask_science.shape
self.shape_tilt = self.mstilt.image.shape
self.slitcen = arc.resize_slits2arc(self.shape_tilt, self.shape_science, (all_left+all_right)/2)
self.slitmask = arc.resize_mask2arc(self.shape_tilt, self.slitmask_science)
self.gpm = (arc.resize_mask2arc(self.shape_tilt, gpm)) & (self.mstilt.image < self.nonlinear_counts)
# --------------------------------------------------------------
# Key Internals
self.mask = None
self.all_trace_dict = [None]*self.slits.nslits
self.tilts = None
# 2D fits are stored as a dictionary rather than list because we will jsonify the dict
self.all_fit_dict = [None]*self.slits.nslits
self.steps = []
# Main outputs
self.final_tilts = None
self.fit_dict = None
self.trace_dict = None
[docs]
def find_lines(self, arcspec, slit_cen, slit_idx, bpm=None, debug=False):
"""
Find the lines for tracing
Wrapper to tracewave.tilts_find_lines()
Args:
arcspec ():
??
slit_cen ():
??
slit_idx (int):
Slit index, zero-based
bpm (`numpy.ndarray`_, optional):
??
debug (bool, optional):
??
Returns:
tuple: 2 objectcs
- `numpy.ndarray`_ or None: Spectral positions of lines to trace
- `numpy.ndarray`_ or None: Spatial positions of lines to trace
"""
# TODO: Implement this!
only_these_lines = None
if self.par['idsonly']:
# Put in some hook here for getting the lines out of the
# wave calib for i.e. LRIS ghosts.
raise NotImplementedError('Select lines with IDs for tracing not yet implemented.')
# TODO -- This should be order not slit!
tracethresh = self._parse_param(self.par, 'tracethresh', slit_idx)
lines_spec, lines_spat, good \
= tracewave.tilts_find_lines(arcspec, slit_cen, tracethresh=tracethresh,
sig_neigh=self.par['sig_neigh'],
nfwhm_neigh=self.par['nfwhm_neigh'],
only_these_lines=only_these_lines,
fwhm=self.wavepar['fwhm'],
nonlinear_counts=self.nonlinear_counts,
bpm=bpm, debug_peaks=False, debug_lines=debug)
if debug:
mean, median, stddev = stats.sigma_clipped_stats(self.mstilt.image, sigma=3.)
# vmin, vmax = visualization.ZScaleInterval().get_limits(self.mstilt.image)
vmin = median - 2*stddev
vmax = median + 2*stddev
plt.imshow(self.mstilt.image, origin='lower', interpolation='nearest', aspect='auto',
vmin=vmin, vmax=vmax)
plt.scatter(lines_spat[good], lines_spec[good], marker='x', color='k', lw=2, s=50)
plt.scatter(lines_spat[np.invert(good)], lines_spec[np.invert(good)], marker='x', color='C3', lw=2, s=50)
plt.show()
self.steps.append(inspect.stack()[0][3])
return (None, None) if lines_spec is None else (lines_spec[good], lines_spat[good])
[docs]
def fit_tilts(self, trc_tilt_dict, thismask, slit_cen, spat_order, spec_order, slit_idx,
show_QA=False, doqa=True):
"""
Fit the tilts
all_fit_dict and all_trace_dict are filled in place
Args:
trc_tilt_dict (dict): Contains information from tilt tracing
slit_cen (`numpy.ndarray`_): (nspec,) Central trace for this slit
spat_order (int): Order of the 2d polynomial fit for the spatial direction
spec_order (int): Order of the 2d polytnomial fit for the spectral direction
slit_idx (int): zero-based, integer index for the slit in question
show_QA (bool, optional):
show the QA instead of writing it out to the outfile
doqa (bool, optional):
Construct the QA plot
Returns:
`numpy.ndarray`_: Array containing the coefficients for the 2d
legendre polynomial fit. Shape is (spat_order + 1, spec_order+1).
"""
# Index
self.all_fit_dict[slit_idx], self.all_trace_dict[slit_idx] \
= tracewave.fit_tilts(trc_tilt_dict, thismask, slit_cen, spat_order=spat_order,
spec_order=spec_order,maxdev=self.par['maxdev2d'],
sigrej=self.par['sigrej2d'], func2d=self.par['func2d'],
doqa=doqa, calib_key=self.mstilt.calib_key,
slitord_id=self.slits.slitord_id[slit_idx],
minmax_extrap=self.par['minmax_extrap'],
show_QA=show_QA, out_dir=self.qa_path)
self.steps.append(inspect.stack()[0][3])
return self.all_fit_dict[slit_idx]['coeff2']
[docs]
def trace_tilts(self, arcimg, lines_spec, lines_spat, thismask, slit_cen,
debug_pca=False, show_tracefits=False):
"""
Trace the tilts
Args:
arcimg (`numpy.ndarray`_):
Arc image. Shape is (nspec, nspat).
lines_spec (`numpy.ndarray`_):
Array containing the spectral pixel location of each
line found for this slit. Shape is (nlines,).
lines_spat (`numpy.ndarray`_):
Array containing the spatial pixel location of each line,
which is the slitcen evaluate at the spectral position
position of the line stored in lines_spec. Shape is
(nlines,).
thismask (`numpy.ndarray`_):
Image indicating which pixels lie on the slit in
equation. True = on the slit. False = not on slit. Shape
is (nspec, nspat) with dtype=bool.
slit_cen (:obj:`int`):
Integer index indicating the slit in question.
Returns:
dict: Dictionary containing information on the traced tilts required
to fit the filts.
"""
trace_dict = tracewave.trace_tilts(arcimg, lines_spec, lines_spat, thismask, slit_cen,
inmask=self.gpm, fwhm=self.wavepar['fwhm'],
spat_order=self.par['spat_order'],
maxdev_tracefit=self.par['maxdev_tracefit'],
sigrej_trace=self.par['sigrej_trace'],
debug_pca=debug_pca, show_tracefits=show_tracefits)
# Return
self.steps.append(inspect.stack()[0][3])
return trace_dict
[docs]
def model_arc_continuum(self, debug=False):
"""
Model the continuum of the arc image.
The method uses the arc spectra extracted using
:attr:`extract_arcs` and fits a characteristic low-order
continuum for each slit/order using
:func:`~pypeit.core.fitting.robust_fit` and the parameters
`cont_function`, `cont_order`, and `cont_rej` from
:attr:`par`. The characteristic continuum is then rescaled to
match the continuum at each spatial position in the
slit/order.
.. note::
The approach used here may be too simplistic (in the
robustness of the continuum fit and then how the
continuum is rescaled and projected for each slit/order).
Tests should be performed to make sure that the approach
is good enough to trace the centroid of the arc/sky lines
without biasing the centroids due to a non-zero continuum
level.
Args:
debug (:obj:`bool`, optional):
Run the method in debug mode.
Returns:
`numpy.ndarray`_: Returns a 2D image with the same shape as
:attr:`mstilt` with the model continuum.
"""
# TODO: Should make this operation part of WaveTiltsPar ...
# Parse the upper and lower sigma rejection thresholds; used
# when rescaling continuum from center spectrum.
lower_rej, upper_rej = self.par['cont_rej'] if hasattr(self.par['cont_rej'], '__len__') \
else np.repeat(self.par['cont_rej'], 2)
# Fit the continuum of the extracted arc spectra for each slit
nspec, nslits = self.arccen.shape
spec = np.arange(nspec, dtype=float)
arc_continuum = np.zeros(self.arccen.shape, dtype=float)
arc_fitmask = np.zeros(self.arccen.shape, dtype=bool)
for i in range(nslits):
if self.tilt_bpm[i]:
continue
# TODO: What to do with the following iter_continuum parameters?:
# sigthresh, sigrej, niter_cont, cont_samp, cont_frac_fwhm
arc_continuum[:,i], arc_fitmask[:,i] \
= arc.iter_continuum(self.arccen[:,i], gpm=np.invert(self.arccen_bpm[:,i]),
fwhm=self.wavepar['fwhm'])
# TODO: Original version. Please leave it for now.
# arc_fitmask[:,i], coeff \
# = utils.robust_polyfit_djs(spec, self.arccen[:,i], self.par['cont_order'],
# function=self.par['cont_function'],
# minx=spec[0], maxx=spec[-1],
# upper=upper_rej, lower=lower_rej, use_mad=True,
# sticky=True)
# arc_continuum[:,i] = utils.func_val(coeff, spec, self.par['cont_function'],
# minx=spec[0], maxx=spec[-1])
if debug:
plt.plot(spec, self.arccen[:,i], color='k', label='Arc')
plt.scatter(spec, np.ma.MaskedArray(self.arccen[:,i], mask=arc_fitmask[:,i]),
marker='x', s=10, color='C1', label='Ignored')
plt.plot(arc_continuum[:,i], color='C3', label='Cont. Fit')
plt.xlabel('Spectral pixel')
plt.ylabel('Counts')
plt.legend()
plt.show()
# For each slit, rescale the continuum to the spectrum at a
# given spatial position along the slit/order. This
# implementation may be too simplistic in how it treats the
# spatial axis.
nspat = self.slitmask.shape[1]
cont_image = np.zeros(self.mstilt.image.shape, dtype=float)
# TODO: Can probably do this without the for loop but this
# still may be faster.
for i in range(nslits):
# Masked?
if self.tilt_bpm[i]:
continue
# Find the pixels in this slit
indx = self.slitmask == self.slits.spat_id[i]
# Set a single width for the slit to simplify the
# calculation
width = np.sum(indx, axis=1)
width = int(np.amax(width[np.invert(self.arccen_bpm[:,i])]))
# Get the spatial indices for spectral pixels in the
# spatial dimension that follow the curvature of the slit
# center.
# TODO: May need to be more sophisticated.
_spat = (self.slitcen[:,i,None] + np.arange(width)[None,:] - width//2).astype(int)
# Set the index of masked pixels or those off the detector
# to -1 so that they don't cause the image indexing to
# fault and can be selected for masking
_spat[(_spat < 0) | (_spat >= nspat) | self.arccen_bpm[:,i,None]] = -1
# Pull out the slit pixels into a square array and mask
# pixels off of the slit
aligned_spec = np.tile(np.arange(nspec), (width,1)).T
aligned_flux = np.ma.MaskedArray(self.mstilt.image[aligned_spec, _spat],
mask=_spat==-1)
# Use a sigma-clipped median to determine the scaling of
# the continuum fit to the central extracted spectrum to
# match the spectrum at each spatial position.
# TODO: Instead of determining the scale factor directly,
# use the slit-illumination profile?
cont_renorm = stats.sigma_clipped_stats(aligned_flux/arc_continuum[:,i,None],
sigma_lower=lower_rej, sigma_upper=upper_rej,
axis=0)[1]
# Fill the image with the continuum for this slit
indx = np.invert(aligned_flux.mask)
cont_image[aligned_spec[indx], _spat[indx]] \
= (arc_continuum[:,i,None] * cont_renorm[None,:])[indx]
# Remove continuum measurements that are off any slits (because
# of the fixed width assumption)
cont_image[self.slitmask == -1] = 0.
return cont_image
[docs]
def run(self, doqa=True, debug=False, show=False):
"""
Main driver for tracing arc lines
Code flow:
#. Extract an arc spectrum down the center of each slit/order
#. Loop on slits/orders
#. Trace and fit the arc lines (This is done twice, once
with trace_crude as the tracing crutch, then again
with a PCA model fit as the crutch).
#. Repeat trace.
#. 2D Fit to the offset from slitcen
#. Save
Args:
doqa (bool):
debug (bool):
show (bool):
Returns:
:class:`WaveTilts`:
"""
# Extract the arc spectra for all slits
self.arccen, self.arccen_bpm = self.extract_arcs()
# TODO: Leave for now. Used for debugging
#self.par['rm_continuum'] = True
#debug = True
#show = True
# Subtract arc continuum
_mstilt = self.mstilt.image.copy()
if self.par['rm_continuum']:
continuum = self.model_arc_continuum(debug=debug)
_mstilt -= continuum
if debug:
# TODO: Put this into a function
vmin, vmax = visualization.ZScaleInterval().get_limits(_mstilt)
w,h = plt.figaspect(1)
fig = plt.figure(figsize=(3*w,h))
ax = fig.add_axes([0.15/3, 0.1, 0.8/3, 0.8])
ax.imshow(self.mstilt.image, origin='lower', interpolation='nearest',
aspect='auto', vmin=vmin, vmax=vmax)
ax.set_title('Arc')
ax = fig.add_axes([1.15/3, 0.1, 0.8/3, 0.8])
ax.imshow(continuum, origin='lower', interpolation='nearest',
aspect='auto', vmin=vmin, vmax=vmax)
ax.set_title('Continuum')
ax = fig.add_axes([2.15/3, 0.1, 0.8/3, 0.8])
ax.imshow(_mstilt, origin='lower', interpolation='nearest',
aspect='auto', vmin=vmin, vmax=vmax)
ax.set_title('Arc - Continuum')
plt.show()
# Final tilts image
self.final_tilts = np.zeros(self.shape_science,dtype=float)
max_spat_dim = (np.asarray(self.par['spat_order']) + 1).max()
max_spec_dim = (np.asarray(self.par['spec_order']) + 1).max()
self.coeffs = np.zeros((max_spec_dim, max_spat_dim,self.slits.nslits))
self.spat_order = np.zeros(self.slits.nslits, dtype=int)
self.spec_order = np.zeros(self.slits.nslits, dtype=int)
# Loop on all slits
for slit_idx, slit_spat in enumerate(self.slits.spat_id):
if self.tilt_bpm[slit_idx]:
msgs.info(f'Skipping bad slit/order {self.slits.slitord_id[slit_idx]} ({slit_idx+1}/{self.slits.nslits})')
self.slits.mask[slit_idx] = self.slits.bitmask.turn_on(self.slits.mask[slit_idx], 'BADTILTCALIB')
continue
msgs.info(f'Computing tilts for slit/order {self.slits.slitord_id[slit_idx]} ({slit_idx+1}/{self.slits.nslits})')
# Identify lines for tracing tilts
msgs.info('Finding lines for tilt analysis')
self.lines_spec, self.lines_spat \
= self.find_lines(self.arccen[:,slit_idx], self.slitcen[:,slit_idx],
slit_idx,
bpm=self.arccen_bpm[:,slit_idx], debug=debug)
if self.lines_spec is None:
msgs.warn('Did not recover any lines for slit/order = {:d}'.format(self.slits.slitord_id[slit_idx]) +
'. This slit/order will not reduced!')
self.slits.mask[slit_idx] = self.slits.bitmask.turn_on(self.slits.mask[slit_idx], 'BADTILTCALIB')
continue
thismask = self.slitmask == slit_spat
# Performs the initial tracing of the line centroids as a
# function of spatial position resulting in 1D traces for
# each line.
msgs.info('Trace the tilts')
self.trace_dict = self.trace_tilts(_mstilt, self.lines_spec, self.lines_spat,
thismask, self.slitcen[:, slit_idx])
# IF there are < 2 usable arc lines for tilt tracing, PCA fit does not work and the reduction crushes
# TODO investigate why some slits have <2 usable arc lines
if np.sum(self.trace_dict['use_tilt']) < 2:
msgs.warn('Less than 2 usable arc lines for slit/order = {:d}'.format(self.slits.slitord_id[slit_idx]) +
'. This slit/order will not reduced!')
self.slits.mask[slit_idx] = self.slits.bitmask.turn_on(self.slits.mask[slit_idx], 'BADTILTCALIB')
continue
# Check the spectral coverage of the usable arc lines for tilts. If the coverage is small,
# it will affect the wavelength range in waveimg (i.e, self.wv_calib.build_waveimg()) and
# crash the reduction later on.
# Here we mask slits that computed the tilts with arc lines coverage <10%
use_tilt_spec_cov = (self.trace_dict['tilts_spec'][:, self.trace_dict['use_tilt']].max() -
self.trace_dict['tilts_spec'][:, self.trace_dict['use_tilt']].min()) / self.arccen.shape[0]
if use_tilt_spec_cov < 0.1:
msgs.warn(f'The spectral coverage of the usable arc lines is {use_tilt_spec_cov:.3f} (less than 10%).' +
' This slit/order will not be reduced!')
self.slits.mask[slit_idx] = self.slits.bitmask.turn_on(self.slits.mask[slit_idx], 'BADTILTCALIB')
continue
# TODO: Show the traces before running the 2D fit
self.spat_order[slit_idx] = self._parse_param(self.par, 'spat_order', slit_idx)
self.spec_order[slit_idx] = self._parse_param(self.par, 'spec_order', slit_idx)
# 2D model of the tilts, includes construction of QA
# NOTE: This also fills in self.all_fit_dict and self.all_trace_dict
coeff_out = self.fit_tilts(self.trace_dict, thismask, self.slitcen[:,slit_idx],
self.spat_order[slit_idx], self.spec_order[slit_idx],
slit_idx,
doqa=doqa, show_QA=show)
self.coeffs[:self.spec_order[slit_idx]+1,:self.spat_order[slit_idx]+1,slit_idx] = coeff_out
# TODO: Need a way to assess the success of fit_tilts and
# flag the slit if it fails
# Tilts are created with the size of the original slitmask,
# which corresonds to the same binning as the science
# images, trace images, and pixelflats etc.
self.tilts = tracewave.fit2tilts(self.slitmask_science.shape, coeff_out,
self.par['func2d'])
# Save to final image
thismask_science = self.slitmask_science == slit_spat
self.final_tilts[thismask_science] = self.tilts[thismask_science]
if show:
viewer, ch = display.show_image(self.mstilt.image * (self.slitmask > -1), chname='tilts')
display.show_tilts(viewer, ch, self.make_tbl_tilt_traces())
if debug:
show_tilts_mpl(self.mstilt.image*(self.slitmask > -1), self.make_tbl_tilt_traces())
# Record the Mask
bpmtilts = np.zeros_like(self.slits.mask, dtype=self.slits.bitmask.minimum_dtype())
for flag in ['BADTILTCALIB']:
bpm = self.slits.bitmask.flagged(self.slits.mask, flag=flag)
if np.any(bpm):
bpmtilts[bpm] = self.slits.bitmask.turn_on(bpmtilts[bpm], flag)
# grab slits file_name
slits_filename = self.slits.construct_file_name(self.slits.calib_key)
# grab titimg file_name
tiltimg_filename = self.mstilt.construct_file_name(self.mstilt.calib_key)
# Build and return the calibration frame
tilts = WaveTilts(self.coeffs, self.slits.nslits, self.slits.spat_id, self.spat_order,
self.spec_order, self.par['func2d'], bpmtilts=bpmtilts,
spat_flexure=self.spat_flexure, PYP_SPEC=self.spectrograph.name,
slits_filename=slits_filename, tiltimg_filename=tiltimg_filename,
tilt_traces=self.make_tbl_tilt_traces())
# Inherit the calibration frame naming from self.mstilt
# TODO: Should throw an error here if these calibration frame naming
# elements are not defined by self.mstilts...
tilts.copy_calib_internals(self.mstilt)
return tilts
[docs]
def make_tbl_tilt_traces(self):
"""
Make a Table of the traced and fitted tilts from self.all_trace_dict
to save into the WaveTilts object
Returns:
`astropy.table.Table`_: Table including the traced and fitted tilts for each slit.
Columns are:
- slit_ids: Slit IDs for which the tilts were traced and fit
- goodpix_spat: Good spatial pixels of the traced tilts
- goodpix_tilt: Good spectral pixels of the traced tilts
- goodpix_lid: Line IDs for each goodpix. This is needed to associate goodpix to each line
- badpix_spat: Masked spatial pixels of the traced tilts
- badpix_tilt: Masked spectral pixels of the traced tilts
- badpix_lid: Line IDs for each badpix. This is needed to associate badpix to each line
- good2dfit_spat: Good spatial pixels of the 2D fit of the tilts
- good2dfit_tilt: Good spectral pixels of the 2D fit of the tilts
- good2dfit_lid: Line IDs for each good2dfit. This is needed to associate good2dfit to each line
- bad2dfit_spat: Rejected spatial pixels of the 2D fit of the tilts
- bad2dfit_tilt: Rejected spectral pixels of the 2D fit of the tilts
- bad2dfit_lid: Line IDs for each bad2dfit. This is needed to associate bad2dfit to each line
"""
if self.all_trace_dict is None:
msgs.error('No tilts have been traced and fit yet. Run the run() method first.')
# slit_ids
slit_ids = np.array([])
# good&bad traces
goodpix_lid = np.array([])
goodpix_spat = np.array([])
goodpix_tilt = np.array([])
badpix_lid = np.array([])
badpix_spat = np.array([])
badpix_tilt = np.array([])
# good&bad fits
good2dfit_lid = np.array([])
good2dfit_spat = np.array([])
good2dfit_tilt = np.array([])
bad2dfit_lid = np.array([])
bad2dfit_spat = np.array([])
bad2dfit_tilt = np.array([])
# fill the arrays with the traced and fitted tilts
# each trc is a slit
for i, trc in enumerate(self.all_trace_dict):
if trc is not None and self.slits.mask[i] == 0:
# slit ids
slit_ids = np.append(slit_ids, self.slits.spat_id[i])
# good pixels
gpix = trc['tot_mask']
# bad pixels
bpix = np.invert(gpix) & (trc['tilts'] > 0)
# good 2d fit
gfit = gpix & trc['fit_mask']
# bad 2d fit
bfit = gpix & np.invert(trc['fit_mask'])
for l in range(trc['tilts_spat'].shape[1]):
# good pixels
goodpix_lid = np.append(goodpix_lid, np.repeat(f'{self.slits.spat_id[i]}_{l + 1}', sum(gpix[:, l])))
goodpix_spat = np.append(goodpix_spat, trc['tilts_spat'][:,l][gpix[:,l]])
goodpix_tilt = np.append(goodpix_tilt, trc['tilts'][:,l][gpix[:,l]])
# bad pixels
badpix_lid = np.append(badpix_lid, np.repeat(f'{self.slits.spat_id[i]}_{l + 1}', sum(bpix[:, l])))
badpix_spat = np.append(badpix_spat, trc['tilts_spat'][:,l][bpix[:,l]])
badpix_tilt = np.append(badpix_tilt, trc['tilts'][:,l][bpix[:,l]])
# good 2d fit
good2dfit_lid = np.append(good2dfit_lid, np.repeat(f'{self.slits.spat_id[i]}_{l + 1}', sum(gfit[:, l])))
good2dfit_spat = np.append(good2dfit_spat, trc['tilts_spat'][:,l][gfit[:,l]])
good2dfit_tilt = np.append(good2dfit_tilt, trc['tilt_2dfit'][:,l][gfit[:,l]])
# bad 2d fit
bad2dfit_lid = np.append(bad2dfit_lid, np.repeat(f'{self.slits.spat_id[i]}_{l + 1}', sum(bfit[:, l])))
bad2dfit_spat = np.append(bad2dfit_spat, trc['tilts_spat'][:,l][bfit[:,l]])
bad2dfit_tilt = np.append(bad2dfit_tilt, trc['tilt_2dfit'][:,l][bfit[:,l]])
# fill the table
tbl_tilt_traces = table.Table()
trc_arrays = [slit_ids, goodpix_spat, goodpix_tilt, badpix_lid, badpix_spat, badpix_tilt,
good2dfit_lid, good2dfit_spat, good2dfit_tilt, bad2dfit_lid, bad2dfit_spat, bad2dfit_tilt]
tbl_keys = ['slit_ids', 'goodpix_spat', 'goodpix_tilt', 'badpix_lid', 'badpix_spat', 'badpix_tilt',
'good2dfit_lid', 'good2dfit_spat', 'good2dfit_tilt', 'bad2dfit_lid', 'bad2dfit_spat', 'bad2dfit_tilt']
for i, arr in enumerate(trc_arrays):
if arr.size > 0:
tbl_tilt_traces[tbl_keys[i]] = np.expand_dims(arr, axis=0)
if len(tbl_tilt_traces) == 0:
msgs.warn('No traced and fitted tilts have been found.')
return None
return tbl_tilt_traces
[docs]
def _parse_param(self, par, key, slit):
"""
Grab a parameter for a given slit
Args:
par (ParSet):
key (str):
slit (int):
Returns:
object: Value of the parameter
"""
param_in = par[key]
if isinstance(param_in, (float, int)):
param = param_in
elif isinstance(param_in, (list, np.ndarray)):
param = param_in[slit]
else:
raise ValueError('Invalid input for parameter {:s}'.format(key))
return param
def __repr__(self):
# Generate sets string
txt = '<{:s}: '.format(self.__class__.__name__)
if len(self.steps) > 0:
txt+= ' steps: ['
for step in self.steps:
txt += '{:s}, '.format(step)
txt = txt[:-2]+']' # Trim the trailing comma
txt += '>'
return txt
[docs]
def show_tilts_mpl(tilt_img, tilt_traces, show_traces=False, left_edges=None,
right_edges=None, slit_ids=None, cut=None):
"""Show the TiltImage with the traced and 2D fitted tilts overlaid
Args:
tilt_img (`numpy.ndarray`_):
TiltImage
tilt_traces (`astropy.table.Table`_):
Table containing the traced and fitted tilts. See
:func:`~pypeit.wavetilts.BuiltWaveTilts.make_tbl_tilt_traces` for
information on the table columns.
show_traces (bool, optional):
Show the traced tilts
left_edges (`numpy.ndarray`_, optional):
Left edges of the slits
right_edges (`numpy.ndarray`_, optional):
Right edges of the slits
slit_ids (`numpy.ndarray`_, optional):
Slit IDs
cut (tuple, optional):
Lower and upper levels cut for the image display
"""
if tilt_traces is None:
return msgs.error('No tilts have been traced or fitted')
if cut is None:
cut = utils.growth_lim(tilt_img, 0.98, fac=1)
w, h = plt.figaspect(1)
fig = plt.figure(figsize=(1.5 * w, 1.5 * h))
plt.imshow(tilt_img, origin='lower', interpolation='nearest', aspect='auto', cmap='gray',
vmin=cut[0], vmax=cut[1], zorder=0)
# traced tilts
if show_traces:
if 'goodpix_tilt' in tilt_traces.keys() and tilt_traces['goodpix_tilt'][0].size > 0:
plt.scatter(tilt_traces['goodpix_spat'][0], tilt_traces['goodpix_tilt'][0], color='cyan',
edgecolors=None, marker='s', s=10, alpha=0.3, lw=0, label='Good pixel', zorder=1)
# masked tilts
if 'badpix_tilt' in tilt_traces.keys() and tilt_traces['badpix_tilt'][0].size > 0:
plt.scatter(tilt_traces['badpix_spat'], tilt_traces['badpix_tilt'], color='red',
edgecolors=None, marker='s', s=10, alpha=0.3, lw=0, label='Masked pixel', zorder=2)
# 2D fit tilts
# loop over each line so that we can plot lines instead of points
if 'good2dfit_lid' in tilt_traces.keys():
for iline in np.unique(tilt_traces['good2dfit_lid'][0]):
this_line = tilt_traces['good2dfit_lid'][0] == iline
if np.any(this_line):
good2dfit_spat = tilt_traces['good2dfit_spat'][0][this_line]
good2dfit_tilt = tilt_traces['good2dfit_tilt'][0][this_line]
plt.plot(good2dfit_spat, good2dfit_tilt, color='blue', lw=1, alpha=0.4, zorder=3)
# rejected point in 2D fit
if 'bad2dfit_tilt' in tilt_traces.keys() and tilt_traces['bad2dfit_tilt'][0].size > 0:
plt.scatter(tilt_traces['bad2dfit_spat'][0], tilt_traces['bad2dfit_tilt'][0], color='yellow',
edgecolors=None, marker='s', s=5, alpha=0.3, lw=0, label='Rejected in fit', zorder=4)
if left_edges is not None and right_edges is not None:
pstep = 50
for i in range(left_edges.shape[1]):
spec = np.arange(left_edges[:, i].size)
plt.plot(left_edges[::pstep, i], spec[::pstep], color='green', lw=2, zorder=5)
plt.plot(right_edges[::pstep, i], spec[::pstep], color='magenta', lw=2, zorder=5)
x_spatid = left_edges[spec.size//2, i] + 0.10*(right_edges[spec.size//2, i] - left_edges[spec.size//2, i])
y_spatid = spec[spec.size//2]
if slit_ids is not None:
plt.text(x_spatid, y_spatid, str(slit_ids[i]), color='aquamarine', fontsize=10, alpha=1,
weight='bold', rotation='vertical', zorder=5)
# add legend for 2D fitted tilts
legend_element = [Line2D([0], [0], color='blue', ls='-', lw=1, label='Good tilt fit')]
handles, labels = plt.gca().get_legend_handles_labels()
handles.append(legend_element[0])
lcolors = ['cyan', 'red', 'yellow', 'blue'] if show_traces else ['red', 'yellow', 'blue']
legend = plt.legend(handles=handles, labelcolor=lcolors, loc=3, markerscale=2, frameon=False)
for h in legend.legendHandles:
h.set_alpha(1)
plt.ylabel('Spectral pixel index')
plt.xlabel('Spatial pixel index')
plt.show()