Source code for pypeit.tracepca

"""
Implements a general purpose object used to decompose and predict
traces using principle-component analysis.

.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst

"""
import warnings
from IPython import embed

import numpy as np

from astropy.io import fits

from pypeit import log
from pypeit import PypeItError
from pypeit import utils
from pypeit.io import hdu_iter_by_ext
from pypeit.core import trace
from pypeit.core import pca
from pypeit.core.fitting import PypeItFit
from pypeit.datamodel import DataContainer

# TODO: This is even more general than a "trace" PCA. Rename to
# "VectorPCA"?

[docs] class TracePCA(DataContainer): r""" Class to build and interact with PCA model of traces. This is primarily a container class for the results of :func:`~pypeit.core.pca.pca_decomposition`, :func:`~pypeit.core.pca.fit_pca_coefficients`, and :func:`~pypeit.core.pca.pca_predict`. The datamodel attributes are: .. include:: ../include/class_datamodel_tracepca.rst Args: trace_cen (`numpy.ndarray`_, optional): A floating-point array with the spatial location of each each trace. Shape is :math:`(N_{\rm spec}, N_{\rm trace})`. If None, the object is "empty" and all of the other keyword arguments are ignored. npca (:obj:`bool`, optional): The number of PCA components to keep. See :func:`~pypeit.core.pca.pca_decomposition`. pca_explained_var (:obj:`float`, optional): The percentage (i.e., not the fraction) of the variance in the data accounted for by the PCA used to truncate the number of PCA coefficients to keep (see `npca`). Ignored if `npca` is provided directly. See :func:`~pypeit.core.pca.pca_decomposition`. reference_row (:obj:`int`, optional): The row (spectral position) in `trace_cen` to use as the reference coordinate system for the PCA. If None, set to the :math:`N_{\rm spec}/2`. coo (`numpy.ndarray`_, optional): Floating-point array with the reference coordinates for each trace. If provided, the shape must be :math:`(N_{\rm trace},)`. If None, the reference coordinate system is defined by the value of `trace_cen` at the spectral position defined by `reference_row`. See the `mean` argument of :func:`~pypeit.core.pca.pca_decomposition`. """ version = '1.1.0' """Datamodel version.""" datamodel = {'reference_row': dict(otype=(int, np.integer), descr='The row (spectral position) used as the reference ' \ 'coordinate system for the PCA.'), 'trace_coo': dict(otype=np.ndarray, atype=(float,np.floating), descr='Trace coordinates. Shape must be ' r':math:`(N_{\rm spec},N_{\rm trace})`.'), 'nspec': dict(otype=int, descr='Number of pixels in the image spectral direction.'), 'ntrace': dict(otype=int, descr='Number of traces used to construct the PCA.'), 'input_npca': dict(otype=int, descr='Requested number of PCA components if provided.'), 'npca': dict(otype=int, descr='Number of PCA components used.'), 'input_pcav': dict(otype=(float,np.floating), descr='Requested variance accounted for by PCA decomposition.'), 'pca_coeffs': dict(otype=np.ndarray, atype=(float,np.floating), descr=r'PCA component coefficients. If the PCA decomposition ' r'used :math:`N_{\rm comp}` components for ' r':math:`N_{\rm vec}` vectors, the shape of this array ' r'must be :math:`(N_{\rm vec}, N_{\rm comp})`. The ' r'array can be 1D with shape :math:`(N_{\rm vec},)` if ' r'there was only one PCA component.'), 'pca_components': dict(otype=np.ndarray, atype=(float,np.floating), descr=r'Vectors with the PCA components. Shape must be ' r':math:`(N_{\rm comp}, N_{\rm spec})`.'), 'pca_mean': dict(otype=np.ndarray, atype=(float,np.floating), descr=r'The mean offset of the PCA decomposotion for each ' r' spectral pixel. Shape is :math:`(N_{\rm spec},)`.'), 'pca_coeffs_model': dict(otype=np.ndarray, atype=PypeItFit, descr='An array of PypeItFit objects, one per PCA ' 'component, that models the trend of the PCA ' 'component coefficients with the reference ' 'coordinate of each vector. These models are ' r'used by :func:`predict` to model the expected ' 'coefficients at a new reference coordinate.')} """Object datamodel.""" internals = ['is_empty'] # TODO: Add a show method that plots the pca coefficients and the # current fit, if there is one def __init__(self, trace_cen=None, npca=None, pca_explained_var=99.0, reference_row=None, coo=None): # Instantiate as an empty DataContainer super().__init__() self.is_empty = True # Only do the decomposition if the trace coordinates are provided. if trace_cen is not None: self.decompose(trace_cen, npca=npca, pca_explained_var=pca_explained_var, reference_row=reference_row, coo=coo)
[docs] def decompose(self, trace_cen, npca=None, pca_explained_var=99.0, reference_row=None, coo=None): r""" Construct the PCA from scratch. Args: trace_cen (`numpy.ndarray`_): A floating-point array with the spatial location of each each trace. Shape is :math:`(N_{\rm spec}, N_{\rm trace})`. Cannot be None. npca (:obj:`bool`, optional): The number of PCA components to keep. See :func:`~pypeit.core.pca.pca_decomposition`. pca_explained_var (:obj:`float`, optional): The percentage (i.e., not the fraction) of the variance in the data accounted for by the PCA used to truncate the number of PCA coefficients to keep (see `npca`). Ignored if `npca` is provided directly. See :func:`~pypeit.core.pca.pca_decomposition`. reference_row (:obj:`int`, optional): The row (spectral position) in `trace_cen` to use as the reference coordinate system for the PCA. If None, set to the :math:`N_{\rm spec}/2`. coo (`numpy.ndarray`_, optional): Floating-point array with the reference coordinates for each trace. If provided, the shape must be :math:`(N_{\rm trace},)`. If None, the reference coordinate system is defined by the value of `trace_cen` at the spectral position defined by `reference_row`. See the `mean` argument of :func:`~pypeit.core.pca.pca_decomposition`. """ if trace_cen is None: raise ValueError('Must provide traces to construct the PCA.') self._reinit() self.is_empty = False # Set the reference row to use for the coordinates of the trace self.reference_row = trace_cen.shape[0]//2 if reference_row is None else reference_row self.ntrace = trace_cen.shape[1] self.trace_coo = trace_cen[self.reference_row,:] if coo is None else np.atleast_1d(coo) if self.trace_coo.size != self.ntrace: raise ValueError('Provided reference coordinates have incorrect shape.') # Save the input self.input_npca = npca self.input_pcav = pca_explained_var # Perform the PCA decomposition of the traces self.pca_coeffs, self.pca_components, self.pca_mean, _ \ = pca.pca_decomposition(trace_cen.T, npca=npca, pca_explained_var=pca_explained_var, mean=self.trace_coo) self.npca = self.pca_coeffs.shape[1] self.nspec = self.pca_components.shape[1]
[docs] def _reinit(self): """ Erase and/or define all the attributes of the class. """ # The following are *all* objects assigned to self. self.is_empty = True self.reference_row = None self.ntrace = None self.trace_coo = None self.input_npca = None self.input_pcav = None self.pca_coeffs = None self.pca_components = None self.pca_mean = None self.npca = None self.nspec = None self.pca_coeffs_model = None
[docs] def build_interpolator(self, order, ivar=None, weights=None, function='polynomial', lower=3.0, upper=3.0, maxrej=1, maxiter=25, minx=None, maxx=None, debug=False): """ Wrapper for :func:`~pypeit.core.pca.fit_pca_coefficients` that uses class attributes and saves the input parameters. """ if self.is_empty: raise ValueError('TracePCA object is empty; re-instantiate or run decompose().') self.pca_coeffs_model \ = pca.fit_pca_coefficients(self.pca_coeffs, order, ivar=ivar, weights=weights, function=function, lower=lower, upper=upper, minx=minx, maxx=maxx, maxrej=maxrej, maxiter=maxiter, coo=self.trace_coo, debug=debug)
[docs] def predict(self, x): r""" Predict one or more traces given the functional forms for the PCA coefficients. .. warning:: The PCA coefficients must have first been modeled by a function before using this method. An error will be raised if :attr:`fit_coeff` is not defined. Args: x (:obj:`float`, `numpy.ndarray`_): One or more spatial coordinates (at the PCA reference row) at which to sample the PCA coefficients and produce the PCA model for the trace spatial position as a function of spectral pixel. Returns: `numpy.ndarray`_: The array with the predicted spatial locations of the trace. If the provided coordinate is a single value, the returned shape is :math:`(N_{\rm pix},)`; otherwise it is :math:`(N_{\rm pix}, N_{\rm x})`. """ if self.is_empty: raise ValueError('TracePCA object is empty; re-instantiate or run decompose().') if self.pca_coeffs_model is None: raise PypeItError('PCA coefficients have not been modeled; run build_interpolator first.') return pca.pca_predict(x, self.pca_coeffs_model, self.pca_components, self.pca_mean, x).T
[docs] def _bundle(self, ext='PCA'): """Bundle the data for writing.""" d = super()._bundle(ext=ext) if self.pca_coeffs_model is None: return d # Fix the default bundling to handle the fact that # pca_coeffs_model is an array of PypeItFit instances. _d = d[0] if ext is None else d[0][ext] del _d['pca_coeffs_model'] d += [{'{0}_MODEL_{1}'.format(ext,i+1): self.pca_coeffs_model[i]} for i in range(self.npca)] return d
# TODO: Although I don't like doing it, kwargs is here to catch the # extraneous keywords that can be passed to _parse from the base class but # won't be used.
[docs] @classmethod def _parse(cls, hdu, hdu_prefix=None, **kwargs): """ Parse the data from the provided HDU. See :func:`~pypeit.datamodel.DataContainer._parse` for the argument descriptions. """ # Run the default parser to get most of the data d, version_passed, type_passed, parsed_hdus = super()._parse(hdu, hdu_prefix=hdu_prefix) # This should only ever read one hdu! if len(parsed_hdus) > 1: raise PypeItError('CODING ERROR: Parsing saved TracePCA instances should only parse 1 HDU, ' 'independently of the PCA PypeItFit models.') # Check if any models exist if hasattr(hdu, '__len__') \ and np.any(['{0}_MODEL'.format(parsed_hdus[0]) in h.name for h in hdu]): # Parse the models model_ext = ['{0}_MODEL_{1}'.format(parsed_hdus[0],i+1) for i in range(d['npca'])] d['pca_coeffs_model'] = np.array([PypeItFit.from_hdu(hdu[e]) for e in model_ext]) parsed_hdus += model_ext return d, version_passed, type_passed, parsed_hdus
[docs] @classmethod def from_dict(cls, d=None): """ Instantiate from a dictionary. This is a basic wrapper for :class:`~pypeit.datamodel.DataContainer.from_dict` that appropriately toggles :attr:`is_empty`. """ self = super().from_dict(d=d) self.is_empty = False return self
# TODO: Like with the use of TracePCA in EdgeTraceSet, we should # integrate the elements of the function below into classes that trace # objects and tilts so that the PCA can be called and used later
[docs] def pca_trace_object(trace_cen, order=None, trace_bpm=None, min_length=0.6, npca=None, pca_explained_var=99.0, reference_row=None, coo=None, minx=None, maxx=None, trace_wgt=None, function='polynomial', lower=3.0, upper=3.0, maxrej=1, maxiter=25, debug=False): r""" Decompose and reconstruct the provided traces using principle-component analysis. Args: trace_cen (`numpy.ndarray`_): A floating-point array with the spatial location of each each trace. Shape is :math:`(N_{\rm spec}, N_{\rm trace})`. order (:obj:`int`, :obj:`list`, optional): The order of the polynomial to use fit each PCA coefficient as a function of trace position. If None, `order` is set to :math:`3.3 N_{\rm use}/N_{\rm trace}`, where :math:`N_{\rm use}` is the number of traces used to construct the PCA and :math:`N_{\rm trace}` is the number of total traces provided. If an integer (determined automatically if the argument is `None`), the order per PCA component (see `npca`) is set to cascade from high-to-low order as follows:: _order = np.clip(order - np.arange(npca), 1, None).astype(int) trace_bpm (`numpy.ndarray`_, optional): Bad-pixel mask for the trace data (True is bad; False is good). Must match the shape of `trace_cen`. min_length (:obj:`float`, optional): The good position of the trace must cover at least this fraction of the spectral dimension for use in the PCA decomposition. npca (:obj:`bool`, optional): The number of PCA components to keep. See :func:`~pypeit.core.pca.pca_decomposition`. pca_explained_var (:obj:`float`, optional): The percentage (i.e., not the fraction) of the variance in the data accounted for by the PCA used to truncate the number of PCA coefficients to keep (see `npca`). Ignored if `npca` is provided directly. See :func:`~pypeit.core.pca.pca_decomposition`. reference_row (:obj:`int`, optional): The row (spectral position) in `trace_cen` to use as the reference coordinate system for the PCA. If None, set to the :math:`N_{\rm spec}/2` or based on the spectral position that crosses the most number of valid trace positions. coo (`numpy.ndarray`_, optional): Floating-point array with the reference coordinates to use for each trace. If None, coordinates are defined at the reference row of `trace_cen`. Shape must be :math:`(N_{\rm trace},)`. minx, maxx (:obj:`float`, optional): Minimum and maximum values used to rescale the independent axis data. If None, the minimum and maximum values of `coo` are used. See :func:`~pypeit.core.fitting.robust_fit`. trace_wgt (`numpy.ndarray`_, optional): Weights to apply to the PCA coefficient of each trace during the fit. Weights are independent of the PCA component. See `weights` parameter of :func:`~pypeit.core.pca.fit_pca_coefficients`. Shape must be :math:`(N_{\rm trace},)`. function (:obj:`str`, optional): Type of function used to fit the data. lower (:obj:`float`, optional): Number of standard deviations used for rejecting data **below** the mean residual. If None, no rejection is performed. See :func:`~pypeit.core.fitting.robust_fit`. upper (:obj:`float`, optional): Number of standard deviations used for rejecting data **above** the mean residual. If None, no rejection is performed. See :func:`~pypeit.core.fitting.robust_fit`. maxrej (:obj:`int`, optional): Maximum number of points to reject during fit iterations. See :func:`~pypeit.core.fitting.robust_fit`. maxiter (:obj:`int`, optional): Maximum number of rejection iterations allows. To force no rejection iterations, set to 0. debug (:obj:`bool`, optional): Show plots useful for debugging. """ # Check the input if trace_bpm is None: use_trace = np.ones(trace_cen.shape[1], dtype=bool) _reference_row = trace_cen.shape[0]//2 if reference_row is None else reference_row else: use_trace = np.sum(np.logical_not(trace_bpm), axis=0)/trace_cen.shape[0] > min_length _reference_row = trace.most_common_trace_row(trace_bpm[:,use_trace]) \ if reference_row is None else reference_row _coo = None if coo is None else coo[use_trace] # Instantiate the PCA cenpca = TracePCA(trace_cen[:,use_trace], npca=npca, pca_explained_var=pca_explained_var, reference_row=_reference_row, coo=_coo) # Set the order of the function fit to the PCA coefficients: # Order is set to cascade down to lower order for components # that account for a smaller percentage of the variance. if order is None: # TODO: Where does this come from? order = int(np.clip(np.floor(3.3*np.sum(use_trace)/trace_cen.shape[1]),1.0,3.0)) _order = np.atleast_1d(order) if _order.size == 1: _order = np.clip(order - np.arange(cenpca.npca), 1, None).astype(int) if _order.size != cenpca.npca: raise PypeItError('Number of polynomial orders does not match the number of PCA components.') log.info('Order of function fit to each component: {0}'.format(_order)) # Apply a 10% relative error to each coefficient. This performs # better than use_mad, since larger coefficients will always be # considered inliers, if the coefficients vary rapidly with # order as they sometimes do. # TODO: This inverse variance usage has performance issues and # tends to lead to rejection of coefficients that are near 0. # Instead of setting the floor to an absolute value 0.1, why not a # relative value like the mean or median of the coefficients? I.e. # ivar = utils.inverse(numpy.square(np.fmax(0.1*np.absolute(cenpca.pca_coeffs), # 0.1*np.median(cenpca.pca_coeffs)))) ivar = utils.inverse(np.square(np.fmax(0.1*np.absolute(cenpca.pca_coeffs), 0.1))) # Set any additional weights for each trace weights = np.ones(np.sum(use_trace), dtype=float) \ if trace_wgt is None else trace_wgt[use_trace] # TODO: This combination of ivar and weights is as it has been # previously. However, we recently changed the slit-edge tracing # code to fit the PCA coefficients with unity weights (the default # when passing weights=None to build_interpolator) and ivar=None. # The means the PCA coefficients are fit with uniform weighting and # the rejection is based on the median absolute deviation of the # data with respect to the fitted model. # # This current call below to build_interpolator will instead weight # the fit according to the weights set above, and it will reject # points based on the inverse variance set above. We need to check # that this makes sense! # Build the interpolator that allows prediction of new traces cenpca.build_interpolator(_order, ivar=ivar, weights=weights, function=function, lower=lower, upper=upper, maxrej=maxrej, maxiter=maxiter, minx=minx, maxx=maxx, debug=debug) # Return the traces predicted for all input traces return cenpca.predict(trace_cen[_reference_row,:] if coo is None else coo)