"""
Provides a simple datamodel for a single spectrum.
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import inspect
import astropy
from IPython import embed
import numpy as np
from scipy.interpolate import interp1d
from pypeit import datamodel
from pypeit import io
from pypeit import log
from pypeit import PypeItError
from pypeit import utils
from pypeit.spectrographs.util import load_spectrograph
[docs]
class OneSpec(datamodel.DataContainer):
"""
DataContainer to hold single spectra, e.g., from
:class:`~pypeit.coadd1d.CoAdd1D`.
The datamodel attributes are:
.. include:: ../include/class_datamodel_onespec.rst
Args:
wave
wave_grid_mid
flux
PYP_SPEC
ivar
mask
telluric
obj_model
ext_mode
fluxed
Attributes:
head0 (`astropy.io.fits.Header`):
Primary header
spect_meta (:obj:`dict`):
Parsed meta from the header
spectrograph (:class:`pypeit.spectrographs.spectrograph.Spectrograph`):
Build from PYP_SPEC
"""
version = '1.0.2'
datamodel = {'wave': dict(otype=np.ndarray, atype=np.floating,
# TODO: The "weighted by pixel contributions" part
# should be better explained.
descr='Wavelength array (angstroms in vacuum), weighted by pixel '
'contributions'),
'wave_grid_mid': dict(otype=np.ndarray, atype=np.floating,
descr='Wavelength (angstroms in vacuum) evaluated at the '
'bin centers of a grid that is uniformly spaced '
'in either lambda or log10-lambda/velocity'),
'flux': dict(otype=np.ndarray, atype=np.floating,
descr='Flux array in units of counts/s or 10^-17 erg/s/cm^2/Ang; '
'see ``fluxed``'),
'ivar': dict(otype=np.ndarray, atype=np.floating,
descr='Inverse variance array (matches units of flux)'),
'sigma': dict(otype=np.ndarray, atype=np.floating,
descr='One sigma noise array, equivalent to 1/sqrt(ivar) (matches units of flux)'),
'mask': dict(otype=np.ndarray, atype=np.integer,
descr='Mask array (1=Good,0=Bad)'),
'telluric': dict(otype=np.ndarray, atype=np.floating, descr='Telluric model'),
'PYP_SPEC': dict(otype=str, descr='``PypeIt`` spectrograph designation'),
'obj_model': dict(otype=np.ndarray, atype=np.floating,
descr='Object model for tellurics'),
'ext_mode': dict(otype=str, descr='Extraction mode (options: BOX, OPT)'),
'fluxed': dict(otype=bool, descr='Boolean indicating if the spectrum is fluxed.'),
# TODO: Needs a better description. What's in the dictionary?
# Why isn't this dictionary expanded into its elements? I.e.,
# shouldn't each element of this dictionary be a component of
# the datamodel?
'spect_meta': dict(otype=dict, descr='header dict')}
internals = ['head0',
'filename',
'spectrograph',
'spect_meta',
'history']
[docs]
@classmethod
def from_file(cls, ifile, verbose=True, chk_version=True, **kwargs):
"""
Instantiate the object from an extension in the specified fits file.
Over-load :func:`~pypeit.datamodel.DataContainer.from_file`
to deal with the header
Args:
ifile (:obj:`str`, `Path`_):
Fits file with the data to read
verbose (:obj:`bool`, optional):
Print informational messages (not currently used)
chk_version (:obj:`bool`, optional):
Passed to :func:`from_hdu`.
kwargs (:obj:`dict`, optional):
Arguments passed directly to :func:`from_hdu`.
"""
with io.fits_open(ifile) as hdu:
self = cls.from_hdu(hdu, chk_version=chk_version, **kwargs)
self.filename = ifile
self.head0 = hdu[0].header
self.spectrograph = load_spectrograph(self.PYP_SPEC)
self.spect_meta = self.spectrograph.parse_spec_header(self.head0)
return self
[docs]
@classmethod
def from_xspec_file(cls, ifile:str):
hdul = io.fits_open(ifile)
wave = hdul['WAVELENGTH'].data
flux = hdul['FLUX'].data
return cls(wave, None, flux, fluxed=False)
@property
def npix(self):
"""
Number of pixels in the spectrum.
Returns:
:obj:`int`: Number of pixels in the spectrum.
"""
return self.wave.size
def __init__(self, wave, wave_grid_mid, flux, PYP_SPEC=None, ivar=None, sigma=None, mask=None, telluric=None,
obj_model=None, ext_mode=None, fluxed=None):
args, _, _, values = inspect.getargvalues(inspect.currentframe())
_d = dict([(k,values[k]) for k in args[1:]])
# Setup the DataContainer
datamodel.DataContainer.__init__(self, d=_d)
[docs]
def _bundle(self):
"""
Override the base class method simply to set the HDU extension name.
"""
return super()._bundle(ext='SPECTRUM')
[docs]
def to_file(self, ofile, primary_hdr=None, history=None, **kwargs):
"""
Over-load :func:`pypeit.datamodel.DataContainer.to_file`
to deal with the header
Args:
ofile (:obj:`str`): Filename
primary_hdr (`astropy.io.fits.Header`_, optional):
**kwargs: Passed to super.to_file()
"""
if primary_hdr is None:
primary_hdr = io.initialize_header()
# Build the header
if self.head0 is not None and self.PYP_SPEC is not None:
spectrograph = load_spectrograph(self.PYP_SPEC)
subheader = spectrograph.subheader_for_spec(self.head0, self.head0,
extra_header_cards = ['RA_OBJ', 'DEC_OBJ'])
else:
subheader = {}
# Add em in
for key in subheader:
primary_hdr[key] = subheader[key]
# Add history
if history is not None:
history.write_to_header(primary_hdr)
# Do it
super().to_file(ofile, primary_hdr=primary_hdr, **kwargs)
[docs]
def rebin(self, new_wv, fill_value=0., grow_bad_sig=False):
""" Rebin the spectrum to a new OneSpec object with the input array
Uses simple linear interpolation. The default (and only)
option conserves counts (and flambda).
WARNING: Do not trust either edge pixel of the new array.
In fact the sig is set to 0 for each of these
Also be aware that neighboring pixels are likely to be
correlated in a manner that is not described by the error
array.
Parameters
----------
new_wv : np.ndarray
New wavelength array
fill_value : float, optional
Fill value at the edges
Default = 0., but 'extrapolate' may be considered
grow_bad_sig : bool, optional
Allow sig<=0. values and grow them
Returns
-------
newspec : OneSpec
"""
# Save flux info to avoid unit issues
flux = self.flux
# Deal with nan
badf = np.any([np.isnan(flux), np.isinf(flux)], axis=0)
if np.sum(badf) > 0:
log.warning("Ignoring pixels with NAN or INF in flux")
gdf = ~badf
flux = flux[gdf]
# Check for bad pixels (not prepared for these)
if self.sigma is not None:
sig = self.sigma
bad_sig = sig[gdf] <= 0.
if np.sum(bad_sig) > 0:
if not grow_bad_sig:
raise PypeItError(
'Data contains rejected pixels (sig=0). Use grow_bad_sig to proceed and '
'grow them.'
)
bads = np.any([np.isnan(sig[gdf]), np.isinf(sig[gdf]**2)], axis=0) # Latter is for way too large values
bad_sig[bads] = True
# Endpoints of original pixels
npix = self.wave.size
wvh = (self.wave + np.roll(self.wave, -1)) / 2.
wvh[npix - 1] = self.wave[npix - 1] + \
(self.wave[npix - 1] - self.wave[npix - 2]) / 2.
dwv = wvh - np.roll(wvh, 1)
dwv[0] = 2 * (wvh[0] - self.wave[0])
med_dwv = np.median(dwv)
wvh = wvh[gdf]
dwv = dwv[gdf]
# Error
if self.sigma is not None:
var = sig[gdf]**2
var[bad_sig] = 0.
else:
var = np.ones_like(flux)
# Cumulative Sum
cumsum = np.cumsum(flux * dwv)
cumvar = np.cumsum(var * dwv, dtype=np.float64)
# Interpolate (loses the units)
fcum = interp1d(wvh, cumsum, fill_value=fill_value, bounds_error=False)
fvar = interp1d(wvh, cumvar, fill_value=0., bounds_error=False)
# Endpoints of new pixels
nnew = len(new_wv)
nwvh = (new_wv + np.roll(new_wv, -1)) / 2.
nwvh[nnew - 1] = new_wv[nnew - 1] + \
(new_wv[nnew - 1] - new_wv[nnew - 2]) / 2.
# Pad starting point
bwv = np.zeros(nnew + 1)
bwv[0] = new_wv[0] - (new_wv[1] - new_wv[0]) / 2.
bwv[1:] = nwvh
# Evaluate and put unit back
newcum = fcum(bwv)
newvar = fvar(bwv)
# Rebinned flux, var, co
new_fx = (np.roll(newcum, -1) - newcum)[:-1]
new_var = (np.roll(newvar, -1) - newvar)[:-1]
# Normalize (preserve counts and flambda)
new_dwv = bwv - np.roll(bwv, 1)
new_fx = new_fx / new_dwv[1:]
# Preserve S/N (crudely)
med_newdwv = np.median(new_dwv)
new_var = new_var / (med_newdwv/med_dwv) / new_dwv[1:]
# Return new spectrum
if self.sigma is not None:
# Create new_sig
new_sig = np.zeros_like(new_var)
gd = new_var > 0.
new_sig[gd] = np.sqrt(new_var[gd])
# Deal with bad pixels (grow_bad_sig should be True)
bad = np.where(var <= 0.)[0]
# Find nearby wavelengths in rebinned wavelength
nearidxs = np.searchsorted(new_wv, self.wave[bad])
# Pad arrays to enable vector operations
pwv = np.concatenate([self.wave,[self.wave[-1]+dwv[-1]]])
pndwv = np.concatenate([new_dwv,[new_dwv[-1]]])
pnwv = np.concatenate([new_wv,[new_wv[-1]+new_dwv[-1]]])
# Find distances between original bad wavelengths and nearby new ones
ldiff = np.abs(new_wv[nearidxs-1]-pwv[bad]) - \
(pndwv[1:][nearidxs]+dwv[bad])/2
rdiff = np.abs(pwv[bad]-pnwv[nearidxs]) - \
(pndwv[1:][nearidxs] + dwv[bad]) / 2
# Set errors to 0; we have to mind the padding above
new_sig[nearidxs[(ldiff<0)&(nearidxs<len(new_wv))]] = 0
new_sig[nearidxs[(rdiff<0)&(nearidxs<len(new_wv))]] = 0
# Zero out edge pixels -- not to be trusted
igd = np.where(gd)[0]
if len(igd) == 0: # Should not get here!
raise PypeItError('Not a single good pixel?! Something went wrong...')
new_sig[igd[0]] = 0.
new_sig[igd[-1]] = 0.
else:
new_sig = None
# Finish
return OneSpec(new_wv, None, new_fx, sigma=new_sig)
[docs]
def gauss_smooth(self, fwhm):
"""
Smooth the spectrum with a Gaussian kernal.
.. warning::
This does **not** propagate the errors! The returned
:class:`OneSpec` instance will *not* have any errors. All other
attributes are included in the returned
:class:`~pypeit.onespec.OneSpec` instance; array attributes are
copied.
Parameters
----------
fwhm : float
FWHM of the Gaussian kernal in pixels.
Returns
-------
:class:`~pypeit.onespec.OneSpec`
The smoothed spectrum
"""
return OneSpec(
self.wave.copy(), None if self.wave_grid_mid is None else self.wave_grid_mid.copy(),
utils.convolve_psf(self.flux, fwhm), PYP_SPEC=self.PYP_SPEC, ivar=None, sigma=None,
mask=None if self.mask is None else self.mask.copy(),
telluric=None if self.telluric is None else self.telluric.copy(),
obj_model=None if self.obj_model is None else self.obj_model.copy(),
ext_mode=self.ext_mode, fluxed=self.fluxed
)