"""
Module containing the core methods for arc-lamp fitting
and basic analysis. Note that there are additional modules
in pypeit.core.wavecal related to wavelength calibration.
.. include:: ../include/links.rst
"""
import numpy as np
from matplotlib import gridspec
from matplotlib import pyplot as plt
import scipy.interpolate
from astropy import stats
from pypeit import msgs
from pypeit import utils
from pypeit.core import fitting
from IPython import embed
def fit2darc(all_wv, all_pix, all_orders, nspec, nspec_coeff=4, norder_coeff=4, sigrej=3.0, func2d='legendre2d', debug=False):
"""Routine to obtain the 2D wavelength solution for an echelle spectrograph.
This is calculated from the spec direction pixel-centroid and the order number
of identified arc lines. The fit is a simple least-squares with rejections.
This is a port of the XIDL code: x_fit2darc.pro
Parameters
----------
all_wv: `numpy.ndarray`_
wavelengths of the identified lines
all_pix: `numpy.ndarray`_
Spectral direction centroid positions of the identified lines
all_orders: `numpy.ndarray`_
Echelle order number for each of the identified lines
nspec: int
size of the image in the spectral direction
nspec_coeff : int, optional
order of the fitting along the spectral (pixel) direction for each order
norder_coeff : int, optional
order of the fitting in the order direction
sigrej: float, optional
sigma level for the rejection
func2d: str, optional
name of the 2D fitting function passed to :func:`pypeit.core.fitting.robust_fit` (default is 'legendre2d')
debug: bool, optional
If True, show extra plots to check the status of the procedure
Returns
-------
pypeitFit : :class:`pypeit.core.fitting.PypeItFit`
2D wavelength solution fit
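Examples
--------
A minimal, illustrative call on synthetic inputs. The wavelengths,
pixel positions, and order numbers below are arbitrary placeholders,
not a real line list.
>>> import numpy as np
>>> from pypeit.core.arc import fit2darc
>>> nspec = 2048
>>> all_orders = np.repeat(np.arange(30., 40.), 20)
>>> all_pix = np.tile(np.linspace(100., 1900., 20), 10)
>>> # fake wavelengths that vary smoothly with pixel and order
>>> all_wv = 7.e5/all_orders + 0.05*all_pix
>>> pypeitFit = fit2darc(all_wv, all_pix, all_orders, nspec)  # doctest: +SKIP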
"""
# Normalize for pixels. Fits are performed in normalized units (pixels/(nspec-1)) to be able to deal with
# various binnings.
min_spec = 0.0
max_spec = 1.0
xnspecmin1 = float(nspec-1)
# Normalize for orders
min_order = np.min(all_orders)
max_order = np.max(all_orders)
if debug:
# set some plotting parameters
utils.pyplot_rcparams()
plt.figure(figsize=(7,5))
msgs.info("Plot identified lines")
cm = plt.get_cmap('RdYlBu_r')
sc = plt.scatter(all_orders, all_pix,c=all_wv/10000., cmap=cm)
cbar = plt.colorbar(sc)
cbar.set_label(r'Wavelength [$\mu$m]', rotation=270,
labelpad=20)
plt.xlabel(r'Order')
plt.ylabel(r'Pixel (spectral direction)')
plt.title(r'Location of the identified lines')
plt.show()
# Fit the product of wavelength and order number with a 2d legendre polynomial
all_wv_order = all_wv * all_orders
pypeitFit = fitting.robust_fit(all_pix/xnspecmin1, all_wv_order, (nspec_coeff, norder_coeff), x2=all_orders,
function=func2d, maxiter=100, lower=sigrej, upper=sigrej, minx=min_spec,maxx=max_spec,
minx2=min_order, maxx2=max_order, use_mad=True, sticky=False)
# Report the RMS
fin_rms = pypeitFit.calc_fit_rms(x2=all_orders, apply_mask=True)
msgs.info("RMS: {0:.5f} Ang*Order#".format(fin_rms))
if debug:
fit2darc_global_qa(pypeitFit, nspec)
fit2darc_orders_qa(pypeitFit, nspec)
return pypeitFit
def fit2darc_global_qa(pypeitFit, nspec, outfile=None):
""" Generate a QA plot for the 2D fit of the wavelength solution
generated by :class:`pypeit.core.arc.fit2darc`
Parameters
----------
pypeitFit : :class:`pypeit.core.fitting.PypeItFit`
Fit object for the 2D arc solution
nspec: int
Size of the image in the spectral direction
outfile: str, optional
Name of the outfile to write to disk. If not provided, show to screen.
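Examples
--------
Typically called with the output of :func:`fit2darc`; in this sketch
``pypeitFit`` and ``nspec`` are assumed to come from a previous fit.
>>> fit2darc_global_qa(pypeitFit, nspec, outfile='arc2d_global_qa.png')  # doctest: +SKIP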
"""
msgs.info("Creating QA for 2D wavelength solution")
utils.pyplot_rcparams()
# Extract info from pypeitFit
xnspecmin1 = float(nspec - 1)
all_orders = pypeitFit['x2']
orders = np.unique(pypeitFit['x2'])
all_wv = pypeitFit['yval']/pypeitFit['x2']
all_pix = pypeitFit['xval']*xnspecmin1
gpm = pypeitFit.bool_gpm
nspec_coeff = pypeitFit['order'][0]
norder_coeff = pypeitFit['order'][1]
resid_wl_global = []
# Define pixels array
spec_vec_norm = np.arange(nspec)/xnspecmin1
# Define figure properties
plt.figure(figsize=(8, 5))
# Track the maximum wavelength covered by the spectrum
mx = 0.
# Loop over orders
for ii in orders:
# define the color
rr = (ii - np.max(orders)) / (np.min(orders) - np.max(orders))
gg = 0.0
bb = (ii - np.min(orders)) / (np.max(orders) - np.min(orders))
# evaluate solution
wv_order_mod = pypeitFit.eval(spec_vec_norm, x2=np.ones_like(spec_vec_norm)*ii)
# Plot solution
plt.plot(wv_order_mod / ii, spec_vec_norm*xnspecmin1, color=(rr, gg, bb), linestyle='-', linewidth=2.5)
# Evaluate residuals at each order
on_order = all_orders == ii
this_pix = all_pix[on_order]
this_wv = all_wv[on_order]
this_msk = gpm[on_order]
this_order = all_orders[on_order]
wv_order_mod_resid = pypeitFit.eval(this_pix/xnspecmin1, x2=this_order)
resid_wl = (wv_order_mod_resid / ii - this_wv)
resid_wl_global = np.append(resid_wl_global, resid_wl[this_msk])
plt.scatter((wv_order_mod_resid[~this_msk] / ii) + \
100. * resid_wl[~this_msk], this_pix[~this_msk], \
marker='x', color='black', linewidths=2.5, s=16.)
plt.scatter((wv_order_mod_resid[this_msk] / ii) + \
100. * resid_wl[this_msk], this_pix[this_msk], \
color=(rr, gg, bb), linewidth=2.5, s=16.)
if np.max(wv_order_mod_resid / ii) > mx:
mx = np.max(wv_order_mod_resid / ii)
rms_global = np.std(resid_wl_global)
plt.text(mx, np.max(spec_vec_norm*xnspecmin1), r'residuals $\times$100', \
ha="right", va="top")
plt.title(r'Arc 2D FIT, norder_coeff={:d}, nspec_coeff={:d}, RMS={:5.3f} Ang*Order#'.format(
norder_coeff, nspec_coeff, rms_global))
plt.xlabel(r'Wavelength [$\AA$]')
plt.ylabel(r'Row [pixel]')
# Finish
if outfile is not None:
plt.savefig(outfile, dpi=300)
plt.close()
else:
plt.show()
# restore default rcparams
utils.pyplot_rcparams_default()
def fit2darc_orders_qa(pypeitFit, nspec, outfile=None):
""" QA on 2D fit of the wavelength solution of an Echelle spectrograph.
Each sub-panel contains a single order with the global fit and the
residuals.
Parameters
----------
pypeitFit: :class:`pypeit.core.fitting.PypeItFit`
Fit object for the 2D arc solution
nspec : int
size of the image in the spectral direction
outfile : str, optional
Write to this file. If not provided, show to screen
"""
msgs.info("Creating QA for 2D wavelength solution")
utils.pyplot_rcparams()
# Extract info from pypeitFit
xnspecmin1 = float(nspec - 1)
all_orders = pypeitFit['x2']
orders = np.unique(pypeitFit['x2'])
all_wv = pypeitFit['yval']/pypeitFit['x2']
all_pix = pypeitFit['xval']*xnspecmin1
gpm = pypeitFit.bool_gpm
nspec_coeff = pypeitFit['order'][0]
norder_coeff = pypeitFit['order'][1]
resid_wl_global = []
# Define pixels array
spec_vec_norm = np.arange(nspec)/xnspecmin1
# set the size of the plot
nrow = 2
ncol = int(np.ceil(len(orders) / 2.))
fig = plt.figure(figsize=(5 * ncol, 6 * nrow))
outer = gridspec.GridSpec(nrow, ncol, wspace=0.3, hspace=0.2)
# Loop over all the orders
for ii_row in range(nrow):
for ii_col in range(ncol):
if (ii_row * ncol + ii_col) < len(orders):
inner = gridspec.GridSpecFromSubplotSpec(2, 1,
height_ratios=[2, 1], width_ratios=[1],
subplot_spec=outer[ii_row * ncol + ii_col],
wspace=0.1, hspace=0.0)
ax0 = plt.Subplot(fig, inner[0])
ax1 = plt.Subplot(fig, inner[1], sharex=ax0)
plt.setp(ax0.get_xticklabels(), visible=False)
ii = orders[ii_row * ncol + ii_col]
# define the color
rr = (ii - np.max(orders)) / (np.min(orders) - np.max(orders))
gg = 0.0
bb = (ii - np.min(orders)) / (np.max(orders) - np.min(orders))
# Evaluate the solution along this order
wv_order_mod = pypeitFit.eval(spec_vec_norm, x2=ii*np.ones_like(spec_vec_norm))
# Evaluate delta lambda
dwl = (wv_order_mod[-1] - wv_order_mod[0])/ii/xnspecmin1/(spec_vec_norm[-1] - spec_vec_norm[0])
# Estimate the residuals
on_order = all_orders == ii
this_order = all_orders[on_order]
this_pix = all_pix[on_order]
this_wv = all_wv[on_order]
this_msk = gpm[on_order]
wv_order_mod_resid = pypeitFit.eval(this_pix/xnspecmin1, x2=this_order)
resid_wl = (wv_order_mod_resid/ii - this_wv)
resid_wl_global = np.append(resid_wl_global, resid_wl[this_msk])
# Plot the fit
ax0.set_title('Order = {0:0.0f}'.format(ii))
ax0.plot(spec_vec_norm*xnspecmin1, wv_order_mod / ii / 10000., color=(rr, gg, bb), linestyle='-',
linewidth=2.5)
ax0.scatter(this_pix[~this_msk], (wv_order_mod_resid[~this_msk] / ii / 10000.) + \
100. * resid_wl[~this_msk] / 10000., marker='x', color='black', \
linewidth=2.5, s=16.)
ax0.scatter(this_pix[this_msk], (wv_order_mod_resid[this_msk] / ii / 10000.) + \
100. * resid_wl[this_msk] / 10000., color=(rr, gg, bb), \
linewidth=2.5, s=16.)
ax0.set_ylabel(r'Wavelength [$\mu$m]')
# Plot the residuals
ax1.scatter(this_pix[~this_msk], (resid_wl[~this_msk] / dwl), marker='x', color='black', \
linewidth=2.5, s=16.)
ax1.scatter(this_pix[this_msk], (resid_wl[this_msk] / dwl), color=(rr, gg, bb), \
linewidth=2.5, s=16.)
ax1.axhline(y=0., color=(rr, gg, bb), linestyle=':', linewidth=2.5)
ax1.get_yaxis().set_label_coords(-0.15, 0.5)
rms_order = np.std(resid_wl[this_msk])
ax1.set_ylabel(r'Res. [pix]')
ax0.text(0.1, 0.8, r'RMS={0:.3f} Pixel'.format(rms_order / np.abs(dwl)), ha="left", va="top",
transform=ax0.transAxes)
ax0.text(0.1, 0.9, r'$\Delta\lambda$={0:.3f} $\AA$/Pixel'.format(np.abs(dwl)), ha="left", va="top",
transform=ax0.transAxes)
ax0.get_yaxis().set_label_coords(-0.15, 0.5)
fig.add_subplot(ax0)
fig.add_subplot(ax1)
rms_global = np.std(resid_wl_global)
# Labeling
fig.text(0.5, 0.04, r'Row [pixel]', ha='center', size='large')
fig.suptitle(
r'Arc 2D FIT, norder_coeff={:d}, nspec_coeff={:d}, RMS={:5.3f} Ang*Order#, residuals $\times$100'.format(
norder_coeff,
nspec_coeff, rms_global))
# Finish
if outfile is not None:
plt.savefig(outfile, dpi=200)
plt.close()
else:
plt.show()
def resize_mask2arc(shape_arc, slitmask_orig):
"""
Rebin the input slitmask to a new shape. Generally used
for cases where the arc image has a different binning than
the science image.
Parameters
----------
shape_arc : tuple
Shape of the arc
slitmask_orig : `numpy.ndarray`_ of floats
original slitmask
Returns
-------
slitmask : `numpy.ndarray`_ of floats
Slitmask with shape corresponding to that of the arc
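Examples
--------
A sketch rebinning a toy slitmask from an unbinned calibration frame
to a 2x2-binned arc; shapes are illustrative.
>>> import numpy as np
>>> from pypeit.core.arc import resize_mask2arc
>>> slitmask_orig = np.zeros((4096, 2048), dtype=float)
>>> resize_mask2arc((2048, 1024), slitmask_orig).shape  # doctest: +SKIP
(2048, 1024)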
"""
(nspec, nspat) = shape_arc
# Is our arc a different size than the other calibs? If yes, slit_left/slit_righ, slitpix, and inmask will
# be a different size
(nspec_orig,nspat_orig) = slitmask_orig.shape
if nspec_orig != nspec:
if ((nspec_orig > nspec) & (nspec_orig % nspec != 0)) | ((nspec > nspec_orig) & (nspec % nspec_orig != 0)):
msgs.error('Problem with image sizes. arcimg size and calibration size need to be integer multiples of each other')
else:
msgs.info('Calibration images have different binning than the arcimg. Resizing calibs for arc spectrum extraction.')
slitmask = utils.rebin_slice(slitmask_orig, (nspec, nspat))
else:
slitmask = slitmask_orig
return slitmask
def resize_slits2arc(shape_arc, shape_orig, trace_orig):
"""
Resizes the trace (spatial positions from which the arc was
extracted), created with some original binning, so that it
applies to an arc with a different binning.
Args:
shape_arc (tuple):
shape of the arc
shape_orig (tuple):
original shape of the images used to create the trace
trace_orig (`numpy.ndarray`_ of floats):
trace that you want to resize
Returns:
`numpy.ndarray`_:
trace corresponding to the binning of the arc
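Example:
A sketch (not a definitive recipe) resizing a single slit-center trace
from a 2x-binned frame to an unbinned arc; all shapes and values are
illustrative.
>>> import numpy as np
>>> from pypeit.core.arc import resize_slits2arc
>>> trace_orig = np.full((1024, 1), 250.)
>>> trace = resize_slits2arc((2048, 1024), (1024, 512), trace_orig)  # doctest: +SKIP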
"""
(nspec, nspat) = shape_arc
# Is our arc a different size than the other calibs? If yes, slit_left/slit_righ, slitpix, and inmask will
# be a different size
(nspec_orig,nspat_orig) = shape_orig
if nspec_orig != nspec:
msgs.info('Calibration images have different binning than the arcimg. Resizing calibs for arc spectrum extraction.')
spec_vec_orig = np.arange(nspec_orig)/float(nspec_orig - 1)
spec_vec = np.arange(nspec)/float(nspec - 1)
spat_ratio = float(nspat)/float(nspat_orig)
trace = (scipy.interpolate.interp1d(spec_vec_orig, spat_ratio*trace_orig, axis=0, bounds_error=False,fill_value='extrapolate'))(spec_vec)
else:
trace = trace_orig
return trace
def resize_spec(spec_from, nspec_to):
"""
Resize the input spectrum (usually an arc spectrum)
to a new size using linear interpolation `scipy.interpolate.interp1d`_
Args:
spec_from (`numpy.ndarray`_):
one or more spectra to resize via interpolation. Shape
must be (nspec, nslits) or (nspec,).
nspec_to (int):
size of spectrum you wish to resize to.
Returns:
`numpy.ndarray`_: New spectra or spectrum with shape
(nspec_to, nslits) or (nspec_to,).
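Example:
Interpolate a spectrum onto a grid twice as fine (illustrative values).
>>> import numpy as np
>>> from pypeit.core.arc import resize_spec
>>> spec = np.sin(np.linspace(0., 10., 512))
>>> resize_spec(spec, 1024).shape
(1024,)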
"""
nspec_from = spec_from.shape[0]
# Is our arc a different size than the other calibs? If yes, slit_left/slit_righ, slitpix, and inmask will
# be a different size
if nspec_from != nspec_to:
spec_vec_from = np.arange(nspec_from)/float(nspec_from - 1)
spec_vec_to = np.arange(nspec_to)/float(nspec_to - 1)
spec_to = (scipy.interpolate.interp1d(spec_vec_from, spec_from, axis=0, bounds_error=False,fill_value='extrapolate'))(spec_vec_to)
else:
spec_to = spec_from
return spec_to
def get_censpec(slit_cen, slitmask, arcimg, gpm=None, box_rad=3.0,
nonlinear_counts=1e10, slit_bpm=None, slitIDs=None, verbose=True):
"""
Extract a boxcar spectrum with radius `box_rad` (pixels) from the input image using the
input trace. By default, outliers within the box are clipped
with 3.0 sigma rejection using `astropy.stats.sigma_clipped_stats`_.
Parameters
----------
slit_cen : `numpy.ndarray`_
Trace down the center of the slit. Shape (nspec, nslits)
slitmask : `numpy.ndarray`_
Image where pixel values identify its parent slit, starting with 0.
Pixels with -1 are not part of any slit. Shape must match `arcimg`.
arcimg : `numpy.ndarray`_
Image to extract the arc from. This should be an arcimage or perhaps a
frame with night sky lines. Shape (nspec, nspat)
gpm : `numpy.ndarray`_, optional
Input mask image with same shape as arcimg. Convention True = good and
False = bad. If None, all pixels are considered good. Shape must match
arcimg.
box_rad : :obj:`float`, optional
Half-width of the boxcar (floating-point pixels) in the spatial
direction used to extract the arc.
nonlinear_counts : :obj:`float`, optional
Values exceeding this input value are masked as bad.
slit_bpm: `numpy.ndarray`_, bool, optional
Bad pixel mask for the slits. True = bad. Shape must be (nslits,). Arc
spectra are filled with np.nan for masked slits.
slitIDs : :obj:`list`, optional
A list of the slit IDs to extract (if None, all slits will be extracted)
verbose : :obj:`bool`, optional
Print out verbose information?
Returns
-------
arc_spec : `numpy.ndarray`_
Array containing the extracted arc spectrum for each
slit. Shape is (nspec, nslits)
arc_spec_bpm : `numpy.ndarray`_
Bad-pixel mask for the spectra. Shape is (nspec,
nslits).
bpm_mask : `numpy.ndarray`_
Bad-slit mask, True means the entire spectrum is bad.
Shape is (nslits,).
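Examples
--------
A schematic call on a fake arc image with a single slit; all values
are illustrative.
>>> import numpy as np
>>> from pypeit.core.arc import get_censpec
>>> nspec, nspat = 100, 60
>>> rng = np.random.default_rng(0)
>>> arcimg = rng.random((nspec, nspat))
>>> slitmask = np.full((nspec, nspat), -1)
>>> slitmask[:, 20:40] = 0
>>> slit_cen = np.full((nspec, 1), 30.)
>>> arc_spec, arc_bpm, bad_slits = get_censpec(slit_cen, slitmask, arcimg)  # doctest: +SKIP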
"""
# Initialize the good pixel mask
_gpm = slitmask > -1 if gpm is None else gpm & (slitmask > -1)
# Mask saturated parts of the arc image for the extraction
_gpm = _gpm & (arcimg < nonlinear_counts)
# Initialize output
arc_spec = np.zeros_like(slit_cen)
# Iterate over slits
nslits = slit_cen.shape[1]
nspat = arcimg.shape[1]
spat = np.arange(nspat)
for islit in range(nslits):
# Check if this slit is to be extracted
if slitIDs is not None and islit not in slitIDs:
continue
# Check if this slit is masked
if slit_bpm is not None and slit_bpm[islit]:
msgs.info('Ignoring masked slit {}'.format(islit+1))
# TODO -- Avoid using NaNs
arc_spec[:,islit] = np.nan
continue
if verbose:
msgs.info(f'Extracting approximate arc spectrum of slit {islit+1}/{nslits}')
# Create a mask for the pixels that will contribute to the arc
arcmask = _gpm & (np.absolute(spat[None,:] - slit_cen[:,islit,None]) < box_rad)
# Trimming the image makes this much faster
indx = np.nonzero(np.any(arcmask, axis=0))[0]
if len(indx) == 0: # Must have been a masked slit
arc_spec[:,islit] = np.nan
continue
left, right = np.clip([indx[0]-4, indx[-1]+5], 0, nspat)
# TODO JFH Add cenfunc and std_func here, using median and the use_mad fix.
arc_spec[:,islit] = stats.sigma_clipped_stats(arcimg[:,left:right],
mask=np.invert(arcmask[:,left:right]),
sigma=3.0, axis=1,
cenfunc = np.nanmedian, stdfunc=np.nanstd)[1]
# Get the mask, set the masked values to 0, and return
arc_spec_bpm = np.isnan(arc_spec)
arc_spec[arc_spec_bpm] = 0.0
return arc_spec, arc_spec_bpm, np.all(arc_spec_bpm, axis=0)
def detect_peaks(x, mph=None, mpd=1, threshold=0, edge='rising',
kpsh=False, valley=False, show=False, ax=None):
"""Detect peaks in data based on their amplitude and other features.
This is generally, but not exclusively, used for arc-line detection.
This code was taken from https://github.com/demotu/BMC
Parameters
----------
x : array-like
1D vector with data
mph : {None, number}, optional (default = None)
detect peaks that are greater than minimum peak height (if parameter
`valley` is False) or peaks that are smaller than maximum peak height
(if parameter `valley` is True).
mpd : positive integer, optional (default = 1)
detect peaks that are at least separated by minimum peak distance (in
number of data).
threshold : positive number, optional (default = 0)
detect peaks (valleys) that are greater (smaller) than `threshold`
in relation to their immediate neighbors.
edge : {None, 'rising', 'falling', 'both'}, optional (default = 'rising')
for a flat peak, keep only the rising edge ('rising'), only the
falling edge ('falling'), both edges ('both'), or don't detect a
flat peak (None).
kpsh : bool, optional (default = False)
keep peaks with same height even if they are closer than `mpd`.
valley : bool, optional (default = False)
if True (1), detect valleys (local minima) instead of peaks.
show : bool, optional (default = False)
if True (1), plot data in matplotlib figure.
ax : `matplotlib.axes.Axes`_, optional
`matplotlib.axes.Axes`_ instance to use when plotting. If
None and ``show`` is True, a new instance is constructed.
Returns
-------
ind : array-like
1D vector with element indices containing the peaks in `x`
Notes
-----
The detection of valleys instead of peaks is performed internally by simply
negating the data::
ind_valleys = detect_peaks(-x)
The function can handle NaN's.
See this IPython Notebook [1]_.
.. code-block:: python
__author__ = "Marcos Duarte, https://github.com/demotu/BMC"
__version__ = "1.0.5"
__license__ = "MIT"
Version history:
* '1.0.5': The sign of `mph` is inverted if parameter `valley` is True
References
----------
.. [1] http://nbviewer.ipython.org/github/demotu/BMC/blob/master/notebooks/DetectPeaks.ipynb
Examples
--------
>>> from pypeit.core.arc import detect_peaks
>>> x = np.random.randn(100)
>>> x[60:81] = np.nan
>>> # detect all peaks and plot data
>>> ind = detect_peaks(x, show=True)
>>> print(ind)
>>> x = np.sin(2*np.pi*5*np.linspace(0, 1, 200)) + np.random.randn(200)/5
>>> # set minimum peak height = 0 and minimum peak distance = 20
>>> detect_peaks(x, mph=0, mpd=20, show=True)
>>> x = [0, 1, 0, 2, 0, 3, 0, 2, 0, 1, 0]
>>> # set minimum peak distance = 2
>>> detect_peaks(x, mpd=2, show=True)
>>> x = np.sin(2*np.pi*5*np.linspace(0, 1, 200)) + np.random.randn(200)/5
>>> # detection of valleys instead of peaks
>>> detect_peaks(x, mph=-1.2, mpd=20, valley=True, show=True)
>>> x = [0, 1, 1, 0, 1, 1, 0]
>>> # detect both edges
>>> detect_peaks(x, edge='both', show=True)
>>> x = [-2, 1, -2, 2, 1, 1, 3, 0]
>>> # set threshold = 2
>>> detect_peaks(x, threshold = 2, show=True)
"""
x = np.atleast_1d(x).astype('float64')
if x.size < 3:
return np.array([], dtype=int)
if valley:
x = -x
if mph is not None:
mph = -mph
# find indices of all peaks
dx = x[1:] - x[:-1]
# handle NaN's
indnan = np.where(np.isnan(x))[0]
if indnan.size:
x[indnan] = np.inf
dx[np.where(np.isnan(dx))[0]] = np.inf
ine, ire, ife = np.array([[], [], []], dtype=int)
if not edge:
ine = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0))[0]
else:
if edge.lower() in ['rising', 'both']:
ire = np.where((np.hstack((dx, 0)) <= 0) & (np.hstack((0, dx)) > 0))[0]
if edge.lower() in ['falling', 'both']:
ife = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) >= 0))[0]
ind = np.unique(np.hstack((ine, ire, ife)))
# handle NaN's
if ind.size and indnan.size:
# NaN's and values close to NaN's cannot be peaks
ind = ind[np.in1d(ind, np.unique(np.hstack((indnan, indnan - 1, indnan + 1))), invert=True)]
# first and last values of x cannot be peaks
if ind.size and ind[0] == 0:
ind = ind[1:]
if ind.size and ind[-1] == x.size - 1:
ind = ind[:-1]
# remove peaks < minimum peak height
if ind.size and mph is not None:
ind = ind[x[ind] >= mph]
# remove peaks - neighbors < threshold
if ind.size and threshold > 0:
dx = np.min(np.vstack([x[ind] - x[ind - 1], x[ind] - x[ind + 1]]), axis=0)
ind = np.delete(ind, np.where(dx < threshold)[0])
# detect small peaks closer than minimum peak distance
if ind.size and mpd > 1:
ind = ind[np.argsort(x[ind], kind='stable')][::-1] # sort ind by peak height
idel = np.zeros(ind.size, dtype=bool)
for i in range(ind.size):
if not idel[i]:
# keep peaks with the same height if kpsh is True
idel = idel | (ind >= ind[i] - mpd) & (ind <= ind[i] + mpd) \
& (x[ind[i]] > x[ind] if kpsh else True)
idel[i] = 0 # Keep current peak
# remove the small peaks and sort back the indices by their occurrence
ind = np.sort(ind[~idel])
if show:
if indnan.size:
x[indnan] = np.nan
if valley:
x = -x
if mph is not None:
mph = -mph
plot_detect_peaks(x, mph, mpd, threshold, edge, valley, ax, ind)
return ind
def plot_detect_peaks(x, mph, mpd, threshold, edge, valley, ax, ind):
"""Plot results of the :class:`pypeit.core.arc.detect_peaks` function, see its help
for a description of the variables.
Only used for debugging
"""
if ax is None:
_, ax = plt.subplots(1, 1, figsize=(8, 4))
ax.plot(x, 'b', lw=1)
if ind.size:
label = 'valley' if valley else 'peak'
label = label + 's' if ind.size > 1 else label
ax.plot(ind, x[ind], '+', mfc=None, mec='r', mew=2, ms=8,
label='%d %s' % (ind.size, label))
ax.legend(loc='best', framealpha=.5, numpoints=1)
ax.set_xlim(-.02 * x.size, x.size * 1.02 - 1)
ymin, ymax = x[np.isfinite(x)].min(), x[np.isfinite(x)].max()
yrange = ymax - ymin if ymax > ymin else 1
ax.set_ylim(ymin - 0.1 * yrange, ymax + 0.1 * yrange)
ax.set_xlabel('Data #', fontsize=14)
ax.set_ylabel('Amplitude', fontsize=14)
mode = 'Valley detection' if valley else 'Peak detection'
ax.set_title("%s (mph=%s, mpd=%f, threshold=%s, edge='%s')"
% (mode, str(mph), mpd, str(threshold), edge))
# plt.grid()
plt.show()
def iter_continuum(spec, gpm=None, fwhm=4.0, sigthresh=2.0, sigrej=3.0, niter_cont=3, cont_samp=30, cont_frac_fwhm=1.0,
cont_mask_neg=False, qa_title='', npoly=None, debug_peak_find=False, debug=False):
"""
Routine to determine the continuum and continuum pixels in spectra with peaks.
The general procedure is to:
- Detect positive "peaks" using :func:`pypeit.core.arc.detect_peaks`
- Optionally, detect negative "peaks" using :func:`pypeit.core.arc.detect_peaks`
- Mask these peaks
- Generate a running median with sampling width set by `cont_samp`
- Fit this median continuum with a polynomial of order `npoly` (if set)
- Evaluate the fit (if `npoly` is set) or interpolate the median at all locations
Note: This was developed for arc line spectra and may not function well in other
contexts.
Parameters
----------
spec : `numpy.ndarray`_ of floats
1D spectrum with shape (nspec,) for which the continuum is to be characterized
gpm : `numpy.ndarray`_, bool
A mask with shape (nspec,) indicating which pixels are good. True = Good, False=Bad
niter_cont : int, default = 3, optional
Number of iterations of peak finding, masking, and continuum fitting used to define the continuum.
npoly : int, default = None, optional
If set, the code will perform a polynomial fit to a running median filter of the
continuum points, instead of the default behavior, which is to just return the
interpolated running median filter
sigthresh : float, default = 2.0, optional
Significance threshold for peak finding
sigrej : float, default = 3.0, optional
Sigma clipping rejection threshold for threshold determination
fwhm : float, default = 4.0, optional
Number of pixels per fwhm resolution element.
cont_samp: float, default = 30.0, optional
The number of samples across the spectrum used for continuum subtraction. Continuum subtraction is done via
median filtering, with a width of ngood/cont_samp, where ngood is the number of good pixels for estimating the continuum
(i.e. that don't have peaks).
cont_frac_fwhm : float, default = 1.0, optional
Width used for masking peaks in the spectrum when the continuum is being defined. Expressed as a fraction of the fwhm
parameter
cont_mask_neg: bool, default = False, optional
If True, the code will also search for negative peaks when iteratively determining the continuum. This option is
used for object finding in the near-IR where there will also be negative peaks.
debug: bool, default = False, optional
Show plots for debugging
Returns
-------
cont: `numpy.ndarray`_ of floats
The continuum determined with shape (nspec,)
cont_mask: `numpy.ndarray`_ of bool
A mask indicating which pixels were used for continuum determination with shape (nspec,)
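Examples
--------
A sketch on a synthetic spectrum with a single Gaussian "line" on a
flat continuum; purely illustrative.
>>> import numpy as np
>>> from pypeit.core.arc import iter_continuum
>>> x = np.arange(1000.)
>>> spec = 10. + 100.*np.exp(-0.5*((x - 300.)/2.)**2)
>>> cont, cont_mask = iter_continuum(spec)  # doctest: +SKIP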
"""
if gpm is None:
gpm = np.ones(spec.size,dtype=bool)
cont_mask = np.copy(gpm)
else:
cont_mask = np.copy(gpm)
nspec = spec.size
spec_vec = np.arange(nspec)
cont_now = np.zeros(nspec)
mask_sm = np.round(cont_frac_fwhm*fwhm).astype(int)
mask_odd = mask_sm + 1 if mask_sm % 2 == 0 else mask_sm
nspec_available = np.sum(gpm)
max_mask_frac = 0.70
max_nmask = int(np.ceil((max_mask_frac)*nspec_available))
for iter in range(niter_cont):
spec_sub = spec - cont_now
mask_sigclip = np.logical_not(cont_mask & gpm)
(mean, med, stddev) = stats.sigma_clipped_stats(spec_sub, mask=mask_sigclip, sigma_lower=sigrej,
sigma_upper=sigrej, cenfunc='median', stdfunc=utils.nan_mad_std)
# be very liberal in determining threshold for continuum determination
thresh = med + sigthresh*stddev
pixt_now = detect_peaks(spec_sub, mph=thresh, mpd=fwhm*0.75, show=debug_peak_find)
# mask out the peaks we find for the next continuum iteration
cont_mask_fine = np.ones_like(cont_now)
cont_mask_fine[pixt_now] = 0.0
if cont_mask_neg is True:
pixt_now_neg = detect_peaks(-spec_sub, mph=thresh, mpd=fwhm * 0.75, show=debug_peak_find)
cont_mask_fine[pixt_now_neg] = 0.0
# cont_mask is the mask for defining the continuum regions: True is good, False is bad
peak_mask = (utils.smooth(cont_mask_fine,mask_odd) > 0.999)
cont_mask = peak_mask & gpm
# If more than max_mask_frac of the nspec_available pixels are getting masked, then short-circuit this masking
#frac_mask = np.sum(np.invert(cont_mask))/float(nspec)
nmask = np.sum(np.invert(peak_mask[gpm]))
if nmask > max_nmask:
msgs.warn('Too many pixels {:d} masked in spectrum continuum definition: frac_mask = {:5.3f} > {:5.3f} which is '
'max allowed. Only masking the {:d} largest values....'.format(nmask, nmask/nspec_available, max_mask_frac, max_nmask))
# Old
#cont_mask = np.ones_like(cont_mask) & gpm
# Short-circuit the masking and just mask the max_nmask (max_mask_frac of the available pixels) most offending pixels
peak_mask_ind = np.where(np.logical_not(peak_mask) & gpm)[0]
isort = np.argsort(np.abs(spec[peak_mask_ind]), kind='stable')[::-1]
peak_mask_new = np.ones_like(peak_mask)
peak_mask_new[peak_mask_ind[isort[0:max_nmask]]] = False
cont_mask = peak_mask_new & gpm
ngood = np.sum(cont_mask)
if ngood == 0:
msgs.warn("All pixels rejected for continuum. Returning a 0 array")
return np.zeros_like(spec), cont_mask
samp_width = np.ceil(ngood/cont_samp).astype(int)
cont_med = utils.fast_running_median(spec[cont_mask], samp_width)
if npoly is not None:
# ToDO robust_poly_fit needs to return minv and maxv as outputs for the fits to be usable downstream
pypeitFit = fitting.robust_fit(spec_vec[cont_mask].astype(float), cont_med,
npoly, function='polynomial', maxiter=25,
upper=3.0, lower=3.0, minx=0.0,
maxx=float(nspec-1))
cont_now = pypeitFit.eval(spec_vec.astype(float))
else:
cont_now = np.interp(spec_vec,spec_vec[cont_mask],cont_med)
if debug & (iter == (niter_cont-1)):
plt.plot(spec_vec, spec,'k', label='Spectrum')
#plt.plot(spec_vec, spec*cont_mask,'k', label='Spectrum*cont_mask')
plt.plot(spec_vec, cont_now,'g',label='continuum')
plt.plot(spec_vec, spec - cont_now,'b',label='spec-cont')
plt.plot(spec_vec[cont_mask], spec[cont_mask], color='cyan', markersize=3.0,
mfc='cyan', linestyle='None', fillstyle='full',
zorder=9, marker='o', label = 'Used for cont')
plt.plot(spec_vec[np.invert(cont_mask)], spec[np.invert(cont_mask)], color='red', markersize=5.0,
mfc='red', linestyle='None', fillstyle='full',
zorder=9, marker='o', label = 'masked for cont')
plt.title(qa_title)
plt.legend()
plt.show()
return cont_now, cont_mask
def detect_lines(censpec, sigdetect=5.0, fwhm=4.0, fit_frac_fwhm=1.25, input_thresh=None,
cont_subtract=True, cont_frac_fwhm=1.0, max_frac_fwhm=3.0,
min_pkdist_frac_fwhm=0.75, cont_samp=30, nonlinear_counts=1e10, niter_cont=3,
nfind=None, bpm=None, verbose=False, debug=False, debug_peak_find=False, debug_cont=False):
"""
Identify peaks in an input arc spectrum that satisfy a series of criteria:
- Sufficient signal (set by sigdetect)
- Peak amplitude < nonlinear_counts
- Peak amplitude > input_thresh (optional)
- Measured FWHM < fwhm * max_frac_fwhm
By default, the input spectrum has a continuum fitted to it and then
subtracted prior to peak finding.
Parameters
----------
censpec : `numpy.ndarray`_
A 1D spectrum to be searched for significant detections, shape = (nspec,)
sigdetect : float, default=5., optional
Sigma threshold above fluctuations for arc-line detection.
Arcs are continuum subtracted and the fluctuations are
computed after continuum subtraction.
input_thresh : float, str, optional
Optionally the user can specify the threshold that peaks must
be above to be kept. In this case the sigdetect parameter will
be ignored. This is most useful for cases where
cont_subtract=False, and the user prefers to determine the
significance threshold outside of this routine, rather than
using this routine's defaults to determine the continuum level
and standard deviation of the continuum-subtracted spectrum. If
a string input of 'None' is set then the code will simply
return all peaks irrespective of any threshold. This is
equivalent to setting the mph parameter to None in the
detect_peaks code.
fwhm : float, default = 4.0, optional
Number of pixels per fwhm resolution element.
fit_frac_fwhm: float, default 1.25, optional
Number of pixels that are used in the fits for Gaussian arc
line centroiding expressed as a fraction of the fwhm parameter
max_frac_fwhm: float, default = 3.0, optional
maximum width allowed for usable arc lines expressed relative
to the fwhm.
min_pkdist_frac_fwhm: float, default = 0.75, optional
minimum allowed separation between peaks expressed relative to
the fwhm.
cont_frac_fwhm : float, default = 1.0, optional
width used for masking peaks in the spectrum when the
continuum is being defined. Expressed as a fraction of the
fwhm parameter
cont_subtract: bool, default = True, optional
If true, the code will continuum subtract the input array by
iteratively determining the continuum
cont_samp: float, default = 30.0, optional
The number of samples across the spectrum used for continuum
subtraction. Continuum subtraction is done via median
filtering, with a width of ngood/cont_samp, where ngood is the
number of good pixels for estimating the continuum (i.e. that
don't have peaks).
niter_cont: int, default = 3, optional
Number of iterations of peak finding, masking, and continuum
fitting used to define the continuum.
nonlinear_counts: float, default = 1e10, optional
Value above which to mask saturated arc lines. This should be
nonlinear_counts= nonlinear*saturation according to pypeit
parsets. Default is 1e10 which is to not mask.
nfind: int, default = None, optional
Return only the nfind highest significance lines. The default
is None, which means the code will return all the lines above
the significance threshold.
bpm: `numpy.ndarray`_, optional
Bad-pixel mask for input spectrum. If None, all pixels
considered good. If passed in shape must match that of censpec
verbose: bool, default = False
Output more stuff to the screen.
debug: bool, default = False
Make plots showing results of peak finding and final arc lines that are used.
debug_cont: bool, default = False
Make plots showing results of continuum fitting
Returns
-------
tampl_true : `numpy.ndarray`_
The amplitudes of the line detections in the true arc
tampl : `numpy.ndarray`_
The amplitudes of the line detections in the continuum-subtracted arc
tcent : `numpy.ndarray`_
The centroids of the line detections
twid : `numpy.ndarray`_
The 1sigma Gaussian widths of the line detections
centerr : `numpy.ndarray`_
The variance on tcent
ww : `numpy.ndarray`_
An index array indicating which detections are the most reliable.
arc : `numpy.ndarray`_
The continuum-subtracted arc used to find detections.
nsig : `numpy.ndarray`_
The significance of each line detected relative to the 1sigma
variation in the continuum subtracted arc in the line free
region. Bad lines are assigned a significance of -1, since they
don't have an amplitude fit
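Examples
--------
A schematic run on a synthetic arc with two Gaussian lines; values
are illustrative and not tuned to any instrument.
>>> import numpy as np
>>> from pypeit.core.arc import detect_lines
>>> x = np.arange(1000.)
>>> censpec = 10. + 100.*np.exp(-0.5*((x - 200.)/3.)**2) + 80.*np.exp(-0.5*((x - 700.)/3.)**2)
>>> tampl_true, tampl, tcent, twid, centerr, ww, arc, nsig = detect_lines(censpec)  # doctest: +SKIP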
"""
# Detect the location of the arc lines
if verbose:
msgs.info("Detecting lines...isolating the strongest, nonsaturated lines")
# TODO: Why is this here? Can't the calling function be required to
# pass a single spectrum? This is not reflected in the docstring.
# OLD CODE
#if len(censpec.shape) == 3:
# detns = censpec[:, 0].flatten()
#else:
# detns = censpec.copy()
#detns = detns.astype(np.float)
# Need to use bpm since when arc spectrum is padded, the padding can make the thresh low and also will screw up
# continuum fitting
bpm_out = censpec == 0.0 if bpm is None else bpm
xrng = np.arange(censpec.size, dtype=float)
if cont_subtract:
cont_now, cont_mask = iter_continuum(censpec, gpm=np.logical_not(bpm_out), fwhm=fwhm, niter_cont=niter_cont,
cont_samp=cont_samp, cont_frac_fwhm=cont_frac_fwhm, debug=debug_cont)
else:
cont_mask = np.ones(censpec.size, dtype=bool)
cont_now = np.zeros_like(censpec)
arc = (censpec - cont_now)*np.logical_not(bpm_out)
if input_thresh is None:
(mean, med, stddev) = stats.sigma_clipped_stats(arc[cont_mask & np.logical_not(bpm_out)],
sigma_lower=3.0, sigma_upper=3.0, cenfunc= np.nanmedian,
stdfunc = np.nanstd)
thresh = med + sigdetect*stddev
if stddev == 0.0:
msgs.warn('stddev = 0.0, so resetting to 1.0')
stddev = 1.0
else:
med = 0.0
if isinstance(input_thresh,(float, int)):
thresh = input_thresh
elif isinstance(input_thresh, str):
if input_thresh == 'None':
thresh = None
else:
msgs.error('Unrecognized value for thresh')
stddev = 1.0
# Find the peak locations
pixt = detect_peaks(arc, mph=thresh, mpd=fwhm*min_pkdist_frac_fwhm, show=debug_peak_find)
# Refine the centers and determine the widths using a Gaussian fit
nfitpix = np.round(fit_frac_fwhm*fwhm).astype(int)
fwhm_max = max_frac_fwhm*fwhm
tampl_fit, tcent, twid, centerr = fit_arcspec(xrng, arc, pixt, nfitpix)
# Set the amplitudes using the spectra directly for both the input
# and continuum-subtracted spectrum.
# TODO: Why does this interpolate to pixt and not tcent?
tampl_true = np.interp(pixt, xrng, censpec)
tampl = np.interp(pixt, xrng, arc)
# Find the lines that meet the following criteria:
# - Amplitude is in the linear regime of the detector response
# - Center is within the limits of the spectrum
# - The Gaussian-fitted center and the center from `detect_lines`
# are not different by more than 0.75*FWHM
# - Width is finite, greater than 0, and less than FWHM_MAX/2.35
good = np.invert(np.isnan(twid)) & (twid > 0.0) & (twid < fwhm_max/2.35) & (tcent > 0.0) \
& (tcent < xrng[-1]) & (tampl_true < nonlinear_counts) \
& (np.abs(tcent-pixt) < fwhm*0.75)
# Get the indices of the good measurements
ww = np.where(good)[0]
# Compute the significance of each line, set the significance of bad lines to be -1
nsig = (tampl - med)/stddev
# If the user requested only the nfind most significant peaks,
# then grab and return only these lines
if nfind is not None:
if nfind > len(nsig):
msgs.warn('Requested {0} peaks but only found {1}. '.format(nfind, len(tampl)) +
' Returning all the peaks found.')
else:
ikeep = (nsig.argsort()[::-1])[0:nfind]
tampl_true = tampl_true[ikeep]
tampl = tampl[ikeep]
tcent = tcent[ikeep]
twid = twid[ikeep]
centerr = centerr[ikeep]
ww = np.where(good[ikeep])[0]
nsig = nsig[ikeep]
good = good[ikeep]
if debug:
# NOTE: Uses pixt because apparently tcent can be set to -1 in fit_arcspec
# TODO: Replace any values of tcent that are -1 with the value of pixt?
find_lines_qa(arc, pixt, tampl, good, bpm=bpm, thresh=thresh, nonlinear=nonlinear_counts)
# TODO: Change this to return `good` instead of `ww`
return tampl_true, tampl, tcent, twid, centerr, ww, arc, nsig
def fit_arcspec(xarray, yarray, pixt, fitp):
"""
Fit a series of pre-identified arc spectrum lines.
The implementation is a simple 3-parameter Gaussian (amplitude, centroid, width)
Parameters
----------
xarray: `numpy.ndarray`_
x-values of the input spectrum.
Either pixels or normalized pixel space (0-1)
yarray: `numpy.ndarray`_
Arc spectrum in counts
pixt: `numpy.ndarray`_
Initial guess for the center of the lines
fitp: int
Number of pixels to fit with
Returns
-------
ampl : `numpy.ndarray`_
amplitudes of the fitted lines
cent : `numpy.ndarray`_
centroids of the fitted lines
-999. are bad fits
widt : `numpy.ndarray`_
widths (sigma) of the fitted lines
centerr : `numpy.ndarray`_
error in the centroids of the fitted lines
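Examples
--------
Fit a single synthetic Gaussian line; illustrative only, but the
centroid should be recovered near pixel 40.
>>> import numpy as np
>>> from pypeit.core.arc import fit_arcspec
>>> x = np.arange(100.)
>>> y = 50.*np.exp(-0.5*((x - 40.)/2.5)**2)
>>> ampl, cent, widt, centerr = fit_arcspec(x, y, np.array([40]), 10)  # doctest: +SKIP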
"""
fitp_even = fitp if fitp % 2 == 0 else fitp + 1
fit_interval = fitp_even//2
# Setup the arrays with fit parameters
sz_p = pixt.size
sz_a = yarray.size
ampl = np.full(sz_p, -999.0, dtype=float)
cent = np.full(sz_p, -999.0, dtype=float)
widt = np.full(sz_p, -999.0, dtype=float)
centerr = np.full(sz_p, -999.0, dtype=float)
for p in range(sz_p):
# This interval is always symmetric about the peak
pmin = pixt[p] - fit_interval
pmax = pixt[p] + fit_interval + 1
if pmin < 0:
pmin = 0
if pmax > sz_a:
pmax = sz_a
if pmin == pmax:
continue
if (pmax - pmin) < (fit_interval):
continue # Probably won't be a good solution
# JFH Removed below
# if pixt[p]-pmin <= 1 or pmax-pixt[p] <= 1:
# continue # Probably won't be a good solution
# Fit the gaussian
try:
fitc, fitcov = fitting.fit_gauss(xarray[pmin:pmax], yarray[pmin:pmax])
ampl[p], cent[p], widt[p] = fitc
centerr[p] = fitcov[1, 1]
except RuntimeError:
pass
return ampl, cent, widt, centerr
def find_lines_qa(spec, cen, amp, good, bpm=None, thresh=None, nonlinear=None):
"""
Show a QA plot for the line detection.
Only used for debugging.
Detected lines are marked and then
color coded by a variety of criteria.
Args:
spec (`numpy.ndarray`_):
Spectrum used to detect lines
cen (`numpy.ndarray`_):
Identified line peaks
amp (`numpy.ndarray`_):
Amplitude of the identified lines.
good (`numpy.ndarray`_):
Boolean array selecting the good line detections.
bpm (`numpy.ndarray`_, optional):
The bad-pixel mask for the spectrum. If None, all pixels
are assumed to be valid.
thresh (:obj:`float`, optional):
Threshold value for line detection
nonlinear (:obj:`float`, optional):
Threshold for nonlinear detector response.
"""
# TODO: Could just pull `amp` directly from the spectrum
# If bpm is provided, the masked pixels are *not* shown
_spec = np.ma.MaskedArray(spec, mask=np.zeros(spec.size, dtype=bool) if bpm is None else bpm)
pix = np.arange(_spec.size)
plt.figure(figsize=(14, 6))
plt.step(pix, _spec, color='k', where='mid', label='arc', lw=1.0)
plt.scatter(cen[np.invert(good)], amp[np.invert(good)], marker='+', color='C3', s=50,
label='bad for tilts')
plt.scatter(cen[good], amp[good], color='C2', marker='+', s=50, label='good for tilts')
if thresh is not None:
plt.axhline(thresh, color='cornflowerblue', linestyle=':', linewidth=2.0,
label='threshold', zorder=10)
if nonlinear is not None and nonlinear < 1e9:
plt.axhline(nonlinear, color='orange', linestyle='--', linewidth=2.0, label='nonlinear',
zorder=10)
ngood = np.sum(good)
plt.title('Good Lines = {0}, Bad Lines = {1}'.format(ngood, len(good)-ngood))
plt.ylim(np.amin(spec), 1.5 * np.amax(spec))
plt.legend()
plt.show()