""" Module for basic utilties with holy grail
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import os

import numpy as np
import scipy.interpolate
import scipy.optimize
import scipy.signal
from matplotlib import pyplot as plt

from astropy.table import Table
from astropy import convolution
from astropy import constants

from IPython import embed

from pypeit import msgs
from pypeit import cache
from pypeit import utils
from pypeit.core import arc
from pypeit.pypmsgs import PypeItError
def parse_param(par, key, slit):
    """Grab a parameter value, which may be defined per slit (list/array) or globally (scalar)."""
param_in = par[key]
if isinstance(param_in, (float, int)):
param = param_in
elif isinstance(param_in, (list, np.ndarray)):
param = param_in[slit]
else:
raise ValueError('Invalid input for parameter {:s}'.format(key))
return param
# TODO: Should this code allow the user to skip the smoothing steps and just
# provide the raw delta_wave vector? I would think there are cases where you
# want the *exact* pixel width, as opposed to the smoothed version.
def get_delta_wave(wave, wave_gpm, frac_spec_med_filter=0.03):
r"""
Compute the change in wavelength per pixel.
Given an input wavelength vector and an input good pixel mask, the *raw*
change in wavelength is defined to be ``delta_wave[i] =
wave[i+1]-wave[i]``, with ``delta_wave[-1] = delta_wave[-2]`` to force
``wave`` and ``delta_wave`` to have the same length.
The method imposes a smoothness on the change in wavelength by (1)
running a median filter over the raw values and (2) smoothing
``delta_wave`` with a Gaussian kernel. The boxsize for the median filter
is set by ``frac_spec_med_filter``, and the :math:`\sigma` for the
Gaussian kernel is either a 10th of that boxsize or 3 pixels (whichever
is larger).
Parameters
----------
wave : float `numpy.ndarray`_, shape = (nspec,)
Array of input wavelengths. Must be 1D.
    wave_gpm : bool `numpy.ndarray`_, shape = (nspec,)
Boolean good-pixel mask defining where the ``wave`` values are
good.
frac_spec_med_filter : :obj:`float`, optional
Fraction of the length of the wavelength vector to use to median
filter the raw change in wavelength, used to impose a smoothness.
Default is 0.03, which means the boxsize for the running median
filter will be approximately ``0.03*nspec`` (forced to be an odd
number of pixels).
Returns
-------
delta_wave : `numpy.ndarray`_, float, shape = (nspec,)
A smooth estimate for the change in wavelength for each pixel in the
input wavelength vector.
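
    Examples
    --------
    A minimal sketch (illustrative values, assuming a clean, monotonic
    wavelength vector); for a linear wavelength solution the smoothed
    pixel width should recover the constant input spacing:

    >>> import numpy as np
    >>> wave = np.linspace(4000., 5000., 1001)  # ~1 Angstrom per pixel
    >>> gpm = np.ones(wave.size, dtype=bool)
    >>> delta_wave = get_delta_wave(wave, gpm)
    >>> bool(np.allclose(delta_wave, 1.0))
    True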
"""
# Check input
if wave.ndim != 1:
msgs.error('Input wavelength array must be 1D.')
nspec = wave.size
# This needs to be an odd number
nspec_med_filter = 2*int(np.round(nspec*frac_spec_med_filter/2.0)) + 1
delta_wave = np.zeros_like(wave)
wave_diff = np.diff(wave[wave_gpm])
wave_diff = np.append(wave_diff, wave_diff[-1])
wave_diff_filt = utils.fast_running_median(wave_diff, nspec_med_filter)
# Smooth with a Gaussian kernel
sig_res = np.fmax(nspec_med_filter/10.0, 3.0)
gauss_kernel = convolution.Gaussian1DKernel(sig_res)
wave_diff_smooth = convolution.convolve(wave_diff_filt, gauss_kernel, boundary='extend')
delta_wave[wave_gpm] = wave_diff_smooth
return delta_wave
def get_sampling(waves, pix_per_R=3.0):
"""
Computes the median wavelength sampling of wavelength vector(s)
Args:
        waves (list, `numpy.ndarray`_):
            List of `numpy.ndarray`_ wavelength arrays or a single 1D
            `numpy.ndarray`_.
pix_per_R (float):
Number of pixels per resolution element used for the
resolution guess. The default of 3.0 assumes roughly Nyquist
sampling.
    Returns:
        tuple: Returns ``dwave``, ``dloglam``, ``resln_guess``,
        ``pix_per_sigma``: the median linear wavelength sampling, the
        median log10 wavelength sampling, a guess of the spectral
        resolution, and the number of pixels per Gaussian sigma of a
        resolution element.
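
    Examples:
        A minimal sketch (illustrative values): for a grid uniform in
        log10 wavelength, ``dloglam`` should recover the input step.

        >>> import numpy as np
        >>> wave = np.power(10.0, np.arange(np.log10(4000.), np.log10(5000.), 1e-4))
        >>> dwave, dloglam, resln_guess, pix_per_sigma = get_sampling(wave)
        >>> bool(np.isclose(dloglam, 1e-4))
        True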
"""
if isinstance(waves, np.ndarray):
if waves.ndim == 1:
waves_out = [waves]
elif waves.ndim == 2:
waves_out = utils.array_to_explist(waves)
else:
msgs.error('Array inputs can only be 1D or 2D')
elif isinstance(waves, list):
ndim = np.array([wave.ndim for wave in waves], dtype=int)
if np.any(ndim > 1):
msgs.error('Input list can only contain 1D arrays')
waves_out = waves
else:
msgs.error('Input must be a list or numpy.ndarray')
wave_diff_flat = []
dloglam_flat = []
for wave in waves_out:
wave_good = wave[wave > 1.0]
wave_diff_flat += np.diff(wave_good).tolist()
dloglam_flat += np.diff(np.log10(wave_good)).tolist()
# Compute the median wavelength spacing
dwave = np.median(wave_diff_flat)
dloglam = np.median(dloglam_flat)
# Check that this won't introduce a divide by zero
if dloglam == 0.0:
msgs.error('The wavelength sampling has zero spacing in log wavelength. This is not supported.')
# Compute a guess of the resolution
    resln_guess = 1.0 / (pix_per_R * dloglam * np.log(10.0))
pix_per_sigma = 1.0 / resln_guess / (dloglam * np.log(10.0)) / (2.0 * np.sqrt(2.0 * np.log(2)))
return dwave, dloglam, resln_guess, pix_per_sigma
# TODO: The other methods that use iref should be deprecated or removed.
def get_wave_grid(waves=None, gpms=None, wave_method='linear', iref=0, wave_grid_min=None,
wave_grid_max=None, dwave=None, dv=None, dloglam=None, wave_grid_input=None,
spec_samp_fact=1.0):
"""
Create a new wavelength grid for spectra to be rebinned and coadded.
Args:
        waves (list):
            List of the N original 1D `numpy.ndarray`_ wavelength arrays.
            Shapes of the input arrays are arbitrary. Required unless
            ``wave_method='user_input'``.
gpms (list):
Good-pixel mask for wavelengths. Same format as waves and shapes of
the individual arrays must match.
wave_method (:obj:`str`, optional):
            Desired method for creating new wavelength grid:

                - 'iref' -- Use the wavelength array indexed by ``iref``
                - 'velocity' -- Grid is uniform in velocity
                - 'log10' -- Grid is uniform in log10(wave). This is the
                  same as the velocity method.
                - 'linear' -- Constant pixel grid (default)
                - 'concatenate' -- Meld the input wavelength arrays
                - 'user_input' -- Use a user-input wavelength grid;
                  ``wave_grid_input`` must be set for this option.
iref (:obj:`int`, optional):
Index in waves array for reference spectrum
wave_grid_min (:obj:`float`, optional):
min wavelength value for the final grid
wave_grid_max (:obj:`float`, optional):
max wavelength value for the final grid
dwave (:obj:`float`, optional):
Pixel size in same units as input wavelength array (e.g. Angstroms).
Used with the 'linear' method. If not input, the median pixel size
is calculated and used.
dv (:obj:`float`, optional):
Pixel size in km/s for 'velocity' method. If not input, the median
km/s per pixel is calculated and used
dloglam (:obj:`float`, optional):
Pixel size in log10(wave) for the log10 or velocity method.
spec_samp_fact (:obj:`float`, optional):
Make the wavelength grid sampling finer (spec_samp_fact < 1.0) or
            coarser (spec_samp_fact > 1.0) by this sampling factor. This
            basically multiplies the 'native' spectral pixels by
            ``spec_samp_fact``, i.e., the units of ``spec_samp_fact`` are
            pixels.
wave_grid_input (`numpy.ndarray`_, optional):
User input wavelength grid to be used with the 'user_input' wave_method.
Shape is (nspec_input,)
Returns:
:obj:`tuple`: Returns two `numpy.ndarray`_ objects and a float:
- ``wave_grid``: (ndarray, (ngrid +1,)) New wavelength grid, not
masked. This is a set of bin edges (rightmost edge for the last
bin and leftmost edges for the rest), while wave_grid_mid is a set
of bin centers, hence wave_grid has 1 more value than
wave_grid_mid.
- ``wave_grid_mid``: ndarray, (ngrid,) New wavelength grid evaluated
at the centers of the wavelength bins, that is this grid is simply
offset from ``wave_grid`` by ``dsamp/2.0``, in either linear space
or log10 depending on whether linear or (log10 or velocity) was
requested. Last bin center is removed since it falls outside
wave_grid. For iref or concatenate, the linear wavelength
sampling will be calculated.
- ``dsamp``: The pixel sampling for wavelength grid created.
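
    Examples:
        A minimal sketch (illustrative values) building a grid uniform in
        velocity from two overlapping wavelength vectors:

        >>> import numpy as np
        >>> waves = [np.linspace(4000., 5000., 1000), np.linspace(4500., 5500., 1000)]
        >>> wave_grid, wave_grid_mid, dsamp = get_wave_grid(waves=waves, wave_method='velocity', dv=100.0)
        >>> wave_grid.size == wave_grid_mid.size + 1
        True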
"""
c_kms = constants.c.to('km/s').value
if wave_method == 'user_input':
wave_grid = wave_grid_input
else:
if gpms is None:
gpms = [wave > 1.0 for wave in waves]
if wave_grid_min is None:
wave_grid_min = np.min([wave[gpm].min() for wave, gpm in zip(waves, gpms)])
if wave_grid_max is None:
wave_grid_max = np.max([wave[gpm].max() for wave, gpm in zip(waves, gpms)])
dwave_data, dloglam_data, resln_guess, pix_per_sigma = get_sampling(waves)
if wave_method in ['velocity', 'log10']:
if dv is not None and dloglam is not None:
msgs.error('You can only specify dv or dloglam but not both')
elif dv is not None:
dloglam_pix = dv/c_kms/np.log(10.0)
elif dloglam is not None:
dloglam_pix = dloglam
else:
dloglam_pix = dloglam_data
# Generate wavelength array
wave_grid, wave_grid_mid, dsamp = wavegrid(wave_grid_min, wave_grid_max, dloglam_pix,
spec_samp_fact=spec_samp_fact, log10=True)
elif wave_method == 'linear': # Constant Angstrom
if dwave is not None:
dwave_pix = dwave
else:
dwave_pix = dwave_data
# Generate wavelength array
wave_grid, wave_grid_mid, dsamp = wavegrid(wave_grid_min, wave_grid_max, dwave_pix, spec_samp_fact=spec_samp_fact)
elif wave_method == 'concatenate': # Concatenate
# Setup
        loglam = [np.log10(wave) for wave in waves]  # This deals with padding (0's) just fine, i.e. they get masked.
nexp = len(waves)
newloglam = loglam[iref] # Deals with mask
# Loop
for j in range(nexp):
if j == iref:
continue
#
iloglam = loglam[j]
dloglam_0 = (newloglam[1]-newloglam[0])
dloglam_n = (newloglam[-1] - newloglam[-2]) # Assumes sorted
if (newloglam[0] - iloglam[0]) > dloglam_0:
kmin = np.argmin(np.abs(iloglam - newloglam[0] - dloglam_0))
newloglam = np.concatenate([iloglam[:kmin], newloglam])
#
if (iloglam[-1] - newloglam[-1]) > dloglam_n:
kmin = np.argmin(np.abs(iloglam - newloglam[-1] - dloglam_n))
newloglam = np.concatenate([newloglam, iloglam[kmin:]])
# Finish
wave_grid = np.power(10.0,newloglam)
elif wave_method == 'iref': # Use the iref index wavelength array
msgs.info(f'iref for the list is set to {iref}')
msgs.info(f'The shape of the list is: {np.shape(waves)}')
msgs.info(f'shape of the first wave_grid in the list is: {np.shape(waves[iref])}')
wave_tmp = waves[iref]
wave_grid = wave_tmp[wave_tmp > 1.0]
if spec_samp_fact != 1: # adjust sampling via internal interpolation
nwave = len(wave_grid)
indx = np.arange(nwave)
indx_new = np.linspace(0,nwave-1,int(nwave/spec_samp_fact))
wave_tmp = np.interp(indx_new,indx,wave_grid)
wave_grid = wave_tmp
else:
msgs.error("Bad method for wavelength grid: {:s}".format(wave_method))
if wave_method in ['iref', 'concatenate', 'user_input']:
wave_grid_diff = np.diff(wave_grid)
wave_grid_diff = np.append(wave_grid_diff, wave_grid_diff[-1])
wave_grid_mid = wave_grid + wave_grid_diff / 2.0
dsamp = np.median(wave_grid_diff)
        # Remove the last bin center since it falls outside the rightmost edge
        # of wave_grid. This matches the convention in wavegrid().
wave_grid_mid = wave_grid_mid[:-1]
return wave_grid, wave_grid_mid, dsamp
def arc_lines_from_spec(spec, sigdetect=10.0, fwhm=4.0,
fit_frac_fwhm = 1.25, cont_frac_fwhm=1.0,max_frac_fwhm=2.0,
cont_samp=30, niter_cont=3,nonlinear_counts=1e10, debug=False):
"""
Simple wrapper to arc.detect_lines.
See that code for docs
Args:
spec:
sigdetect:
fwhm:
fit_frac_fwhm:
cont_frac_fwhm:
max_frac_fwhm:
cont_samp:
niter_cont:
nonlinear_counts (float, optional):
Counts where the arc is presumed to go non-linear
debug:
Returns:
tuple: all_tcent, all_ecent, cut_tcent, icut, arc_cont_sub.
See arc.detect_lines for details
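
    Examples:
        A minimal sketch (illustrative; the synthetic spectrum and
        parameters are assumptions, and the results depend on
        arc.detect_lines):

        >>> import numpy as np
        >>> pix = np.arange(2048)
        >>> spec = 10. + 1000.*np.exp(-0.5*((pix - 1024.)/2.0)**2)
        >>> out = arc_lines_from_spec(spec, sigdetect=10.0, fwhm=4.0)  # doctest: +SKIP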
"""
# Find peaks
tampl, tampl_cont, tcent, twid, centerr, w, arc_cont_sub, nsig = arc.detect_lines(
spec, sigdetect = sigdetect, fwhm=fwhm, fit_frac_fwhm=fit_frac_fwhm,
cont_frac_fwhm=cont_frac_fwhm, max_frac_fwhm=max_frac_fwhm,
cont_samp=cont_samp,niter_cont = niter_cont, nonlinear_counts = nonlinear_counts,
debug=debug)
all_tcent = tcent[w]
all_ecent = centerr[w]
all_nsig = nsig[w]
# Cut on significance
cut_sig = all_nsig > sigdetect
cut_tcent = all_tcent[cut_sig]
icut = np.where(cut_sig)[0]
# Return
return all_tcent, all_ecent, cut_tcent, icut, arc_cont_sub
def shift_and_stretch(spec, shift, stretch, stretch2, stretch_func='quadratic'):
"""
    Utility function to shift and stretch a spectrum. This operation is
    implemented in many steps and could be significantly optimized, but it
    works for now. Note that the stretch is applied *first* and then the
    shift is applied in stretch coordinates.
Parameters
----------
    spec : `numpy.ndarray`_
        Spectrum to be shifted and stretched
    shift : float
        Shift to be applied
    stretch : float
        Stretch to be applied
    stretch2 : float
        Second-order stretch to be applied. Ignored (treated as 0.0) when
        ``stretch_func='linear'``.
    stretch_func : str, optional, default = 'quadratic'
        Use quadratic ('quadratic') or linear ('linear') stretch.
Returns
-------
    spec_out : `numpy.ndarray`_
        Shifted and stretched spectrum. Regions where there is no information are set to zero.
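
    Examples
    --------
    A minimal sketch (illustrative values): shift a single-pixel feature
    to the right by 10 pixels with no stretch:

    >>> import numpy as np
    >>> spec = np.zeros(100)
    >>> spec[50] = 1.0
    >>> spec_out = shift_and_stretch(spec, 10.0, 1.0, 0.0)
    >>> int(np.argmax(spec_out))
    60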
"""
# TODO: DP: This is the old code that was used to shift and stretch.
# Comparing to the new one, I have seen some differences in spec_out.
# I leave this old code here for now, so that we can investigate this a little more in the future.
# nspec = spec.shape[0]
# # pad the spectrum on both sizes
# x1 = np.arange(nspec)/float(nspec-1)
# nspec_stretch = int(nspec*stretch)
# x2 = np.arange(nspec_stretch)/float(nspec_stretch-1)
# spec_str = (scipy.interpolate.interp1d(x1, spec, kind = 'quadratic', bounds_error = False, fill_value = 0.0))(x2)
# # Now create a shifted version
# ind_shift = np.arange(nspec_stretch) - shift
# spec_str_shf = (scipy.interpolate.interp1d(np.arange(nspec_stretch), spec_str, kind = 'quadratic', bounds_error = False, fill_value = 0.0))(ind_shift)
# # Now interpolate onto the original grid
# spec_out = (scipy.interpolate.interp1d(np.arange(nspec_stretch), spec_str_shf, kind = 'quadratic', bounds_error = False, fill_value = 0.0))(np.arange(nspec))
#
# return spec_out
# ###################################
# Positive value of shift means features shift to larger pixel values
nspec = spec.shape[0]
# Can do the stretch and shift in one operation
    if stretch_func == 'linear':
        stretch2 = 0.0
return scipy.interpolate.interp1d(np.arange(nspec)**2*stretch2 + np.arange(nspec)*stretch + shift,
spec, kind=stretch_func, bounds_error=False, fill_value=0.0)(np.arange(nspec))
def zerolag_shift_stretch(theta, y1, y2, stretch_func = 'quadratic'):
"""
    Utility function run by the differential evolution optimizer in scipy.
    This is the function we optimize: the zero-lag cross-correlation
    coefficient of the spectrum with a shift and stretch applied.
Parameters
----------
theta : float `numpy.ndarray`_
        Function parameters to optimize over. theta[0] = shift, theta[1] = stretch, theta[2] = stretch2
y1 : float `numpy.ndarray`_, shape = (nspec,)
        First spectrum, which acts as the reference
y2 : float `numpy.ndarray`_, shape = (nspec,)
Second spectrum which will be transformed by a shift and stretch to match y1
stretch_func : str, optional, default = 'quadratic'
Use quadratic ('quadratic') or linear ('linear') stretch.
Returns
-------
    corr_norm : float
        Negative of the zero-lag cross-correlation coefficient (since we
        are minimizing with scipy.optimize). scipy.optimize will thus
        determine the shift and stretch that maximize the cross-correlation.
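
    Examples
    --------
    A minimal sketch (illustrative values): with zero shift and unit
    stretch, identical spectra give a normalized cross-correlation of 1,
    so the objective returns -1:

    >>> import numpy as np
    >>> y = np.exp(-0.5*((np.arange(100) - 50.)/5.0)**2)
    >>> float(np.round(zerolag_shift_stretch([0.0, 1.0, 0.0], y, y), 3))
    -1.0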
"""
shift, stretch, stretch2 = theta
y2_corr = shift_and_stretch(y2, shift, stretch, stretch2, stretch_func= stretch_func)
# Zero lag correlation
corr_zero = np.sum(y1*y2_corr)
corr_denom = np.sqrt(np.sum(y1*y1)*np.sum(y2_corr*y2_corr))
if corr_denom == 0.0:
msgs.warn('The shifted and stretched spectrum is zero everywhere. Cross-correlation cannot be performed. There is likely a bug somewhere')
raise PypeItError()
corr_norm = corr_zero / corr_denom
return -corr_norm
def get_xcorr_arc(inspec1, sigdetect=5.0, sig_ceil=10.0, percent_ceil=50.0, use_raw_arc=False, fwhm = 4.0, debug=False):
"""
Utility routine to create a synthetic arc spectrum for cross-correlation
using the location of the peaks in the input spectrum.
Args:
inspec1 (`numpy.ndarray`_):
Input spectrum, shape = (nspec,)
        sigdetect (float, optional, default=5.0):
Peak finding threshold for lines that will be used to create the synthetic xcorr_arc
sig_ceil (float, optional, default = 10.0):
Significance threshold for peaks that will be used to determine the line amplitude clipping threshold.
            For peaks with significance > sig_ceil, the code will find the amplitude corresponding to
            percent_ceil, and this will be the clipping threshold.
percent_ceil (float, optional, default=50.0):
Upper percentile threshold for thresholding positive and negative values. If set to None, no thresholding
will be performed.
use_raw_arc (bool, optional):
If True, use amplitudes from the raw arc, i.e. do not continuum subtract. Default = False
fwhm (float, optional):
            FWHM of arc lines in pixels. Used for peak finding and to assign a FWHM in the xcorr_arc.
debug (bool, optional):
Show plots for line detection debugging. Default = False
Returns:
        `numpy.ndarray`_: Synthetic arc spectrum to be used for
        cross-correlations, shape = (nspec,). Returns None if no lines
        are detected.
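
    Examples:
        A minimal sketch (illustrative; relies on arc.detect_lines to
        find the synthetic lines):

        >>> import numpy as np
        >>> pix = np.arange(2048)
        >>> spec = np.zeros(2048)
        >>> for cen in (300., 900., 1500.):
        ...     spec += 100.*np.exp(-0.5*((pix - cen)/2.0)**2)
        >>> xcorr_arc = get_xcorr_arc(spec, sigdetect=5.0, fwhm=4.0)  # doctest: +SKIP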
"""
# Run line detection to get the locations and amplitudes of the lines
tampl1, tampl1_cont, tcent1, twid1, centerr1, w1, arc1, nsig1 = arc.detect_lines(inspec1, sigdetect=sigdetect,
fwhm=fwhm, debug=debug)
ampl = tampl1 if use_raw_arc else tampl1_cont
if percent_ceil is not None and (ampl.size > 0):
        # If set, impose a ceiling based on the positive peaks with significance > sig_ceil
ampl_pos = (ampl >= 0.0) & (nsig1 > sig_ceil)
ceil_upper = np.percentile(ampl[ampl_pos], percent_ceil) if np.any(ampl_pos) else np.inf
else:
ceil_upper = np.inf
ampl_clip = np.clip(ampl, None, ceil_upper)
if ampl_clip.size == 0:
msgs.warn('No lines were detected in the arc spectrum. Cannot create a synthetic arc spectrum for cross-correlation.')
return None
# Make a fake arc by plopping down Gaussians at the location of every centroided line we found
xcorr_arc = np.zeros_like(inspec1)
spec_vec = np.arange(inspec1.size)
for ind in range(ampl_clip.size):
        # If the measured width is bad, use the width implied by the fwhm
#sigma = twid1[ind] if twid1[ind] != -999.0 else fwhm/2.35
sigma = fwhm/2.35
if tcent1[ind] == -999.0:
continue
xcorr_arc += ampl_clip[ind]*np.exp(-0.5*((spec_vec - tcent1[ind])/sigma)**2)
return xcorr_arc
# TODO: Can we speed this code up? numpy.correlate may be faster, and we
# don't need to compute all of these lags.
def xcorr_shift(inspec1, inspec2, percent_ceil=50.0, use_raw_arc=False, sigdetect=5.0, sig_ceil=10.0, fwhm=4.0,
do_xcorr_arc=True, lag_range=None, max_lag_frac=1.0, debug=False):
"""
Determine the shift inspec2 relative to inspec1. This routine computes the
shift by finding the maximum of the cross-correlation coefficient. The
convention for the shift is that positive shift means inspec2 is shifted to
the right (higher pixel values) relative to inspec1.
Parameters
----------
    inspec1 : `numpy.ndarray`_
        Reference spectrum
    inspec2 : `numpy.ndarray`_
        Spectrum for which the shift and stretch are computed such
        that it will match inspec1
    sigdetect : float, optional, default=5.0
Peak finding threshold for lines that will be used to create the
synthetic xcorr_arc
sig_ceil : float, optional, default = 10.0
Significance threshold for peaks that will be used to determine the line
        amplitude clipping threshold. For peaks with significance > sig_ceil,
        the code will find the amplitude corresponding to percent_ceil, and
        this will be the clipping threshold.
    percent_ceil : float, default=50.0
Apply a ceiling to the input spectra at the percent_ceil
percentile level of the distribution of peak amplitudes.
This prevents extremely strong lines from completely
        dominating the cross-correlation, which can cause the
cross-correlation to have spurious noise spikes that are not
the real maximum.
use_raw_arc : bool, default = False
If this parameter is True the raw arc will be used rather than the
continuum subtracted arc
do_xcorr_arc : bool, default = True
If this parameter is True, peak finding will be performed and a
synthetic arc will be created to be used for the cross-correlations. If
a synthetic arc has already been created by get_xcorr_arc, then set this
to False
lag_range : tuple, default = None
A tuple of the form (lag_min, lag_max) which sets the range of lags to
search over. If None, max_lag_frac will be used to set the range of lags.
max_lag_frac : float, default = 1.0
Fraction of the total spectral pixels used to determine the range of lags
to search over. The range of lags will be [-nspec*max_lag_frac +1, nspec*max_lag_frac].
    debug : bool, optional, default = False
Produce debugging plot
Returns
-------
    shift : float
        The shift that was determined
    cross_corr : float
        The maximum of the cross-correlation coefficient at this shift
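
    Examples
    --------
    A minimal sketch (illustrative; peak detection on the correlation
    function relies on arc.detect_lines). Two identical lines offset by
    5 pixels should yield a shift near +5:

    >>> import numpy as np
    >>> pix = np.arange(1000)
    >>> y1 = np.exp(-0.5*((pix - 500.)/2.0)**2)
    >>> y2 = np.exp(-0.5*((pix - 495.)/2.0)**2)
    >>> shift, corr = xcorr_shift(y1, y2, do_xcorr_arc=False)  # doctest: +SKIP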
"""
if do_xcorr_arc:
y1 = get_xcorr_arc(inspec1, percent_ceil=percent_ceil, use_raw_arc=use_raw_arc, sigdetect=sigdetect, sig_ceil=sig_ceil, fwhm=fwhm)
y2 = get_xcorr_arc(inspec2, percent_ceil=percent_ceil, use_raw_arc=use_raw_arc, sigdetect=sigdetect, sig_ceil=sig_ceil, fwhm=fwhm)
else:
y1, y2 = inspec1, inspec2
    if y1 is None or y2 is None or np.all(y1 == 0) or np.all(y2 == 0):
        msgs.warn('One of the input spectra has no usable lines or is all zeros. Returning shift = 0.0')
return 0.0, 0.0
    nspec = y1.shape[0]
    if lag_range is not None:
        lagmin, lagmax = lag_range
    else:
        lagmin = int((-nspec + 1) * max_lag_frac)
        lagmax = int((nspec - 1) * max_lag_frac)
    # Full cross-correlation; lags for mode='full' run from -(nspec-1) to (nspec-1)
    lags = np.arange(-nspec + 1, nspec)
    corr = scipy.signal.correlate(y1, y2, mode='full')
    corr_denom = np.sqrt(np.sum(y1*y1)*np.sum(y2*y2))
    corr_norm = corr/corr_denom
    # Restrict the search to the requested lag window
    lag_gpm = (lags >= lagmin) & (lags <= lagmax)
    lags = lags[lag_gpm]
    corr_norm = corr_norm[lag_gpm]
tampl_true, tampl, pix_max, twid, centerr, ww, arc_cont, nsig = arc.detect_lines(corr_norm, sigdetect=3.0,
fit_frac_fwhm=1.5, fwhm=5.0,
cont_frac_fwhm=1.0, cont_samp=30,
nfind=1)
corr_max = np.interp(pix_max, np.arange(lags.shape[0]),corr_norm)
lag_max = np.interp(pix_max, np.arange(lags.shape[0]),lags)
if debug:
# Interpolate for bad lines since the fitting code often returns nan
plt.figure(figsize=(14, 6))
plt.plot(lags, corr_norm, color='black', drawstyle = 'steps-mid', lw=3, label = 'x-corr')
plt.plot(lag_max[0], corr_max[0],'g+', markersize =6.0, label = 'peak')
plt.title('Best shift = {:5.3f}'.format(lag_max[0]) + ', corr_max = {:5.3f}'.format(corr_max[0]))
plt.legend()
plt.show()
return lag_max[0], corr_max[0]
def xcorr_shift_stretch(inspec1, inspec2, cc_thresh=-1.0, percent_ceil=50.0, use_raw_arc=False,
shift_mnmx=(-0.2,0.2), stretch_mnmx=(0.95,1.05), sigdetect=5.0, sig_ceil=10.0,
fwhm = 4.0, max_lag_frac=1.0, lag_range=None, debug=False, toler=1e-5, seed=None,
stretch_func='quadratic'):
"""
    Determine the shift and stretch of inspec2 relative to inspec1. This
    routine computes an initial guess for the shift via maximizing the
    cross-correlation. It then performs a search for the shift and stretch
    (and second-order stretch, for the quadratic stretch function) by
    optimizing the zero-lag cross-correlation between inspec1 and the
    transformed inspec2 (shifted and stretched via
    wvutils.shift_and_stretch()) in a narrow window about the initial
    estimated shift. The convention for the shift is that positive shift
    means inspec2 is shifted to the right (higher pixel values) relative to
    inspec1. The convention for the stretch is that it is a float near
    unity that increases the size of inspec2 relative to the original size
    (which is the size of inspec1).
Parameters
----------
    inspec1 : `numpy.ndarray`_
        Reference spectrum
    inspec2 : `numpy.ndarray`_
        Spectrum for which the shift and stretch are computed such that it will match inspec1
cc_thresh: float, default = -1.0
A number in the range [-1.0,1.0] which is the threshold on the
initial cross-correlation coefficient for the shift/stretch. If
the value of the initial cross-correlation is < cc_thresh the
code will just exit and return this value and the best shift.
        This is desirable behavior since the shift/stretch optimization
is slow and this allows one to test how correlated the spectra
are before attempting it, since there is little value in that
expensive computation for spectra with little overlap. The
        default cc_thresh = -1.0 means shift/stretch is always attempted,
        since the cross-correlation coefficient cannot be less than
        -1.0.
    sigdetect : float, optional, default=5.0
Peak finding threshold for lines that will be used to create the synthetic xcorr_arc
sig_ceil : float, optional, default = 10.0
Significance threshold for peaks that will be used to determine the line amplitude clipping threshold.
        For peaks with significance > sig_ceil, the code will find the amplitude corresponding to
        percent_ceil, and this will be the clipping threshold.
    percent_ceil : float, default=50.0
Apply a ceiling to the input spectra at the percent_ceil
percentile level of the distribution of peak amplitudes. This
prevents extremely strong lines from completely dominating the
        cross-correlation, which can cause the cross-correlation to
have spurious noise spikes that are not the real maximum.
use_raw_arc: bool, default = False
If this parameter is True the raw arc will be used rather than the continuum subtracted arc
lag_range: tuple of floats, default = None
Range to search for the shift in the cross correlation. The code will search the window
[lag_range[0],lag_range[1]]. If None, the code will search the window
[shift_cc + nspec*shift_mnmx[0],shift_cc + nspec*shift_mnmx[1]]
where nspec is the spectral dimension and shift_cc is the initial cross-correlation shift.
    shift_mnmx : tuple of floats, default = (-0.2,0.2)
Range to search for the shift in the optimization about the
initial cross-correlation based estimate of the shift. The
optimization will search the window (shift_cc +
nspec*shift_mnmx[0],shift_cc + nspec*shift_mnmx[1]) where nspec
is the number of pixels in the spectrum
    stretch_mnmx : tuple of floats, default = (0.95,1.05)
Range to search for the stretch in the optimization. The code
may not work well if this range is significantly expanded
because the linear approximation used to transform the arc
starts to break down.
max_lag_frac: float, default = 1.0
Maximum range of lags over which to compute the cross correlation,
expressed as a fraction of the length of the vectors being cross-correlated.
seed: int or np.random.RandomState, optional, default = None
Seed for scipy.optimize.differential_evolution optimizer. If not
specified, the calculation will not be repeatable
toler : float
        Tolerance for differential evolution optimization.
    debug : bool, default = False
        Show plots to the screen useful for debugging.
stretch_func : str, optional, default = 'quadratic'
Use quadratic ('quadratic') or linear ('linear') stretch.
Returns
-------
success: int
A flag indicating the exit status. Values are:
        - success = 1, shift and stretch performed via successful
optimization
- success = 0, shift and stretch optimization failed
- success = -1, x-correlation is below cc_thresh
shift: float
the optimal shift which was determined. If cc_thresh is set,
and the initial cross-correlation is < cc_thresh, then this
will be just the cross-correlation shift
stretch: float
the optimal stretch which was determined. If cc_thresh is set,
and the initial cross-correlation is < cc_thresh, then this
        will just be 1.0
stretch2: float
the optimal second order stretch which was determined. If cc_thresh is set,
and the initial cross-correlation is < cc_thresh, then this
        will just be 0.0. This is always 0.0 if stretch_func = 'linear'.
cross_corr: float
the value of the cross-correlation coefficient at the optimal
shift and stretch. This is a number between zero and unity,
        with unity indicating a perfect match between the two spectra.
shift_init: float
The initial shift determined by maximizing the cross-correlation
coefficient without allowing for a stretch. If cc_thresh is
set, and the initial cross-correlation is < cc_thresh, this will
be just the shift from the initial cross-correlation
cross_corr_init: float
The maximum of the initial cross-correlation coefficient
determined without allowing for a stretch. If cc_thresh is set,
and the initial cross-correlation is < cc_thresh, this will be
just the initial cross-correlation
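
    Examples
    --------
    A minimal sketch (illustrative; the synthetic spectra are
    assumptions, and the optimization is stochastic unless ``seed`` is
    set):

    >>> import numpy as np
    >>> pix = np.arange(2048)
    >>> inspec1 = sum(100.*np.exp(-0.5*((pix - c)/3.0)**2) for c in (400., 1000., 1600.))
    >>> inspec2 = sum(100.*np.exp(-0.5*((pix - (1.001*c + 5.))/3.0)**2) for c in (400., 1000., 1600.))
    >>> out = xcorr_shift_stretch(inspec1, inspec2, seed=42)  # doctest: +SKIP
    >>> success, shift, stretch, stretch2, corr, shift_cc, corr_cc = out  # doctest: +SKIP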
"""
nspec = inspec1.size
y1 = get_xcorr_arc(inspec1, percent_ceil=percent_ceil, use_raw_arc=use_raw_arc, sigdetect=sigdetect,
sig_ceil=sig_ceil, fwhm=fwhm)
y2 = get_xcorr_arc(inspec2, percent_ceil=percent_ceil, use_raw_arc=use_raw_arc, sigdetect=sigdetect,
sig_ceil=sig_ceil, fwhm=fwhm)
if y1 is None or y2 is None:
        msgs.warn('No lines detected; punting on shift/stretch')
return 0, None, None, None, None, None, None
# Do the cross-correlation first and determine the initial shift
shift_cc, corr_cc = xcorr_shift(y1, y2, percent_ceil=None, do_xcorr_arc=False, lag_range=lag_range,
sigdetect=sigdetect, fwhm=fwhm, max_lag_frac=max_lag_frac, debug=debug)
# TODO JFH Is this a good idea? Stretch fitting seems to recover better values
#if corr_cc < -np.inf: # < cc_thresh:
# return -1, shift_cc, 1.0, corr_cc, shift_cc, corr_cc
if lag_range is None:
lag_range = (shift_cc + nspec * shift_mnmx[0], shift_cc + nspec * shift_mnmx[1])
# TODO Can we make the differential evolution run faster?
try:
        if stretch_func == 'quadratic':
            bounds = [lag_range, stretch_mnmx, (-1.0e-6, 1.0e-6)]
        elif stretch_func == 'linear':
            bounds = [lag_range, stretch_mnmx, (0.0, 0.0)]
        else:
            msgs.error(f'Unrecognized stretch_func: {stretch_func}')
        x0_guess = np.array([shift_cc, 1.0, 0.0])
result = scipy.optimize.differential_evolution(zerolag_shift_stretch, args=(y1,y2), x0=x0_guess, tol=toler, bounds=bounds, disp=False, polish=True, seed=seed)
except PypeItError:
msgs.warn("Differential evolution failed.")
return 0, None, None, None, None, None, None
corr_de = -result.fun
shift_de = result.x[0]
stretch_de = result.x[1]
stretch2_de = result.x[2]
if not result.success:
msgs.warn('Fit for shift and stretch did not converge!')
    if corr_de < corr_cc:
        # Occasionally the differential evolution fails and returns a value worse than the CC value. In these cases just use the cc value
        msgs.warn('Shift/Stretch optimizer performed worse than simple x-correlation. '
                  'Returning simple x-correlation shift and no stretch:' + msgs.newline() +
                  '          Optimizer: corr={:5.3f}, shift={:5.3f}, stretch={:7.5f}'.format(corr_de, shift_de, stretch_de) + msgs.newline() +
                  '          X-corr   : corr={:5.3f}, shift={:5.3f}'.format(corr_cc, shift_cc))
corr_out = corr_cc
shift_out = shift_cc
stretch_out = 1.0
stretch2_out = 0.0
result_out = 1
else:
corr_out = corr_de
shift_out = shift_de
stretch_out = stretch_de
stretch2_out = stretch2_de
result_out = int(result.success)
if debug:
x1 = np.arange(nspec)
        y2_trans = shift_and_stretch(y2, shift_out, stretch_out, stretch2_out, stretch_func=stretch_func)
plt.figure(figsize=(14, 6))
        plt.plot(x1, y1/y1.max(), 'k-', drawstyle='steps', label='inspec1, reference spectrum')
        plt.plot(x1, y2/y2.max(), color='grey', drawstyle='steps', label='inspec2, original')
        plt.plot(x1, y2_trans/y2_trans.max(), 'r-', drawstyle='steps', label='inspec2, shifted & stretched')
plt.title('shift= {:5.3f}'.format(shift_out) +
', stretch = {:7.5f}'.format(stretch_out) +
', stretch2 = {:7.5f}'.format(stretch2_out) + ', corr = {:5.3f}'.format(corr_out))
plt.legend()
plt.show()
# check if the cc is above the threshold
if corr_out < cc_thresh:
result_out = -1
return result_out, shift_out, stretch_out, stretch2_out, corr_out, shift_cc, corr_cc
def wavegrid(wave_min, wave_max, dwave, spec_samp_fact=1.0, log10=False):
"""
Utility routine to generate a uniform grid of wavelengths
Parameters
----------
    wave_min : float
        Minimum wavelength. Must be linear even if log10 is requested.
wave_max : float
Maximum wavelength. Must be linear even if log10 is requested.
dwave : float
Delta wavelength interval. Must be linear if ``log10=False``, or log10
if ``log10=True``
spec_samp_fact : float, optional
Make the wavelength grid sampling finer (spec_samp_fact < 1.0) or
        coarser (spec_samp_fact > 1.0) by this sampling factor. This basically
        multiplies the 'native' spectral pixels by ``spec_samp_fact``, i.e.,
        the units of ``spec_samp_fact`` are pixels.
log10 : bool, optional
Return a geometric wavelength grid with steps of constant log base 10 in
wavelength.
Returns
-------
wave_grid : `numpy.ndarray`_, (ngrid +1,)
New wavelength grid, not masked. This is a set of bin edges (rightmost
edge for the last bin and leftmost edges for the rest), while
wave_grid_mid is a set of bin centers, hence wave_grid has 1 more value
than wave_grid_mid.
wave_grid_mid : `numpy.ndarray`_, (ngrid,)
New wavelength grid evaluated at the centers of the wavelength bins,
that is this grid is simply offset from ``wave_grid`` by ``dsamp/2.0``,
        in either linear space or log10 depending on whether a linear or
        log10 grid was requested. The last bin center is removed since it
        falls outside wave_grid.
dsamp : float
The pixel sampling for wavelength grid created.
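
    Examples
    --------
    A minimal sketch (illustrative values) of a linear grid with 1
    Angstrom bins:

    >>> wave_grid, wave_grid_mid, dsamp = wavegrid(4000., 4010., 1.0)
    >>> wave_grid.size, wave_grid_mid.size
    (10, 9)
    >>> float(wave_grid_mid[0]), dsamp
    (4000.5, 1.0)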
"""
dwave_eff = dwave*spec_samp_fact
if log10:
ngrid = np.ceil((np.log10(wave_max) - np.log10(wave_min))/dwave_eff).astype(int)
loglam_grid = np.log10(wave_min) + dwave_eff*np.arange(ngrid)
wave_grid = np.power(10.0,loglam_grid)
loglam_grid_mid = np.log10(wave_grid) + dwave_eff/2.0
wave_grid_mid = np.power(10.0, loglam_grid_mid)
else:
ngrid = np.ceil((wave_max - wave_min)/dwave_eff).astype(int)
wave_grid = wave_min + dwave_eff*np.arange(ngrid)
wave_grid_mid = wave_grid + dwave_eff/2.0
return wave_grid, wave_grid_mid[:-1], dwave_eff
def write_template(nwwv, nwspec, binspec, outpath, outroot, det_cut=None,
order=None, lines_pix_arr=None, lines_wav_arr=None,
lines_fit_ord=None, overwrite=True, to_cache=False):
"""
Write the template spectrum into a binary FITS table
Args:
nwwv (`numpy.ndarray`_):
Wavelengths for the template
nwspec (`numpy.ndarray`_):
Flux of the template
binspec (int):
Binning of the template
outpath (str):
Directory to store the wavelength template file
outroot (str):
Filename to use for the template
        det_cut (dict, optional):
            Dictionary with 'dets' and 'wcuts' keys defining wavelength
            cuts for detector snippets. Used primarily for DEIMOS.
        order (`numpy.ndarray`_, optional):
            Echelle order numbers
        lines_pix_arr (`numpy.ndarray`_, optional):
            Pixel values of identified arc line centroids
        lines_wav_arr (`numpy.ndarray`_, optional):
            Wavelength values of identified arc line centroids
        lines_fit_ord (`numpy.ndarray`_, optional):
            Polynomial order of the wavelength solution
        overwrite (bool, optional):
            If True, overwrite any existing file
        to_cache (bool, optional):
            If True, also store the wavelength solution in the PypeIt cache
            for direct use.
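
    Examples:
        A minimal sketch (illustrative; the output path and file name are
        hypothetical):

        >>> import numpy as np
        >>> wave = np.linspace(3000., 10000., 1000)
        >>> flux = np.ones_like(wave)
        >>> write_template(wave, flux, 1, './', 'my_solution.fits')  # doctest: +SKIP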
"""
tbl = Table()
tbl['wave'] = nwwv
tbl['flux'] = nwspec
if order is not None:
tbl['order'] = order
if lines_pix_arr is not None:
tbl['lines_pix'] = lines_pix_arr
tbl['lines_wav'] = lines_wav_arr
tbl['lines_fit_ord'] = lines_fit_ord
tbl.meta['BINSPEC'] = binspec
# Detector snippets??
if det_cut is not None:
tbl['det'] = 0
for dets, wcuts in zip(det_cut['dets'], det_cut['wcuts']):
gdwv = (tbl['wave'] > wcuts[0]) & (tbl['wave'] < wcuts[1])
deti = np.sum([2**ii for ii in dets])
tbl['det'][gdwv] += deti
# Write
outfile = os.path.join(outpath, outroot)
tbl.write(outfile, overwrite=overwrite)
msgs.info(f"Your arxiv solution has been written to {outfile}\n")
if to_cache:
# Also copy the file to the cache for direct use
cache.write_file_to_cache(outroot, outroot, "arc_lines/reid_arxiv")
msgs.info(f"Your arxiv solution has also been cached.{msgs.newline()}"
f"To utilize this wavelength solution, insert the{msgs.newline()}"
f"following block in your PypeIt Reduction File:{msgs.newline()}"
f" [calibrations]{msgs.newline()}"
f" [[wavelengths]]{msgs.newline()}"
f" reid_arxiv = {outroot}{msgs.newline()}"
f" method = full_template\n")
print("") # Empty line for clarity
msgs.info(f"To use exactly the solutions created above {msgs.newline()}"
f"disable the 2d fitting by adding the keyword ech_2dfit = False")
print("") # Empty line for clarity
msgs.info("Please consider sharing your solution with the PypeIt Developers.")