"""
Implements a general purpose object used to decompose and predict
traces using principle-component analysis.
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import warnings
from IPython import embed
import numpy as np
from astropy.io import fits
from pypeit import log
from pypeit import PypeItError
from pypeit import utils
from pypeit.io import hdu_iter_by_ext
from pypeit.core import trace
from pypeit.core import pca
from pypeit.core.fitting import PypeItFit
from pypeit.datamodel import DataContainer
# TODO: This is even more general than a "trace" PCA. Rename to
# "VectorPCA"?
[docs]
class TracePCA(DataContainer):
r"""
Class to build and interact with PCA model of traces.
This is primarily a container class for the results of
:func:`~pypeit.core.pca.pca_decomposition`,
:func:`~pypeit.core.pca.fit_pca_coefficients`, and
:func:`~pypeit.core.pca.pca_predict`.
The datamodel attributes are:
.. include:: ../include/class_datamodel_tracepca.rst
Args:
trace_cen (`numpy.ndarray`_, optional):
A floating-point array with the spatial location of each
each trace. Shape is :math:`(N_{\rm spec}, N_{\rm
trace})`. If None, the object is "empty" and all of the
other keyword arguments are ignored.
npca (:obj:`bool`, optional):
The number of PCA components to keep. See
:func:`~pypeit.core.pca.pca_decomposition`.
pca_explained_var (:obj:`float`, optional):
The percentage (i.e., not the fraction) of the variance
in the data accounted for by the PCA used to truncate the
number of PCA coefficients to keep (see `npca`). Ignored
if `npca` is provided directly. See
:func:`~pypeit.core.pca.pca_decomposition`.
reference_row (:obj:`int`, optional):
The row (spectral position) in `trace_cen` to use as the
reference coordinate system for the PCA. If None, set to
the :math:`N_{\rm spec}/2`.
coo (`numpy.ndarray`_, optional):
Floating-point array with the reference coordinates for
each trace. If provided, the shape must be :math:`(N_{\rm
trace},)`. If None, the reference coordinate system is
defined by the value of `trace_cen` at the spectral
position defined by `reference_row`. See the `mean`
argument of :func:`~pypeit.core.pca.pca_decomposition`.
"""
version = '1.1.0'
"""Datamodel version."""
datamodel = {'reference_row': dict(otype=(int, np.integer),
descr='The row (spectral position) used as the reference ' \
'coordinate system for the PCA.'),
'trace_coo': dict(otype=np.ndarray, atype=(float,np.floating),
descr='Trace coordinates. Shape must be '
r':math:`(N_{\rm spec},N_{\rm trace})`.'),
'nspec': dict(otype=int,
descr='Number of pixels in the image spectral direction.'),
'ntrace': dict(otype=int, descr='Number of traces used to construct the PCA.'),
'input_npca': dict(otype=int,
descr='Requested number of PCA components if provided.'),
'npca': dict(otype=int, descr='Number of PCA components used.'),
'input_pcav': dict(otype=(float,np.floating),
descr='Requested variance accounted for by PCA decomposition.'),
'pca_coeffs': dict(otype=np.ndarray, atype=(float,np.floating),
descr=r'PCA component coefficients. If the PCA decomposition '
r'used :math:`N_{\rm comp}` components for '
r':math:`N_{\rm vec}` vectors, the shape of this array '
r'must be :math:`(N_{\rm vec}, N_{\rm comp})`. The '
r'array can be 1D with shape :math:`(N_{\rm vec},)` if '
r'there was only one PCA component.'),
'pca_components': dict(otype=np.ndarray, atype=(float,np.floating),
descr=r'Vectors with the PCA components. Shape must be '
r':math:`(N_{\rm comp}, N_{\rm spec})`.'),
'pca_mean': dict(otype=np.ndarray, atype=(float,np.floating),
descr=r'The mean offset of the PCA decomposotion for each '
r' spectral pixel. Shape is :math:`(N_{\rm spec},)`.'),
'pca_coeffs_model': dict(otype=np.ndarray, atype=PypeItFit,
descr='An array of PypeItFit objects, one per PCA '
'component, that models the trend of the PCA '
'component coefficients with the reference '
'coordinate of each vector. These models are '
r'used by :func:`predict` to model the expected '
'coefficients at a new reference coordinate.')}
"""Object datamodel."""
internals = ['is_empty']
# TODO: Add a show method that plots the pca coefficients and the
# current fit, if there is one
def __init__(self, trace_cen=None, npca=None, pca_explained_var=99.0, reference_row=None,
coo=None):
# Instantiate as an empty DataContainer
super().__init__()
self.is_empty = True
# Only do the decomposition if the trace coordinates are provided.
if trace_cen is not None:
self.decompose(trace_cen, npca=npca, pca_explained_var=pca_explained_var,
reference_row=reference_row, coo=coo)
[docs]
def decompose(self, trace_cen, npca=None, pca_explained_var=99.0, reference_row=None,
coo=None):
r"""
Construct the PCA from scratch.
Args:
trace_cen (`numpy.ndarray`_):
A floating-point array with the spatial location of
each each trace. Shape is :math:`(N_{\rm spec},
N_{\rm trace})`. Cannot be None.
npca (:obj:`bool`, optional):
The number of PCA components to keep. See
:func:`~pypeit.core.pca.pca_decomposition`.
pca_explained_var (:obj:`float`, optional):
The percentage (i.e., not the fraction) of the
variance in the data accounted for by the PCA used to
truncate the number of PCA coefficients to keep (see
`npca`). Ignored if `npca` is provided directly. See
:func:`~pypeit.core.pca.pca_decomposition`.
reference_row (:obj:`int`, optional):
The row (spectral position) in `trace_cen` to use as
the reference coordinate system for the PCA. If None,
set to the :math:`N_{\rm spec}/2`.
coo (`numpy.ndarray`_, optional):
Floating-point array with the reference coordinates
for each trace. If provided, the shape must be
:math:`(N_{\rm trace},)`. If None, the reference
coordinate system is defined by the value of
`trace_cen` at the spectral position defined by
`reference_row`. See the `mean` argument of
:func:`~pypeit.core.pca.pca_decomposition`.
"""
if trace_cen is None:
raise ValueError('Must provide traces to construct the PCA.')
self._reinit()
self.is_empty = False
# Set the reference row to use for the coordinates of the trace
self.reference_row = trace_cen.shape[0]//2 if reference_row is None else reference_row
self.ntrace = trace_cen.shape[1]
self.trace_coo = trace_cen[self.reference_row,:] if coo is None else np.atleast_1d(coo)
if self.trace_coo.size != self.ntrace:
raise ValueError('Provided reference coordinates have incorrect shape.')
# Save the input
self.input_npca = npca
self.input_pcav = pca_explained_var
# Perform the PCA decomposition of the traces
self.pca_coeffs, self.pca_components, self.pca_mean, _ \
= pca.pca_decomposition(trace_cen.T, npca=npca,
pca_explained_var=pca_explained_var, mean=self.trace_coo)
self.npca = self.pca_coeffs.shape[1]
self.nspec = self.pca_components.shape[1]
[docs]
def _reinit(self):
"""
Erase and/or define all the attributes of the class.
"""
# The following are *all* objects assigned to self.
self.is_empty = True
self.reference_row = None
self.ntrace = None
self.trace_coo = None
self.input_npca = None
self.input_pcav = None
self.pca_coeffs = None
self.pca_components = None
self.pca_mean = None
self.npca = None
self.nspec = None
self.pca_coeffs_model = None
[docs]
def build_interpolator(self, order, ivar=None, weights=None, function='polynomial', lower=3.0,
upper=3.0, maxrej=1, maxiter=25, minx=None, maxx=None, debug=False):
"""
Wrapper for :func:`~pypeit.core.pca.fit_pca_coefficients` that uses class
attributes and saves the input parameters.
"""
if self.is_empty:
raise ValueError('TracePCA object is empty; re-instantiate or run decompose().')
self.pca_coeffs_model \
= pca.fit_pca_coefficients(self.pca_coeffs, order, ivar=ivar, weights=weights,
function=function, lower=lower, upper=upper,
minx=minx, maxx=maxx, maxrej=maxrej, maxiter=maxiter,
coo=self.trace_coo, debug=debug)
[docs]
def predict(self, x):
r"""
Predict one or more traces given the functional forms for the
PCA coefficients.
.. warning::
The PCA coefficients must have first been modeled by a
function before using this method. An error will be
raised if :attr:`fit_coeff` is not defined.
Args:
x (:obj:`float`, `numpy.ndarray`_):
One or more spatial coordinates (at the PCA reference
row) at which to sample the PCA coefficients and
produce the PCA model for the trace spatial position
as a function of spectral pixel.
Returns:
`numpy.ndarray`_: The array with the predicted spatial
locations of the trace. If the provided coordinate is a
single value, the returned shape is :math:`(N_{\rm
pix},)`; otherwise it is :math:`(N_{\rm pix}, N_{\rm
x})`.
"""
if self.is_empty:
raise ValueError('TracePCA object is empty; re-instantiate or run decompose().')
if self.pca_coeffs_model is None:
raise PypeItError('PCA coefficients have not been modeled; run build_interpolator first.')
return pca.pca_predict(x, self.pca_coeffs_model, self.pca_components, self.pca_mean, x).T
[docs]
def _bundle(self, ext='PCA'):
"""Bundle the data for writing."""
d = super()._bundle(ext=ext)
if self.pca_coeffs_model is None:
return d
# Fix the default bundling to handle the fact that
# pca_coeffs_model is an array of PypeItFit instances.
_d = d[0] if ext is None else d[0][ext]
del _d['pca_coeffs_model']
d += [{'{0}_MODEL_{1}'.format(ext,i+1): self.pca_coeffs_model[i]}
for i in range(self.npca)]
return d
# TODO: Although I don't like doing it, kwargs is here to catch the
# extraneous keywords that can be passed to _parse from the base class but
# won't be used.
[docs]
@classmethod
def _parse(cls, hdu, hdu_prefix=None, **kwargs):
"""
Parse the data from the provided HDU.
See :func:`~pypeit.datamodel.DataContainer._parse` for the
argument descriptions.
"""
# Run the default parser to get most of the data
d, version_passed, type_passed, parsed_hdus = super()._parse(hdu, hdu_prefix=hdu_prefix)
# This should only ever read one hdu!
if len(parsed_hdus) > 1:
raise PypeItError('CODING ERROR: Parsing saved TracePCA instances should only parse 1 HDU, '
'independently of the PCA PypeItFit models.')
# Check if any models exist
if hasattr(hdu, '__len__') \
and np.any(['{0}_MODEL'.format(parsed_hdus[0]) in h.name for h in hdu]):
# Parse the models
model_ext = ['{0}_MODEL_{1}'.format(parsed_hdus[0],i+1) for i in range(d['npca'])]
d['pca_coeffs_model'] = np.array([PypeItFit.from_hdu(hdu[e]) for e in model_ext])
parsed_hdus += model_ext
return d, version_passed, type_passed, parsed_hdus
[docs]
@classmethod
def from_dict(cls, d=None):
"""
Instantiate from a dictionary.
This is a basic wrapper for
:class:`~pypeit.datamodel.DataContainer.from_dict` that
appropriately toggles :attr:`is_empty`.
"""
self = super().from_dict(d=d)
self.is_empty = False
return self
# TODO: Like with the use of TracePCA in EdgeTraceSet, we should
# integrate the elements of the function below into classes that trace
# objects and tilts so that the PCA can be called and used later
[docs]
def pca_trace_object(trace_cen, order=None, trace_bpm=None, min_length=0.6, npca=None,
pca_explained_var=99.0, reference_row=None, coo=None, minx=None, maxx=None,
trace_wgt=None, function='polynomial', lower=3.0, upper=3.0, maxrej=1,
maxiter=25, debug=False):
r"""
Decompose and reconstruct the provided traces using
principle-component analysis.
Args:
trace_cen (`numpy.ndarray`_):
A floating-point array with the spatial location of each
each trace. Shape is :math:`(N_{\rm spec}, N_{\rm
trace})`.
order (:obj:`int`, :obj:`list`, optional):
The order of the polynomial to use fit each PCA
coefficient as a function of trace position. If None,
`order` is set to :math:`3.3 N_{\rm use}/N_{\rm trace}`,
where :math:`N_{\rm use}` is the number of traces used to
construct the PCA and :math:`N_{\rm trace}` is the number
of total traces provided. If an integer (determined
automatically if the argument is `None`), the order per
PCA component (see `npca`) is set to cascade from
high-to-low order as follows::
_order = np.clip(order - np.arange(npca), 1, None).astype(int)
trace_bpm (`numpy.ndarray`_, optional):
Bad-pixel mask for the trace data (True is bad; False is
good). Must match the shape of `trace_cen`.
min_length (:obj:`float`, optional):
The good position of the trace must cover at least this
fraction of the spectral dimension for use in the PCA
decomposition.
npca (:obj:`bool`, optional):
The number of PCA components to keep. See
:func:`~pypeit.core.pca.pca_decomposition`.
pca_explained_var (:obj:`float`, optional):
The percentage (i.e., not the fraction) of the variance
in the data accounted for by the PCA used to truncate the
number of PCA coefficients to keep (see `npca`). Ignored
if `npca` is provided directly. See
:func:`~pypeit.core.pca.pca_decomposition`.
reference_row (:obj:`int`, optional):
The row (spectral position) in `trace_cen` to use as the
reference coordinate system for the PCA. If None, set to
the :math:`N_{\rm spec}/2` or based on the spectral
position that crosses the most number of valid trace
positions.
coo (`numpy.ndarray`_, optional):
Floating-point array with the reference coordinates to
use for each trace. If None, coordinates are defined at
the reference row of `trace_cen`. Shape must be
:math:`(N_{\rm trace},)`.
minx, maxx (:obj:`float`, optional):
Minimum and maximum values used to rescale the
independent axis data. If None, the minimum and maximum
values of `coo` are used. See
:func:`~pypeit.core.fitting.robust_fit`.
trace_wgt (`numpy.ndarray`_, optional):
Weights to apply to the PCA coefficient of each trace
during the fit. Weights are independent of the PCA
component. See `weights` parameter of
:func:`~pypeit.core.pca.fit_pca_coefficients`. Shape must
be :math:`(N_{\rm trace},)`.
function (:obj:`str`, optional):
Type of function used to fit the data.
lower (:obj:`float`, optional):
Number of standard deviations used for rejecting data
**below** the mean residual. If None, no rejection is
performed. See :func:`~pypeit.core.fitting.robust_fit`.
upper (:obj:`float`, optional):
Number of standard deviations used for rejecting data
**above** the mean residual. If None, no rejection is
performed. See :func:`~pypeit.core.fitting.robust_fit`.
maxrej (:obj:`int`, optional):
Maximum number of points to reject during fit iterations.
See :func:`~pypeit.core.fitting.robust_fit`.
maxiter (:obj:`int`, optional):
Maximum number of rejection iterations allows. To force
no rejection iterations, set to 0.
debug (:obj:`bool`, optional):
Show plots useful for debugging.
"""
# Check the input
if trace_bpm is None:
use_trace = np.ones(trace_cen.shape[1], dtype=bool)
_reference_row = trace_cen.shape[0]//2 if reference_row is None else reference_row
else:
use_trace = np.sum(np.logical_not(trace_bpm), axis=0)/trace_cen.shape[0] > min_length
_reference_row = trace.most_common_trace_row(trace_bpm[:,use_trace]) \
if reference_row is None else reference_row
_coo = None if coo is None else coo[use_trace]
# Instantiate the PCA
cenpca = TracePCA(trace_cen[:,use_trace], npca=npca, pca_explained_var=pca_explained_var,
reference_row=_reference_row, coo=_coo)
# Set the order of the function fit to the PCA coefficients:
# Order is set to cascade down to lower order for components
# that account for a smaller percentage of the variance.
if order is None:
# TODO: Where does this come from?
order = int(np.clip(np.floor(3.3*np.sum(use_trace)/trace_cen.shape[1]),1.0,3.0))
_order = np.atleast_1d(order)
if _order.size == 1:
_order = np.clip(order - np.arange(cenpca.npca), 1, None).astype(int)
if _order.size != cenpca.npca:
raise PypeItError('Number of polynomial orders does not match the number of PCA components.')
log.info('Order of function fit to each component: {0}'.format(_order))
# Apply a 10% relative error to each coefficient. This performs
# better than use_mad, since larger coefficients will always be
# considered inliers, if the coefficients vary rapidly with
# order as they sometimes do.
# TODO: This inverse variance usage has performance issues and
# tends to lead to rejection of coefficients that are near 0.
# Instead of setting the floor to an absolute value 0.1, why not a
# relative value like the mean or median of the coefficients? I.e.
# ivar = utils.inverse(numpy.square(np.fmax(0.1*np.absolute(cenpca.pca_coeffs),
# 0.1*np.median(cenpca.pca_coeffs))))
ivar = utils.inverse(np.square(np.fmax(0.1*np.absolute(cenpca.pca_coeffs), 0.1)))
# Set any additional weights for each trace
weights = np.ones(np.sum(use_trace), dtype=float) \
if trace_wgt is None else trace_wgt[use_trace]
# TODO: This combination of ivar and weights is as it has been
# previously. However, we recently changed the slit-edge tracing
# code to fit the PCA coefficients with unity weights (the default
# when passing weights=None to build_interpolator) and ivar=None.
# The means the PCA coefficients are fit with uniform weighting and
# the rejection is based on the median absolute deviation of the
# data with respect to the fitted model.
#
# This current call below to build_interpolator will instead weight
# the fit according to the weights set above, and it will reject
# points based on the inverse variance set above. We need to check
# that this makes sense!
# Build the interpolator that allows prediction of new traces
cenpca.build_interpolator(_order, ivar=ivar, weights=weights, function=function,
lower=lower, upper=upper, maxrej=maxrej, maxiter=maxiter,
minx=minx, maxx=maxx, debug=debug)
# Return the traces predicted for all input traces
return cenpca.predict(trace_cen[_reference_row,:] if coo is None else coo)