Source code for pypeit.core.arc

"""
Module containing the core methods for arc-lamp fitting 
and basic analysis.  Note that there are additional modules
in pypeit.core.wavecal related to wavelength calibration.

.. include:: ../include/links.rst

"""


import numpy as np
from matplotlib import gridspec
from matplotlib import pyplot as plt


import scipy.interpolate
from astropy import stats

from pypeit import msgs
from pypeit import utils
from pypeit.core import fitting
from IPython import embed


def fit2darc(all_wv, all_pix, all_orders, nspec, nspec_coeff=4, norder_coeff=4,
             sigrej=3.0, func2d='legendre2d', debug=False):
    """Routine to obtain the 2D wavelength solution for an echelle
    spectrograph.  This is calculated from the spectral-direction pixel
    centroid and the order number of identified arc lines.  The fit is a
    simple least-squares with rejection.  This is a port of the XIDL
    code: x_fit2darc.pro

    Parameters
    ----------
    all_wv : `numpy.ndarray`_
        Wavelengths of the identified lines
    all_pix : `numpy.ndarray`_
        Spectral-direction centroid positions of the identified lines
    all_orders : `numpy.ndarray`_
        Echelle order number for each of the identified lines
    nspec : int
        Size of the image in the spectral direction
    nspec_coeff : int, optional
        Order of the fitting along the spectral (pixel) direction for each order
    norder_coeff : int, optional
        Order of the fitting in the order direction
    sigrej : float, optional
        Sigma level for the rejection
    func2d : str, optional
        Name of the 2D fitting function (default 'legendre2d')
    debug : bool, optional
        If True, show extra plots to check the status of the procedure

    Returns
    -------
    pypeitFit : :class:`pypeit.core.fitting.PypeItFit`
        2D wavelength solution fit
    """
    # Normalize for pixels.  Fits are performed in normalized units
    # (pixels/(nspec-1)) to be able to deal with various binnings.
    min_spec = 0.0
    max_spec = 1.0
    xnspecmin1 = float(nspec-1)
    # Normalize for orders
    min_order = np.min(all_orders)
    max_order = np.max(all_orders)

    if debug:
        # set some plotting parameters
        utils.pyplot_rcparams()
        plt.figure(figsize=(7,5))
        msgs.info("Plot identified lines")
        cm = plt.cm.get_cmap('RdYlBu_r')
        sc = plt.scatter(all_orders, all_pix, c=all_wv/10000., cmap=cm)
        cbar = plt.colorbar(sc)
        cbar.set_label(r'Wavelength [$\mu$m]', rotation=270, labelpad=20)
        plt.xlabel(r'Normalized Orders')
        plt.ylabel(r'Normalized Pixels')
        plt.title(r'Location of the identified lines')
        plt.show()

    # Fit the product of wavelength and order number with a 2D Legendre polynomial
    all_wv_order = all_wv * all_orders
    pypeitFit = fitting.robust_fit(all_pix/xnspecmin1, all_wv_order, (nspec_coeff, norder_coeff),
                                   x2=all_orders, function=func2d, maxiter=100,
                                   lower=sigrej, upper=sigrej, minx=min_spec, maxx=max_spec,
                                   minx2=min_order, maxx2=max_order, use_mad=True, sticky=False)

    # Report the RMS
    fin_rms = pypeitFit.calc_fit_rms(x2=all_orders, apply_mask=True)
    msgs.info("RMS: {0:.5f} Ang*Order#".format(fin_rms))

    if debug:
        fit2darc_global_qa(pypeitFit, nspec)
        fit2darc_orders_qa(pypeitFit, nspec)

    return pypeitFit
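A minimal usage sketch (not part of the module), assuming a working PypeIt
installation; the line identifications below are fabricated purely to exercise
the call signature::

    import numpy as np
    from pypeit.core import arc

    nspec = 2048
    rng = np.random.default_rng(42)
    # 10 hypothetical echelle orders with 20 identified lines each
    all_orders = np.repeat(np.arange(30., 40.), 20)
    all_pix = np.tile(np.linspace(100., 1900., 20), 10)
    # toy wavelengths [Angstrom] that vary smoothly with pixel and order
    all_wv = 5.0e5/all_orders + 0.05*all_pix + rng.normal(0., 0.01, all_pix.size)

    pypeitFit = arc.fit2darc(all_wv, all_pix, all_orders, nspec,
                             nspec_coeff=4, norder_coeff=4)
    arc.fit2darc_global_qa(pypeitFit, nspec, outfile='arc2d_global_qa.png')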
def fit2darc_global_qa(pypeitFit, nspec, outfile=None):
    """
    Generate a QA plot for the 2D fit of the wavelength solution
    generated by :func:`pypeit.core.arc.fit2darc`

    Parameters
    ----------
    pypeitFit : :class:`pypeit.core.fitting.PypeItFit`
        Fit object for the 2D arc solution
    nspec : int
        Size of the image in the spectral direction
    outfile : str, optional
        Name of the output file to write to disk.  If not provided,
        show to screen.
    """
    msgs.info("Creating QA for 2D wavelength solution")
    utils.pyplot_rcparams()

    # Extract info from pypeitFit
    xnspecmin1 = float(nspec - 1)
    all_orders = pypeitFit['x2']
    orders = np.unique(pypeitFit['x2'])
    all_wv = pypeitFit['yval']/pypeitFit['x2']
    all_pix = pypeitFit['xval']*xnspecmin1
    gpm = pypeitFit.bool_gpm
    nspec_coeff = pypeitFit['order'][0]
    norder_coeff = pypeitFit['order'][1]
    resid_wl_global = []

    # Define pixels array
    spec_vec_norm = np.arange(nspec)/xnspecmin1

    # Define figure properties
    plt.figure(figsize=(8, 5))

    # Variable to store the maximum wavelength covered by the spectrum
    mx = 0.

    # Loop over orders
    for ii in orders:
        # define the color
        rr = (ii - np.max(orders)) / (np.min(orders) - np.max(orders))
        gg = 0.0
        bb = (ii - np.min(orders)) / (np.max(orders) - np.min(orders))

        # evaluate solution
        wv_order_mod = pypeitFit.eval(spec_vec_norm, x2=np.ones_like(spec_vec_norm)*ii)
        # Plot solution
        plt.plot(wv_order_mod / ii, spec_vec_norm*xnspecmin1, color=(rr, gg, bb),
                 linestyle='-', linewidth=2.5)

        # Evaluate residuals at each order
        on_order = all_orders == ii
        this_pix = all_pix[on_order]
        this_wv = all_wv[on_order]
        this_msk = gpm[on_order]
        this_order = all_orders[on_order]
        wv_order_mod_resid = pypeitFit.eval(this_pix/xnspecmin1, x2=this_order)
        resid_wl = (wv_order_mod_resid / ii - this_wv)
        resid_wl_global = np.append(resid_wl_global, resid_wl[this_msk])
        plt.scatter((wv_order_mod_resid[~this_msk] / ii) + 100. * resid_wl[~this_msk],
                    this_pix[~this_msk], marker='x', color='black', linewidths=2.5, s=16.)
        plt.scatter((wv_order_mod_resid[this_msk] / ii) + 100. * resid_wl[this_msk],
                    this_pix[this_msk], color=(rr, gg, bb), linewidth=2.5, s=16.)
        if np.max(wv_order_mod_resid / ii) > mx:
            mx = np.max(wv_order_mod_resid / ii)

    rms_global = np.std(resid_wl_global)

    plt.text(mx, np.max(spec_vec_norm*xnspecmin1), r'residuals $\times$100',
             ha="right", va="top")
    plt.title(r'Arc 2D FIT, norder_coeff={:d}, nspec_coeff={:d}, RMS={:5.3f} Ang*Order#'.format(
        norder_coeff, nspec_coeff, rms_global))
    plt.xlabel(r'Wavelength [$\AA$]')
    plt.ylabel(r'Row [pixel]')

    # Finish
    if outfile is not None:
        plt.savefig(outfile, dpi=300)
        plt.close()
    else:
        plt.show()

    # restore default rcparams
    utils.pyplot_rcparams_default()
def fit2darc_orders_qa(pypeitFit, nspec, outfile=None):
    """
    QA on the 2D fit of the wavelength solution of an echelle
    spectrograph.  Each sub-panel contains a single order with the
    global fit and the residuals.

    Parameters
    ----------
    pypeitFit : :class:`pypeit.core.fitting.PypeItFit`
        Fit object for the 2D arc solution
    nspec : int
        Size of the image in the spectral direction
    outfile : str, optional
        Write to this file.  If not provided, show to screen.
    """
    msgs.info("Creating QA for 2D wavelength solution")
    utils.pyplot_rcparams()

    # Extract info from pypeitFit
    xnspecmin1 = float(nspec - 1)
    all_orders = pypeitFit['x2']
    orders = np.unique(pypeitFit['x2'])
    all_wv = pypeitFit['yval']/pypeitFit['x2']
    all_pix = pypeitFit['xval']*xnspecmin1
    gpm = pypeitFit.bool_gpm
    nspec_coeff = pypeitFit['order'][0]
    norder_coeff = pypeitFit['order'][1]
    resid_wl_global = []

    # Define pixels array
    spec_vec_norm = np.arange(nspec)/xnspecmin1

    # set the size of the plot
    nrow = int(2)
    ncol = int(np.ceil(len(orders) / 2.))
    fig = plt.figure(figsize=(5 * ncol, 6 * nrow))

    outer = gridspec.GridSpec(nrow, ncol, wspace=0.3, hspace=0.2)

    # Loop over all the orders
    for ii_row in range(nrow):
        for ii_col in range(ncol):
            if (ii_row * ncol + ii_col) < len(orders):
                inner = gridspec.GridSpecFromSubplotSpec(2, 1, height_ratios=[2, 1],
                                                         width_ratios=[1],
                                                         subplot_spec=outer[ii_row * ncol + ii_col],
                                                         wspace=0.1, hspace=0.0)
                ax0 = plt.Subplot(fig, inner[0])
                ax1 = plt.Subplot(fig, inner[1], sharex=ax0)
                plt.setp(ax0.get_xticklabels(), visible=False)
                ii = orders[ii_row * ncol + ii_col]

                # define the color
                rr = (ii - np.max(orders)) / (np.min(orders) - np.max(orders))
                gg = 0.0
                bb = (ii - np.min(orders)) / (np.max(orders) - np.min(orders))

                # evaluate solution
                wv_order_mod = pypeitFit.eval(spec_vec_norm, x2=ii*np.ones_like(spec_vec_norm))

                # Evaluate delta lambda
                dwl = (wv_order_mod[-1] - wv_order_mod[0])/ii/xnspecmin1 \
                        / (spec_vec_norm[-1] - spec_vec_norm[0])

                # Estimate the residuals
                on_order = all_orders == ii
                this_order = all_orders[on_order]
                this_pix = all_pix[on_order]
                this_wv = all_wv[on_order]
                this_msk = gpm[on_order]

                wv_order_mod_resid = pypeitFit.eval(this_pix/xnspecmin1, x2=this_order)
                resid_wl = (wv_order_mod_resid/ii - this_wv)
                resid_wl_global = np.append(resid_wl_global, resid_wl[this_msk])

                # Plot the fit
                ax0.set_title('Order = {0:0.0f}'.format(ii))
                ax0.plot(spec_vec_norm*xnspecmin1, wv_order_mod / ii / 10000.,
                         color=(rr, gg, bb), linestyle='-', linewidth=2.5)
                ax0.scatter(this_pix[~this_msk],
                            (wv_order_mod_resid[~this_msk] / ii / 10000.)
                                + 100. * resid_wl[~this_msk] / 10000.,
                            marker='x', color='black', linewidth=2.5, s=16.)
                ax0.scatter(this_pix[this_msk],
                            (wv_order_mod_resid[this_msk] / ii / 10000.)
                                + 100. * resid_wl[this_msk] / 10000.,
                            color=(rr, gg, bb), linewidth=2.5, s=16.)
                ax0.set_ylabel(r'Wavelength [$\mu$m]')

                # Plot the residuals
                ax1.scatter(this_pix[~this_msk], (resid_wl[~this_msk] / dwl),
                            marker='x', color='black', linewidth=2.5, s=16.)
                ax1.scatter(this_pix[this_msk], (resid_wl[this_msk] / dwl),
                            color=(rr, gg, bb), linewidth=2.5, s=16.)
                ax1.axhline(y=0., color=(rr, gg, bb), linestyle=':', linewidth=2.5)
                ax1.get_yaxis().set_label_coords(-0.15, 0.5)

                rms_order = np.std(resid_wl[this_msk])

                ax1.set_ylabel(r'Res. [pix]')

                ax0.text(0.1, 0.8, r'RMS={0:.3f} Pixel'.format(rms_order / np.abs(dwl)),
                         ha="left", va="top", transform=ax0.transAxes)
                ax0.text(0.1, 0.9, r'$\Delta\lambda$={0:.3f} $\AA$/Pixel'.format(np.abs(dwl)),
                         ha="left", va="top", transform=ax0.transAxes)
                ax0.get_yaxis().set_label_coords(-0.15, 0.5)

                fig.add_subplot(ax0)
                fig.add_subplot(ax1)

    rms_global = np.std(resid_wl_global)

    # Labeling
    fig.text(0.5, 0.04, r'Row [pixel]', ha='center', size='large')
    fig.suptitle(r'Arc 2D FIT, norder_coeff={:d}, nspec_coeff={:d}, '
                 r'RMS={:5.3f} Ang*Order#, residuals $\times$100'.format(
                     norder_coeff, nspec_coeff, rms_global))

    # Finish
    if outfile is not None:
        plt.savefig(outfile, dpi=200)
        plt.close()
    else:
        plt.show()
def resize_mask2arc(shape_arc, slitmask_orig):
    """
    Rebin the input slitmask to a new shape.  Generally used for cases
    where the arc image has a different binning than the science image.

    Parameters
    ----------
    shape_arc : tuple
        Shape of the arc
    slitmask_orig : `numpy.ndarray`_ of floats
        Original slitmask

    Returns
    -------
    slitmask : `numpy.ndarray`_ of floats
        Slitmask with shape corresponding to that of the arc
    """
    (nspec, nspat) = shape_arc
    # Is our arc a different size than the other calibs?  If yes, slit_left/slit_righ,
    # slitpix, and inmask will be a different size
    (nspec_orig, nspat_orig) = slitmask_orig.shape
    if nspec_orig != nspec:
        if ((nspec_orig > nspec) & (nspec_orig % nspec != 0)) \
                | ((nspec > nspec_orig) & (nspec % nspec_orig != 0)):
            msgs.error('Problem with image sizes.  The arcimg size and the calibration '
                       'size need to be integer multiples of each other.')
        else:
            msgs.info('Calibration images have different binning than the arcimg. '
                      'Resizing calibs for arc spectrum extraction.')
        slitmask = utils.rebin_slice(slitmask_orig, (nspec, nspat))
    else:
        slitmask = slitmask_orig

    return slitmask
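For example, rebinning a hypothetical unbinned slitmask to match an arc taken
with 2x spectral binning (shapes invented for illustration)::

    import numpy as np
    from pypeit.core import arc

    slitmask_orig = np.zeros((4096, 2048), dtype=float)  # science-frame slitmask
    slitmask = arc.resize_mask2arc((2048, 2048), slitmask_orig)
    print(slitmask.shape)                                # (2048, 2048)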
def resize_slits2arc(shape_arc, shape_orig, trace_orig):
    """
    Resize a trace (the spatial positions from which the arc was
    extracted) created with some original binning to be relevant to an
    arc with a different binning.

    Args:
        shape_arc (tuple):
            Shape of the arc
        shape_orig (tuple):
            Original shape of the images used to create the trace
        trace_orig (`numpy.ndarray`_ of floats):
            Trace that you want to resize

    Returns:
        `numpy.ndarray`_: Trace corresponding to the binning of the arc
    """
    (nspec, nspat) = shape_arc
    # Is our arc a different size than the other calibs?  If yes, slit_left/slit_righ,
    # slitpix, and inmask will be a different size
    (nspec_orig, nspat_orig) = shape_orig
    if nspec_orig != nspec:
        msgs.info('Calibration images have different binning than the arcimg. '
                  'Resizing calibs for arc spectrum extraction.')
        spec_vec_orig = np.arange(nspec_orig)/float(nspec_orig - 1)
        spec_vec = np.arange(nspec)/float(nspec - 1)
        spat_ratio = float(nspat)/float(nspat_orig)
        trace = (scipy.interpolate.interp1d(spec_vec_orig, spat_ratio*trace_orig, axis=0,
                                            bounds_error=False,
                                            fill_value='extrapolate'))(spec_vec)
    else:
        trace = trace_orig

    return trace
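A short sketch with an invented straight-line trace, resizing from an unbinned
(4096, 2048) frame to a 2x2-binned arc::

    import numpy as np
    from pypeit.core import arc

    trace_orig = np.full((4096, 1), 1024.5)              # one slit at spatial pixel 1024.5
    trace = arc.resize_slits2arc((2048, 1024), (4096, 2048), trace_orig)
    print(trace.shape, trace[0, 0])                      # (2048, 1) 512.25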
def resize_spec(spec_from, nspec_to):
    """
    Resize the input spectrum (usually an arc spectrum) to a new size
    using linear interpolation (`scipy.interpolate.interp1d`_).

    Args:
        spec_from (`numpy.ndarray`_):
            One or more spectra to resize via interpolation.  Shape
            must be (nspec, nslits) or (nspec,).
        nspec_to (int):
            Size of spectrum you wish to resize to.

    Returns:
        `numpy.ndarray`_: New spectrum or spectra with shape
        (nspec_to, nslits) or (nspec_to,).
    """
    nspec_from = spec_from.shape[0]
    # Is our arc a different size than the other calibs?  If yes, slit_left/slit_righ,
    # slitpix, and inmask will be a different size
    if nspec_from != nspec_to:
        spec_vec_from = np.arange(nspec_from)/float(nspec_from - 1)
        spec_vec_to = np.arange(nspec_to)/float(nspec_to - 1)
        spec_to = (scipy.interpolate.interp1d(spec_vec_from, spec_from, axis=0,
                                              bounds_error=False,
                                              fill_value='extrapolate'))(spec_vec_to)
    else:
        spec_to = spec_from

    return spec_to
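For instance, interpolating a pair of hypothetical 1024-pixel spectra onto a
2048-pixel grid::

    import numpy as np
    from pypeit.core import arc

    spec_from = np.random.default_rng(0).normal(10., 1., size=(1024, 2))
    spec_to = arc.resize_spec(spec_from, 2048)
    print(spec_to.shape)                                 # (2048, 2)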
def get_censpec(slit_cen, slitmask, arcimg, gpm=None, box_rad=3.0, nonlinear_counts=1e10,
                slit_bpm=None, slitIDs=None, verbose=True):
    """
    Extract a boxcar spectrum with radius `box_rad` (pixels) from the
    input image using the input trace.  By default, outliers within the
    box are clipped with 3.0-sigma rejection using
    `astropy.stats.sigma_clipped_stats`_.

    Parameters
    ----------
    slit_cen : `numpy.ndarray`_
        Trace down the center of the slit.  Shape (nspec, nslits)
    slitmask : `numpy.ndarray`_
        Image where pixel values identify its parent slit, starting
        with 0.  Pixels with -1 are not part of any slit.  Shape must
        match `arcimg`.
    arcimg : `numpy.ndarray`_
        Image to extract the arc from.  This should be an arc image or
        perhaps a frame with night-sky lines.  Shape (nspec, nspat)
    gpm : `numpy.ndarray`_, optional
        Input mask image with the same shape as arcimg.  Convention:
        True = good and False = bad.  If None, all pixels are
        considered good.
    box_rad : :obj:`float`, optional
        Half-width of the boxcar (floating-point pixels) in the spatial
        direction used to extract the arc.
    nonlinear_counts : :obj:`float`, optional
        Values exceeding this input value are masked as bad.
    slitIDs : :obj:`list`, optional
        A list of the slit IDs to extract (if None, all slits will be
        extracted)
    slit_bpm : `numpy.ndarray`_, bool, optional
        Bad-pixel mask for the slits.  True = bad.  Shape must be
        (nslits,).  Arc spectra are filled with np.nan for masked
        slits.
    verbose : :obj:`bool`, optional
        Print out verbose information?

    Returns
    -------
    arc_spec : `numpy.ndarray`_
        Array containing the extracted arc spectrum for each slit.
        Shape is (nspec, nslits)
    arc_spec_bpm : `numpy.ndarray`_
        Bad-pixel mask for the spectra.  Shape is (nspec, nslits).
    bpm_mask : `numpy.ndarray`_
        Bad-slit mask; True means the entire spectrum is bad.  Shape is
        (nslits,).
    """
    # Initialize the good pixel mask
    _gpm = slitmask > -1 if gpm is None else gpm & (slitmask > -1)
    # Mask saturated parts of the arc image for the extraction
    _gpm = _gpm & (arcimg < nonlinear_counts)

    # Initialize output
    arc_spec = np.zeros_like(slit_cen)

    # Iterate over slits
    nslits = slit_cen.shape[1]
    nspat = arcimg.shape[1]
    spat = np.arange(nspat)
    for islit in range(nslits):
        # Check if this slit is to be extracted
        if slitIDs is not None and islit not in slitIDs:
            continue
        # Check if this slit is masked
        if slit_bpm is not None and slit_bpm[islit]:
            msgs.info('Ignoring masked slit {}'.format(islit+1))
            # TODO -- Avoid using NaNs
            arc_spec[:,islit] = np.nan
            continue
        if verbose:
            msgs.info(f'Extracting approximate arc spectrum of slit {islit+1}/{nslits}')
        # Create a mask for the pixels that will contribute to the arc
        arcmask = _gpm & (np.absolute(spat[None,:] - slit_cen[:,islit,None]) < box_rad)
        # Trimming the image makes this much faster
        indx = np.nonzero(np.any(arcmask, axis=0))[0]
        if len(indx) == 0:
            # Must have been a masked slit
            arc_spec[:,islit] = np.nan
            continue
        left, right = np.clip([indx[0]-4, indx[-1]+5], 0, nspat)
        # TODO JFH Add cenfunc and std_func here, using median and the use_mad fix.
        arc_spec[:,islit] = stats.sigma_clipped_stats(arcimg[:,left:right],
                                                      mask=np.invert(arcmask[:,left:right]),
                                                      sigma=3.0, axis=1)[1]

    # Get the mask, set the masked values to 0, and return
    arc_spec_bpm = np.isnan(arc_spec)
    arc_spec[arc_spec_bpm] = 0.0
    return arc_spec, arc_spec_bpm, np.all(arc_spec_bpm, axis=0)
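A minimal sketch on a synthetic frame (one slit, a straight trace; all values
invented for illustration)::

    import numpy as np
    from pypeit.core import arc

    nspec, nspat = 512, 100
    rng = np.random.default_rng(0)
    arcimg = rng.normal(100., 5., size=(nspec, nspat))   # fake arc frame
    slitmask = np.full((nspec, nspat), -1, dtype=int)
    slitmask[:, 20:60] = 0                               # slit 0 covers columns 20-59
    slit_cen = np.full((nspec, 1), 40.)                  # straight trace down the slit
    arc_spec, arc_bpm, slit_bad = arc.get_censpec(slit_cen, slitmask, arcimg)
    print(arc_spec.shape, slit_bad)                      # (512, 1) [False]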
def detect_peaks(x, mph=None, mpd=1, threshold=0, edge='rising', kpsh=False, valley=False,
                 show=False, ax=None):
    """Detect peaks in data based on their amplitude and other features.

    This is generally, but not exclusively, used for arc-line
    detection.  This code was taken from https://github.com/demotu/BMC

    Parameters
    ----------
    x : array-like
        1D vector with data
    mph : {None, number}, optional (default = None)
        detect peaks that are greater than the minimum peak height (if
        parameter `valley` is False) or peaks that are smaller than the
        maximum peak height (if parameter `valley` is True).
    mpd : positive integer, optional (default = 1)
        detect peaks that are separated by at least the minimum peak
        distance (in number of data points).
    threshold : positive number, optional (default = 0)
        detect peaks (valleys) that are greater (smaller) than
        `threshold` in relation to their immediate neighbors.
    edge : {None, 'rising', 'falling', 'both'}, optional (default = 'rising')
        for a flat peak, keep only the rising edge ('rising'), only the
        falling edge ('falling'), both edges ('both'), or don't detect
        a flat peak (None).
    kpsh : bool, optional (default = False)
        keep peaks with the same height even if they are closer than
        `mpd`.
    valley : bool, optional (default = False)
        if True, detect valleys (local minima) instead of peaks.
    show : bool, optional (default = False)
        if True, plot the data in a matplotlib figure.
    ax : `matplotlib.axes.Axes`_, optional
        `matplotlib.axes.Axes`_ instance to use when plotting.  If None
        and ``show`` is True, a new instance is constructed.

    Returns
    -------
    ind : array-like
        1D vector with the element indices of the peaks in `x`

    Notes
    -----
    The detection of valleys instead of peaks is performed internally
    by simply negating the data::

        ind_valleys = detect_peaks(-x)

    The function can handle NaN's.

    See this IPython Notebook [1]_.

    .. code-block:: python

        __author__ = "Marcos Duarte, https://github.com/demotu/BMC"
        __version__ = "1.0.5"
        __license__ = "MIT"

    Version history:

        * '1.0.5': The sign of `mph` is inverted if parameter `valley` is True

    References
    ----------
    .. [1] http://nbviewer.ipython.org/github/demotu/BMC/blob/master/notebooks/DetectPeaks.ipynb

    Examples
    --------
    >>> from pypeit.core.arc import detect_peaks
    >>> x = np.random.randn(100)
    >>> x[60:81] = np.nan
    >>> # detect all peaks and plot data
    >>> ind = detect_peaks(x, show=True)
    >>> print(ind)

    >>> x = np.sin(2*np.pi*5*np.linspace(0, 1, 200)) + np.random.randn(200)/5
    >>> # set minimum peak height = 0 and minimum peak distance = 20
    >>> detect_peaks(x, mph=0, mpd=20, show=True)

    >>> x = [0, 1, 0, 2, 0, 3, 0, 2, 0, 1, 0]
    >>> # set minimum peak distance = 2
    >>> detect_peaks(x, mpd=2, show=True)

    >>> x = np.sin(2*np.pi*5*np.linspace(0, 1, 200)) + np.random.randn(200)/5
    >>> # detection of valleys instead of peaks
    >>> detect_peaks(x, mph=-1.2, mpd=20, valley=True, show=True)

    >>> x = [0, 1, 1, 0, 1, 1, 0]
    >>> # detect both edges
    >>> detect_peaks(x, edge='both', show=True)

    >>> x = [-2, 1, -2, 2, 1, 1, 3, 0]
    >>> # set threshold = 2
    >>> detect_peaks(x, threshold = 2, show=True)
    """
    x = np.atleast_1d(x).astype('float64')
    if x.size < 3:
        return np.array([], dtype=int)
    if valley:
        x = -x
        if mph is not None:
            mph = -mph
    # find indices of all peaks
    dx = x[1:] - x[:-1]
    # handle NaN's
    indnan = np.where(np.isnan(x))[0]
    if indnan.size:
        x[indnan] = np.inf
        dx[np.where(np.isnan(dx))[0]] = np.inf
    ine, ire, ife = np.array([[], [], []], dtype=int)
    if not edge:
        ine = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0))[0]
    else:
        if edge.lower() in ['rising', 'both']:
            ire = np.where((np.hstack((dx, 0)) <= 0) & (np.hstack((0, dx)) > 0))[0]
        if edge.lower() in ['falling', 'both']:
            ife = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) >= 0))[0]
    ind = np.unique(np.hstack((ine, ire, ife)))
    # handle NaN's
    if ind.size and indnan.size:
        # NaN's and values close to NaN's cannot be peaks
        ind = ind[np.in1d(ind, np.unique(np.hstack((indnan, indnan - 1, indnan + 1))),
                          invert=True)]
    # first and last values of x cannot be peaks
    if ind.size and ind[0] == 0:
        ind = ind[1:]
    if ind.size and ind[-1] == x.size - 1:
        ind = ind[:-1]
    # remove peaks < minimum peak height
    if ind.size and mph is not None:
        ind = ind[x[ind] >= mph]
    # remove peaks - neighbors < threshold
    if ind.size and threshold > 0:
        dx = np.min(np.vstack([x[ind] - x[ind - 1], x[ind] - x[ind + 1]]), axis=0)
        ind = np.delete(ind, np.where(dx < threshold)[0])
    # detect small peaks closer than minimum peak distance
    if ind.size and mpd > 1:
        ind = ind[np.argsort(x[ind])][::-1]  # sort ind by peak height
        idel = np.zeros(ind.size, dtype=bool)
        for i in range(ind.size):
            if not idel[i]:
                # keep peaks with the same height if kpsh is True
                idel = idel | (ind >= ind[i] - mpd) & (ind <= ind[i] + mpd) \
                       & (x[ind[i]] > x[ind] if kpsh else True)
                idel[i] = 0  # Keep current peak
        # remove the small peaks and sort back the indices by their occurrence
        ind = np.sort(ind[~idel])

    if show:
        if indnan.size:
            x[indnan] = np.nan
        if valley:
            x = -x
            if mph is not None:
                mph = -mph
        plot_detect_peaks(x, mph, mpd, threshold, edge, valley, ax, ind)

    return ind
def plot_detect_peaks(x, mph, mpd, threshold, edge, valley, ax, ind):
    """
    Plot the results of the :func:`pypeit.core.arc.detect_peaks`
    function; see its documentation for a description of the
    variables.  Only used for debugging.
    """
    if ax is None:
        _, ax = plt.subplots(1, 1, figsize=(8, 4))
    ax.plot(x, 'b', lw=1)
    if ind.size:
        label = 'valley' if valley else 'peak'
        label = label + 's' if ind.size > 1 else label
        ax.plot(ind, x[ind], '+', mfc=None, mec='r', mew=2, ms=8,
                label='%d %s' % (ind.size, label))
        ax.legend(loc='best', framealpha=.5, numpoints=1)
    ax.set_xlim(-.02 * x.size, x.size * 1.02 - 1)
    ymin, ymax = x[np.isfinite(x)].min(), x[np.isfinite(x)].max()
    yrange = ymax - ymin if ymax > ymin else 1
    ax.set_ylim(ymin - 0.1 * yrange, ymax + 0.1 * yrange)
    ax.set_xlabel('Data #', fontsize=14)
    ax.set_ylabel('Amplitude', fontsize=14)
    mode = 'Valley detection' if valley else 'Peak detection'
    ax.set_title("%s (mph=%s, mpd=%f, threshold=%s, edge='%s')"
                 % (mode, str(mph), mpd, str(threshold), edge))
    plt.show()
def iter_continuum(spec, gpm=None, fwhm=4.0, sigthresh=2.0, sigrej=3.0, niter_cont=3,
                   cont_samp=30, cont_frac_fwhm=1.0, cont_mask_neg=False, qa_title='',
                   npoly=None, debug_peak_find=False, debug=False):
    """
    Routine to determine the continuum and continuum pixels in spectra
    with peaks.

    The general procedure is to:

        - Detect positive "peaks" using :func:`pypeit.core.arc.detect_peaks`
        - Optionally, detect negative "peaks" using :func:`pypeit.core.arc.detect_peaks`
        - Mask these peaks
        - Generate a running median with sampling width set by `cont_samp`
        - Fit this median continuum with a polynomial of order `npoly` (if set)
        - Evaluate the fit (if `npoly` is set) or interpolate the median at all locations

    Note: This was developed for arc-line spectra and may not function
    well in other contexts.

    Parameters
    ----------
    spec : `numpy.ndarray`_ of floats
        1D spectrum with shape (nspec,) for which the continuum is to
        be characterized
    gpm : `numpy.ndarray`_, bool, optional
        A mask with shape (nspec,) indicating which pixels are good.
        True = Good, False = Bad
    fwhm : float, default = 4.0, optional
        Number of pixels per FWHM resolution element.
    sigthresh : float, default = 2.0, optional
        Significance threshold for peak finding
    sigrej : float, default = 3.0, optional
        Sigma-clipping rejection threshold used when determining the
        peak-finding threshold
    niter_cont : int, default = 3, optional
        Number of iterations of peak finding, masking, and continuum
        fitting used to define the continuum.
    cont_samp : float, default = 30.0, optional
        The number of samples across the spectrum used for continuum
        subtraction.  Continuum subtraction is done via median
        filtering, with a width of ngood/cont_samp, where ngood is the
        number of good pixels for estimating the continuum (i.e., that
        don't have peaks).
    cont_frac_fwhm : float, default = 1.0, optional
        Width used for masking peaks in the spectrum when the continuum
        is being defined.  Expressed as a fraction of the fwhm
        parameter.
    cont_mask_neg : bool, default = False, optional
        If True, the code will also search for negative peaks when
        iteratively determining the continuum.  This option is used for
        object finding in the near-IR, where there will also be
        negative peaks.
    qa_title : str, optional
        Title for the debugging plot shown when ``debug`` is True.
    npoly : int, default = None, optional
        If set, the code will perform a polynomial fit to the
        running-median-filtered continuum points instead of the default
        behavior, which is to simply return the interpolated running
        median filter.
    debug_peak_find : bool, default = False, optional
        Show the peak-finding plots from :func:`detect_peaks`.
    debug : bool, default = False, optional
        Show plots for debugging

    Returns
    -------
    cont : `numpy.ndarray`_ of floats
        The continuum determined, with shape (nspec,)
    cont_mask : `numpy.ndarray`_ of bool
        A mask with shape (nspec,) indicating which pixels were used
        for continuum determination
    """
    if gpm is None:
        gpm = np.ones(spec.size, dtype=bool)
    cont_mask = np.copy(gpm)

    nspec = spec.size
    spec_vec = np.arange(nspec)
    cont_now = np.zeros(nspec)
    mask_sm = np.round(cont_frac_fwhm*fwhm).astype(int)
    mask_odd = mask_sm + 1 if mask_sm % 2 == 0 else mask_sm
    nspec_available = np.sum(gpm)
    max_mask_frac = 0.70
    max_nmask = int(np.ceil(max_mask_frac*nspec_available))
    for iter in range(niter_cont):
        spec_sub = spec - cont_now
        mask_sigclip = np.invert(cont_mask & gpm)
        (mean, med, stddev) = stats.sigma_clipped_stats(spec_sub, mask=mask_sigclip,
                                                        sigma_lower=sigrej, sigma_upper=sigrej,
                                                        cenfunc='median',
                                                        stdfunc=utils.nan_mad_std)
        # be very liberal in determining the threshold for continuum determination
        thresh = med + sigthresh*stddev
        pixt_now = detect_peaks(spec_sub, mph=thresh, mpd=fwhm*0.75, show=debug_peak_find)
        # mask out the peaks we find for the next continuum iteration
        cont_mask_fine = np.ones_like(cont_now)
        cont_mask_fine[pixt_now] = 0.0
        if cont_mask_neg is True:
            pixt_now_neg = detect_peaks(-spec_sub, mph=thresh, mpd=fwhm*0.75,
                                        show=debug_peak_find)
            cont_mask_fine[pixt_now_neg] = 0.0
        # cont_mask is the mask for defining the continuum regions: True is good, False is bad
        peak_mask = (utils.smooth(cont_mask_fine, mask_odd) > 0.999)
        cont_mask = peak_mask & gpm
        # If more than max_mask_frac of the nspec_available pixels would be masked,
        # then short-circuit this masking
        nmask = np.sum(np.invert(peak_mask[gpm]))
        if nmask > max_nmask:
            msgs.warn('Too many pixels {:d} masked in spectrum continuum definition: '
                      'frac_mask = {:5.3f} > {:5.3f} which is the max allowed.  Only '
                      'masking the {:d} largest values....'.format(
                          nmask, nmask/nspec_available, max_mask_frac, max_nmask))
            # Short-circuit the masking and just mask the max_nmask most offending pixels
            peak_mask_ind = np.where(np.logical_not(peak_mask) & gpm)[0]
            isort = np.argsort(np.abs(spec[peak_mask_ind]))[::-1]
            peak_mask_new = np.ones_like(peak_mask)
            peak_mask_new[peak_mask_ind[isort[0:max_nmask]]] = False
            cont_mask = peak_mask_new & gpm

        ngood = np.sum(cont_mask)
        if ngood == 0:
            msgs.warn("All pixels rejected for continuum.  Returning a zero array.")
            return np.zeros_like(spec), cont_mask

        samp_width = np.ceil(ngood/cont_samp).astype(int)
        cont_med = utils.fast_running_median(spec[cont_mask], samp_width)
        if npoly is not None:
            # TODO: robust_poly_fit needs to return minv and maxv as outputs
            # for the fits to be usable downstream
            pypeitFit = fitting.robust_fit(spec_vec[cont_mask].astype(float), cont_med, npoly,
                                           function='polynomial', maxiter=25,
                                           upper=3.0, lower=3.0, minx=0.0, maxx=float(nspec-1))
            cont_now = pypeitFit.eval(spec_vec.astype(float))
        else:
            cont_now = np.interp(spec_vec, spec_vec[cont_mask], cont_med)

        if debug & (iter == (niter_cont-1)):
            plt.plot(spec_vec, spec, 'k', label='Spectrum')
            plt.plot(spec_vec, cont_now, 'g', label='continuum')
            plt.plot(spec_vec, spec - cont_now, 'b', label='spec-cont')
            plt.plot(spec_vec[cont_mask], spec[cont_mask], color='cyan', markersize=3.0,
                     mfc='cyan', linestyle='None', fillstyle='full', zorder=9, marker='o',
                     label='Used for cont')
            plt.plot(spec_vec[np.invert(cont_mask)], spec[np.invert(cont_mask)], color='red',
                     markersize=5.0, mfc='red', linestyle='None', fillstyle='full', zorder=9,
                     marker='o', label='masked for cont')
            plt.title(qa_title)
            plt.legend()
            plt.show()

    return cont_now, cont_mask
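A quick sketch on a synthetic spectrum (a smooth slope plus a few fake
emission lines, all values invented)::

    import numpy as np
    from pypeit.core import arc

    nspec = 1000
    pix = np.arange(nspec, dtype=float)
    rng = np.random.default_rng(1)
    spec = 10. + 0.01*pix + rng.normal(0., 0.5, nspec)   # hypothetical continuum + noise
    for c in (150., 400., 800.):                         # add three fake arc lines
        spec += 80.*np.exp(-0.5*((pix - c)/2.0)**2)
    cont, cont_mask = arc.iter_continuum(spec, fwhm=4.0)
    print(cont.shape, np.sum(~cont_mask))                # pixels masked near the lines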
def detect_lines(censpec, sigdetect=5.0, fwhm=4.0, fit_frac_fwhm=1.25, input_thresh=None,
                 cont_subtract=True, cont_frac_fwhm=1.0, max_frac_fwhm=3.0,
                 min_pkdist_frac_fwhm=0.75, cont_samp=30, nonlinear_counts=1e10,
                 niter_cont=3, nfind=None, bpm=None, verbose=False, debug=False,
                 debug_peak_find=False, debug_cont=False):
    """
    Identify peaks in an input arc spectrum that satisfy a series of
    criteria:

        - Sufficient signal (set by sigdetect)
        - Peak amplitude < nonlinear_counts
        - Peak amplitude > input_thresh (optional)
        - Measured FWHM < fwhm * max_frac_fwhm

    By default, the input spectrum has a continuum fitted to it and
    subtracted prior to peak finding.

    Parameters
    ----------
    censpec : `numpy.ndarray`_
        A 1D spectrum to be searched for significant detections,
        shape = (nspec,)
    sigdetect : float, default = 5.0, optional
        Sigma threshold above fluctuations for arc-line detection.
        Arcs are continuum subtracted and the fluctuations are computed
        after continuum subtraction.
    input_thresh : float, str, optional
        Optionally, the user can specify the threshold that peaks must
        exceed to be kept.  In this case the sigdetect parameter will
        be ignored.  This is most useful for cases where
        cont_subtract=False and the user prefers to determine the
        significance threshold outside of this routine, rather than
        using this routine's defaults to determine the continuum level
        and the standard deviation of the continuum-subtracted
        spectrum.  If the string 'None' is passed, the code will simply
        return all peaks irrespective of any threshold.  This is
        equivalent to setting the mph parameter to None in
        :func:`detect_peaks`.
    fwhm : float, default = 4.0, optional
        Number of pixels per FWHM resolution element.
    fit_frac_fwhm : float, default = 1.25, optional
        Number of pixels used in the fits for Gaussian arc-line
        centroiding, expressed as a fraction of the fwhm parameter.
    max_frac_fwhm : float, default = 3.0, optional
        Maximum width allowed for usable arc lines, expressed relative
        to the fwhm.
    min_pkdist_frac_fwhm : float, default = 0.75, optional
        Minimum allowed separation between peaks, expressed relative to
        the fwhm.
    cont_frac_fwhm : float, default = 1.0, optional
        Width used for masking peaks in the spectrum when the continuum
        is being defined.  Expressed as a fraction of the fwhm
        parameter.
    cont_subtract : bool, default = True, optional
        If True, the code will continuum subtract the input array by
        iteratively determining the continuum.
    cont_samp : float, default = 30.0, optional
        The number of samples across the spectrum used for continuum
        subtraction.  Continuum subtraction is done via median
        filtering, with a width of ngood/cont_samp, where ngood is the
        number of good pixels for estimating the continuum (i.e., that
        don't have peaks).
    niter_cont : int, default = 3, optional
        Number of iterations of peak finding, masking, and continuum
        fitting used to define the continuum.
    nonlinear_counts : float, default = 1e10, optional
        Value above which to mask saturated arc lines.  This should be
        nonlinear_counts = nonlinear*saturation according to pypeit
        parsets.  The default of 1e10 results in no masking.
    nfind : int, default = None, optional
        Return only the nfind highest-significance lines.  The default
        is None, meaning the code will return all the lines above the
        significance threshold.
    bpm : `numpy.ndarray`_, optional
        Bad-pixel mask for the input spectrum.  If None, all pixels are
        considered good.  If passed in, the shape must match that of
        censpec.
    verbose : bool, default = False
        Output more stuff to the screen.
    debug : bool, default = False
        Make plots showing the results of peak finding and the final
        arc lines that are used.
    debug_peak_find : bool, default = False
        Show the peak-finding plots from :func:`detect_peaks`.
    debug_cont : bool, default = False
        Make plots showing the results of the continuum fitting.

    Returns
    -------
    tampl : `numpy.ndarray`_
        The amplitudes of the line detections in the true arc
    tampl_cont : `numpy.ndarray`_
        The amplitudes of the line detections in the
        continuum-subtracted arc
    tcent : `numpy.ndarray`_
        The centroids of the line detections
    twid : `numpy.ndarray`_
        The 1-sigma Gaussian widths of the line detections
    centerr : `numpy.ndarray`_
        The variance on tcent
    w : `numpy.ndarray`_
        An index array indicating which detections are the most
        reliable.
    arc : `numpy.ndarray`_
        The continuum-subtracted arc used to find the detections.
    nsig : `numpy.ndarray`_
        The significance of each line detected relative to the 1-sigma
        variation in the continuum-subtracted arc in the line-free
        region.  Bad lines are assigned a significance of -1, since
        they don't have an amplitude fit.
    """
    # Detect the location of the arc lines
    if verbose:
        msgs.info("Detecting lines...isolating the strongest, nonsaturated lines")

    # TODO: Why is this here? Can't the calling function be required to
    # pass a single spectrum? This is not reflected in the docstring.

    # Need to use bpm since, when the arc spectrum is padded, the padding can make
    # the threshold low and will also screw up the continuum fitting
    bpm_out = censpec == 0.0 if bpm is None else bpm
    xrng = np.arange(censpec.size, dtype=float)
    if cont_subtract:
        cont_now, cont_mask = iter_continuum(censpec, gpm=np.logical_not(bpm_out), fwhm=fwhm,
                                             niter_cont=niter_cont, cont_samp=cont_samp,
                                             cont_frac_fwhm=cont_frac_fwhm, debug=debug_cont)
    else:
        cont_mask = np.ones(censpec.size, dtype=bool)
        cont_now = np.zeros_like(censpec)

    arc = (censpec - cont_now)*np.logical_not(bpm_out)
    if input_thresh is None:
        (mean, med, stddev) = stats.sigma_clipped_stats(arc[cont_mask & np.logical_not(bpm_out)],
                                                        sigma_lower=3.0, sigma_upper=3.0)
        thresh = med + sigdetect*stddev
    else:
        med = 0.0
        if isinstance(input_thresh, (float, int)):
            thresh = input_thresh
        elif isinstance(input_thresh, str):
            if input_thresh == 'None':
                thresh = None
        else:
            msgs.error('Unrecognized value for thresh')
        stddev = 1.0

    # Find the peak locations
    pixt = detect_peaks(arc, mph=thresh, mpd=fwhm*min_pkdist_frac_fwhm, show=debug_peak_find)

    # Peak up the centers and determine the widths using a Gaussian fit
    nfitpix = np.round(fit_frac_fwhm*fwhm).astype(int)
    fwhm_max = max_frac_fwhm*fwhm
    tampl_fit, tcent, twid, centerr = fit_arcspec(xrng, arc, pixt, nfitpix)

    # Set the amplitudes using the spectra directly for both the input
    # and continuum-subtracted spectrum.
    # TODO: Why does this interpolate to pixt and not tcent?
    tampl_true = np.interp(pixt, xrng, censpec)
    tampl = np.interp(pixt, xrng, arc)

    # Find the lines that meet the following criteria:
    #   - Amplitude is in the linear regime of the detector response
    #   - Center is within the limits of the spectrum
    #   - The Gaussian-fitted center and the center from `detect_peaks`
    #     do not differ by more than 0.75*FWHM
    #   - Width is finite, greater than 0, and less than FWHM_MAX/2.35
    good = np.invert(np.isnan(twid)) & (twid > 0.0) & (twid < fwhm_max/2.35) & (tcent > 0.0) \
                & (tcent < xrng[-1]) & (tampl_true < nonlinear_counts) \
                & (np.abs(tcent-pixt) < fwhm*0.75)

    # Get the indices of the good measurements
    ww = np.where(good)[0]

    # Compute the significance of each line; the significance of bad lines is set to -1
    nsig = (tampl - med)/stddev

    # If the nfind most significant peaks have been requested, then
    # grab and return only these lines
    if nfind is not None:
        if nfind > len(nsig):
            msgs.warn('Requested {0} peaks but only found {1}.'.format(nfind, len(tampl))
                      + '  Returning all the peaks found.')
        else:
            ikeep = (nsig.argsort()[::-1])[0:nfind]
            tampl_true = tampl_true[ikeep]
            tampl = tampl[ikeep]
            tcent = tcent[ikeep]
            twid = twid[ikeep]
            centerr = centerr[ikeep]
            ww = np.where(good[ikeep])[0]
            nsig = nsig[ikeep]
            good = good[ikeep]

    if debug:
        # NOTE: Uses pixt because apparently tcent can be set to -1 in fit_arcspec
        # TODO: Replace any values of tcent that are -1 with the value of pixt?
        find_lines_qa(arc, pixt, tampl, good, bpm=bpm, thresh=thresh,
                      nonlinear=nonlinear_counts)

    # TODO: Change this to return `good` instead of `ww`
    return tampl_true, tampl, tcent, twid, centerr, ww, arc, nsig
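The same synthetic-spectrum approach gives a quick check of the line finder
(line centers and amplitudes below are invented)::

    import numpy as np
    from pypeit.core import arc

    pix = np.arange(1000, dtype=float)
    rng = np.random.default_rng(2)
    censpec = 10. + rng.normal(0., 1., 1000)
    for c in (150.3, 400.7, 800.2):                      # three fake Gaussian arc lines
        censpec += 100.*np.exp(-0.5*((pix - c)/1.7)**2)
    tampl_true, tampl, tcent, twid, centerr, ww, arc_sub, nsig = \
            arc.detect_lines(censpec, sigdetect=5.0, fwhm=4.0)
    print(np.round(tcent[ww], 1))                        # should recover ~[150.3 400.7 800.2]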
def fit_arcspec(xarray, yarray, pixt, fitp):
    """
    Fit a series of pre-identified arc-spectrum lines.  The
    implementation is a simple 3-parameter Gaussian fit (amplitude,
    centroid, width).

    Parameters
    ----------
    xarray : `numpy.ndarray`_
        x-values of the input spectrum.  Either pixels or normalized
        pixel space (0-1).
    yarray : `numpy.ndarray`_
        Arc spectrum in counts
    pixt : `numpy.ndarray`_
        Initial guesses for the centers of the lines
    fitp : int
        Number of pixels to fit with

    Returns
    -------
    ampl : `numpy.ndarray`_
        Amplitudes of the fitted lines
    cent : `numpy.ndarray`_
        Centroids of the fitted lines; -999. indicates a bad fit
    widt : `numpy.ndarray`_
        Widths (sigma) of the fitted lines
    centerr : `numpy.ndarray`_
        Errors in the centroids of the fitted lines
    """
    fitp_even = fitp if fitp % 2 == 0 else fitp + 1
    fit_interval = fitp_even//2

    # Setup the arrays with fit parameters
    sz_p = pixt.size
    sz_a = yarray.size
    ampl = np.full(sz_p, -999.0, dtype=float)
    cent = np.full(sz_p, -999.0, dtype=float)
    widt = np.full(sz_p, -999.0, dtype=float)
    centerr = np.full(sz_p, -999.0, dtype=float)

    for p in range(sz_p):
        # This interval is always symmetric about the peak
        pmin = pixt[p] - fit_interval
        pmax = pixt[p] + fit_interval + 1
        if pmin < 0:
            pmin = 0
        if pmax > sz_a:
            pmax = sz_a
        if pmin == pmax:
            continue
        if (pmax - pmin) < fit_interval:
            continue  # Probably won't be a good solution
        # Fit the gaussian
        try:
            fitc, fitcov = fitting.fit_gauss(xarray[pmin:pmax], yarray[pmin:pmax])
            ampl[p], cent[p], widt[p] = fitc
            centerr[p] = fitcov[1, 1]
        except RuntimeError:
            # Bad fit; keep the initialized -999. values
            pass
    return ampl, cent, widt, centerr
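For a single isolated line, a sketch with an invented noiseless Gaussian
(center 42.3, sigma 2.0)::

    import numpy as np
    from pypeit.core import arc

    xarray = np.arange(100, dtype=float)
    yarray = 50.*np.exp(-0.5*((xarray - 42.3)/2.0)**2)   # fake arc line
    ampl, cent, widt, centerr = arc.fit_arcspec(xarray, yarray, np.array([42]), 5)
    print(np.round(cent, 2), np.round(widt, 2))          # ~[42.3] ~[2.0]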
def find_lines_qa(spec, cen, amp, good, bpm=None, thresh=None, nonlinear=None):
    """
    Show a QA plot for the line detection.  Only used for debugging.

    Detected lines are marked and then color coded by a variety of
    criteria.

    Args:
        spec (`numpy.ndarray`_):
            Spectrum used to detect lines
        cen (`numpy.ndarray`_):
            Identified line peaks
        amp (`numpy.ndarray`_):
            Amplitude of the identified lines.
        good (`numpy.ndarray`_):
            Boolean array selecting the good line detections.
        bpm (`numpy.ndarray`_, optional):
            The bad-pixel mask for the spectrum.  If None, all pixels
            are assumed to be valid.
        thresh (:obj:`float`, optional):
            Threshold value for line detection
        nonlinear (:obj:`float`, optional):
            Threshold for nonlinear detector response.
    """
    # TODO: Could just pull `amp` directly from the spectrum
    # If bpm is provided, the masked pixels are *not* shown
    _spec = np.ma.MaskedArray(spec, mask=np.zeros(spec.size, dtype=bool) if bpm is None else bpm)
    pix = np.arange(_spec.size)
    plt.figure(figsize=(14, 6))
    plt.step(pix, _spec, color='k', where='mid', label='arc', lw=1.0)
    plt.scatter(cen[np.invert(good)], amp[np.invert(good)], marker='+', color='C3', s=50,
                label='bad for tilts')
    plt.scatter(cen[good], amp[good], color='C2', marker='+', s=50, label='good for tilts')
    if thresh is not None:
        plt.axhline(thresh, color='cornflowerblue', linestyle=':', linewidth=2.0,
                    label='threshold', zorder=10)
    if nonlinear is not None and nonlinear < 1e9:
        plt.axhline(nonlinear, color='orange', linestyle='--', linewidth=2.0,
                    label='nonlinear', zorder=10)
    ngood = np.sum(good)
    plt.title('Good Lines = {0}, Bad Lines = {1}'.format(ngood, len(good)-ngood))
    plt.ylim(np.amin(spec), 1.5 * np.amax(spec))
    plt.legend()
    plt.show()