"""
Temporary light-weight spectrum object.
Ideally this would be replaced by specutils.Spectrum
"""
from copy import deepcopy
from IPython import embed
import numpy as np
from pypeit import log
from pypeit import PypeItError
from pypeit import sampling
from pypeit import utils
[docs]
class Spectrum:
r"""
A light-weight container class for a spectrum.
The flux array can be either 1D or 2D. If 2D, its shape is always assumed
to be :math:`(N_{\rm pix},N_{\rm spec})`, where :math:`N_{\rm pix}` is the
number of pixels per spectrum and :math:`N_{\rm spec}` is the number of
spectra. The wavelength array must always be provided as 1D; i.e., for a 2D
flux array, the wavelength vector must be correct for each flux vector along
the 1st axis of the 2D array.
Parameters
----------
wave : array-like
Vacuum wavelengths in angstrom. Must be 1D, and its length must match
the first axis of ``flux``.
flux : array-like
Flux data at each wavelength. Can be 2D or 1D; see class description.
ivar : array-like, optional
Inverse variance in the flux. Shape must match ``flux``. If None,
assumes uncertainties are unknown.
gpm : array-like, optional
Boolean good-pixel mask. Shape must match ``flux``. If None, assumes
all pixels are valid.
meta : dict, optional
Collection of relevant metadata.
"""
def __init__(self, wave, flux, ivar=None, gpm=None, meta=None):
# Throughout I use the copy method to ensure the original arrays are
# copied
self.flux = np.asarray(flux, dtype=float).copy()
self.wave = np.asarray(wave, dtype=float).copy()
if self.wave.ndim != 1:
raise PypeItError('wavelength array must always be 1D in the spectrum object')
if self.wave.size != self.flux.shape[0]:
raise PypeItError('wavelength vector must match length of flux array')
if ivar is None:
self.ivar = None
else:
self.ivar = np.asarray(ivar, dtype=float).copy()
if self.ivar.shape != self.flux.shape:
raise PypeItError('Wavelength and inverse variance arrays do not have the same shape.')
if gpm is None:
self.gpm = np.ones(self.flux.shape, dtype=bool)
else:
self.gpm = np.asarray(gpm, dtype=bool).copy()
if self.gpm.shape != self.flux.shape:
raise PypeItError('Wavelength and good-pixel arrays do not have the same size.')
self.meta = meta if meta is None else deepcopy(meta)
@property
def size(self):
"""
The size of the flux array
"""
return self.flux.size
@property
def shape(self):
"""
The shape of the flux array
"""
return self.flux.shape
@property
def ndim(self):
"""
The dimensionality of the flux array
"""
return self.flux.ndim
[docs]
def copy(self):
"""
Return a deepcopy of the object.
"""
_ivar = None if self.ivar is None else self.ivar.copy()
_meta = None if self.meta is None else deepcopy(self.meta)
return self.__class__(
self.wave.copy(), self.flux.copy(), ivar=_ivar, gpm=self.gpm.copy(), meta=_meta
)
[docs]
def multiply(self, a):
"""
Multiply the spectrum by a scalar, vector, or another spectrum.
*This modifies the spectral data in place.* If uncertainties are
available, they are propagated. Any divisions by 0 result in an inverse
variance of 0 and the good pixel mask is set to False.
Parameters
----------
a : scalar, array-like, :class:`pypeit.core.spectrum.Spectrum`
Multiplicative factor. If an array, its shape must match
:attr:`flux`. If a spectrum, the wavelength arrays of the two
spectrum *must be identical*.
"""
# Multiply by a scalar
if isinstance(a, (int, np.integer, float, np.floating)):
if float(a) == 0.:
log.warning('Multiplicative factor is 0!')
self.flux *= a
if self.ivar is not None:
if np.absolute(a) > 0:
self.ivar /= a**2
else:
self.ivar *= 0.
return
# Workspace
sqr_err_ratio = None
a_gpm = None
if isinstance(a, Spectrum):
# Pull the necessary data out of the spectrum
# Check the wavelength vectors
# TODO: Loosen this; i.e., use isclose instead of array_equal?
if not np.array_equal(a.wave, self.wave):
raise PypeItError('To multiply two spectra, their wavelength vectors must be identical.')
a_flux = a.flux
a_gpm = a.gpm
if a.ivar is not None:
# Square of the ratio between the error and flux in a
sqr_err_ratio = utils.inverse(a.flux**2 * a.ivar)
else:
# Convert the array-like object to a numpy array
a_flux = np.asarray(a)
# Check the input
if a_flux.ndim > self.ndim:
raise PypeItError(
'Multiplication does not allow the dimensionality of the spectrum to change. '
f'The dimensionality of this spectrum is {self.ndim} and the multiplier is '
f'{a_flux.ndim}.'
)
# Numpy broadcasting rules mean that the arithmetic operations performed
# below should work, as long as the last a.ndim dimensions of a and this
# spectrum match.
if a_flux.shape != self.shape[:a_flux.ndim]:
raise PypeItError(
'Numpy will not be able to successfully broadcast arithmetic operations between '
f'this spectrum, shape={self.shape}, and the multiplier, shape={a_flux.shape}.'
)
# Reshape the arrays, if necessary
if a_flux.ndim != self.flux.ndim:
a_flux = np.expand_dims(
a_flux, tuple(np.arange(len(self.shape))[a_flux.ndim:].tolist())
)
if sqr_err_ratio is not None:
sqr_err_ratio = np.expand_dims(
sqr_err_ratio, tuple(np.arange(len(self.shape))[sqr_err_ratio.ndim:].tolist())
)
if a_gpm is not None:
a_gpm = np.expand_dims(
a_gpm, tuple(np.arange(len(self.shape))[a_gpm.ndim:].tolist())
)
# Add the error. NOTE: This *must* be done before the multiplication by
# a_flux below because that changes self.flux.
if self.ivar is not None:
# Square of the ratio between the error and flux in this spectrum
if sqr_err_ratio is None:
sqr_err_ratio = utils.inverse(self.flux**2 * self.ivar)
else:
sqr_err_ratio += utils.inverse(self.flux**2 * self.ivar)
# Complete the multiplication
self.flux *= a_flux
if sqr_err_ratio is not None:
# Propagate the error
sqr_err = self.flux**2 * sqr_err_ratio
self.ivar = utils.inverse(sqr_err)
self.gpm[np.logical_not(self.ivar > 0)] = False
if a_gpm is not None:
# Propagate the good-pixel mask
self.gpm &= a_gpm
[docs]
def inverse(self):
"""
Replace the spectrum with its multiplicative inverse.
*This modifies the spectrum in place.* If uncertainties are available,
they are propagated. Any divisions by 0 result in an inverse variance
of 0 and the good pixel mask is set to False.
"""
if self.ivar is not None:
self.ivar *= self.flux**4
self.gpm[np.logical_not(self.ivar > 0)] = False
self.flux = utils.inverse(self.flux)
[docs]
def to_magnitude(self, zeropoint=0.):
r"""
Convert the spectrum to magnitudes.
For fluxes, :math:`f`, this returns
.. math::
m = -2.5 \log_{\rm 10} (f) + Z,
where :math:`Z` is the provided zeropoint.
*This modifies the spectrum in place.* If uncertainties are available,
they are propagated. Any pixels with non-positive fluxes are masked.
Parameters
----------
zeropoint : float, optional
The magnitude conversion zeropoint (see above)
"""
if self.ivar is not None:
self.ivar *= (self.flux * np.log(10) / 2.5)**2
self.gpm[np.logical_not(self.flux > 0)] = False
self.flux[np.logical_not(self.gpm)] = 0.
self.flux[self.gpm] = -2.5 * np.log10(self.flux[self.gpm]) + zeropoint
[docs]
def resample(self, new_wave, pixel_fraction_threshold=0.8, conserve=False):
r"""
Resample the spectrum to a new wavelength array.
If available, errors and masking are both propagated through the
calculation. This function is basically a wrapper for
:class:`~pypeit.sampling.Resample`.
Parameters
----------
new_wave : array-like
New wavelength vector for the spectrum
pixel_fraction_threshold : float, optional
The resampling calculates the fraction of each output pixel that has
unmasked contributions from the original spectrum. Fractions below
this threshold will be masked in the output spectrum.
conserve : bool, optional
Conserve the flux in the resampled spectrum. If the units of the
spectrum are flux integrated over the pixel, this should typically
be True; if the units are flux density (e.g., :math:`{\rm
ergs/s/cm}^2{\rm /angstrom}`), this should typically be False.
Returns
-------
:class:`~pypeit.core.spectrum.Spectrum`
A new spectrum object with the resample data.
"""
# Setup
bpm = None if np.all(self.gpm) else np.logical_not(self.gpm).T
if self.ivar is None:
err = None
else:
err = np.zeros(self.ivar.shape, dtype=float)
err[self.gpm] = np.sqrt(utils.inverse(self.ivar[self.gpm]))
err = err.T
# Resample accepts both 1D and 2D arrays, but it expects the 2D arrays
# to have the spectra organized along the 2nd axis; i.e.,
# (N_spec,N_pix) instead of (N_pix,N_spec).
r = sampling.Resample(
self.flux.T, e=err, mask=bpm, x=self.wave, newx=new_wave, conserve=conserve
)
ivar = None if err is None else utils.inverse(r.oute.T)**2
return Spectrum(
r.outx, r.outy.T, ivar=ivar, gpm=r.outf.T > pixel_fraction_threshold, meta=self.meta,
)