""" Module for flexure routines
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import copy
import inspect
import pathlib
import numpy as np
from matplotlib import pyplot as plt
from matplotlib import gridspec
import matplotlib
from astropy import stats
from astropy import units
from astropy.io import ascii
from astropy.table import Table
import scipy.signal
import scipy.optimize as opt
from scipy import interpolate
from linetools.spectra import xspectrum1d
from pypeit import msgs
from pypeit import dataPaths
from pypeit import io
from pypeit import utils
from pypeit.display import display
from pypeit.core.wavecal import autoid
from pypeit.core import arc
from pypeit.core import extract
from pypeit.core import fitting
from pypeit.core import qa
from pypeit.datamodel import DataContainer
from pypeit.images.detector_container import DetectorContainer
from pypeit.images.mosaic import Mosaic
from pypeit import specobj, specobjs
from pypeit import wavemodel
from IPython import embed
[docs]
def spat_flexure_shift(sciimg, slits, debug=False, maxlag = 20):
"""
Calculate a rigid flexure shift in the spatial dimension
between the slitmask and the science image.
It is *important* to use original=True when defining the
slitmask as everything should be relative to the initial slits
Otherwise, the WaveTilts could get out of sync with science images
Args:
sciimg (`numpy.ndarray`_):
slits (:class:`pypeit.slittrace.SlitTraceSet`):
maxlag (:obj:`int`, optional):
Maximum flexure searched for
Returns:
float: The spatial flexure shift relative to the initial slits
"""
# Mask -- Includes short slits and those excluded by the user (e.g. ['rdx']['slitspatnum'])
slitmask = slits.slit_img(initial=True, exclude_flag=slits.bitmask.exclude_for_flexure)
_sciimg = sciimg if slitmask.shape == sciimg.shape \
else arc.resize_mask2arc(slitmask.shape, sciimg)
onslits = slitmask > -1
corr_slits = onslits.astype(float).flatten()
# Compute
mean_sci, med_sci, stddev_sci = stats.sigma_clipped_stats(_sciimg[onslits])
thresh = med_sci + 5.0*stddev_sci
corr_sci = np.fmin(_sciimg.flatten(), thresh)
lags, xcorr = utils.cross_correlate(corr_sci, corr_slits, maxlag)
xcorr_denom = np.sqrt(np.sum(corr_sci*corr_sci)*np.sum(corr_slits*corr_slits))
xcorr_norm = xcorr / xcorr_denom
# TODO -- Generate a QA plot
tampl_true, tampl, pix_max, twid, centerr, ww, arc_cont, nsig \
= arc.detect_lines(xcorr_norm, sigdetect=3.0, fit_frac_fwhm=1.5, fwhm=5.0,
cont_frac_fwhm=1.0, cont_samp=30, nfind=1, debug=debug)
# No peak? -- e.g. data fills the entire detector
if len(tampl) == 0:
msgs.warn('No peak found in spatial flexure. Assuming there is none...')
return 0.
# Find the peak
xcorr_max = np.interp(pix_max, np.arange(lags.shape[0]), xcorr_norm)
lag_max = np.interp(pix_max, np.arange(lags.shape[0]), lags)
msgs.info('Spatial flexure measured: {}'.format(lag_max[0]))
if debug:
plt.figure(figsize=(14, 6))
plt.plot(lags, xcorr_norm, color='black', drawstyle='steps-mid', lw=3, label='x-corr')
plt.plot(lag_max[0], xcorr_max[0], 'g+', markersize=6.0, label='peak')
plt.title('Best shift = {:5.3f}'.format(lag_max[0]) + ', corr_max = {:5.3f}'.format(xcorr_max[0]))
plt.legend()
plt.show()
#tslits_shift = trace_slits.shift_slits(tslits_dict, lag_max)
# Now translate the tilts
#slitmask_shift = pixels.tslits2mask(tslits_shift)
#slitmask_shift = slits.slit_img(flexure=lag_max[0])
if debug:
# Now translate the slits in the tslits_dict
all_left_flexure, all_right_flexure, mask = slits.select_edges(flexure=lag_max[0])
gpm = mask == 0
viewer, ch = display.show_image(_sciimg)
#display.show_slits(viewer, ch, left_flexure[:,gpm], right_flexure)[:,gpm]#, slits.id) #, args.det)
#embed(header='83 of flexure.py')
return lag_max[0]
[docs]
def spec_flex_shift(obj_skyspec, sky_file=None, arx_skyspec=None, arx_fwhm_pix=None,
spec_fwhm_pix=None, mxshft=20, excess_shft="crash",
method="boxcar", minwave=None, maxwave=None):
""" Calculate shift between object sky spectrum and archive sky spectrum
Args:
obj_skyspec (`linetools.spectra.xspectrum1d.XSpectrum1d`_):
Spectrum of the sky related to our object
sky_file (:obj:`str`, optional):
Name of the archival sky file. If equal to 'model', instead,
a model sky spectrum will be generated using :func:`~pypeit.wavemodel.nearIR_modelsky`
and the spectral resolution of obj_skyspec. If None, arx_skyspec and arx_fwhm_pix
must be provided.
arx_skyspec (`linetools.spectra.xspectrum1d.XSpectrum1d`_, optional):
Archived sky spectrum. If None, it will be loaded from the sky_file
(sky_file must be provided).
arx_fwhm_pix (:obj:`float`, optional):
Spectral FWHM (in pixels) of the archived sky spectrum. If None, it will be
calculated using sky_file (sky_file must be provided).
spec_fwhm_pix (:obj:`float`, optional):
Spectral FWHM (in pixels) of the sky spectrum related to our object/slit.
mxshft (:obj:`int`, optional):
Maximum allowed shift from flexure; note there are cases that
have been known to exceed even 30 pixels.
excess_shft (:obj:`str`, optional):
Behavior of the code when a measured flexure exceeds ``mxshft``.
Options are "crash", "set_to_zero", and "continue", where
"set_to_zero" sets the shift to zero and moves on, and
"continue" simply uses the large flexure shift value.
method (:obj:`str`, optional):
Which method is used for the spectral flexure correction.
Two methods are available: 'boxcar' and 'slitcen' (see spec_flexure_slit()).
In this routine, 'method' is only passed to final dict.
minwave (:obj:`float`, optional):
Minimum wavelength to use for the correlation. If ``None`` or less than
the minumum wavelength of either ``obj_skyspec`` or ``arx_skyspec``,
this has no effect. Default is None.
maxwave (:obj:`float`, optional):
Maximum wavelength to use for the correlation. If ``None`` or greater than
the maximum wavelength of either ``obj_skyspec`` or ``arx_skyspec``,
this has no effect. Default is None.
Returns:
dict: Contains flexure info. Keys are:
- polyfit= fit to the cross-correlation
- shift= best shift in pixels
- subpix= subpixelation of input spectrum
- corr= correlation function
- sky_spec= object sky spectrum used (rebinned, etc.)
- arx_spec= archived sky spectrum used
- corr_cen= center of the correlation function
- smooth= Degree of smoothing of input spectrum to match archive
"""
# TODO None of these routines should have dependencies on XSpectrum1d!
# Check input mode
if sky_file is None and arx_skyspec is None:
msgs.error("sky_file or arx_skyspec must be provided")
elif sky_file is not None and arx_skyspec is not None:
msgs.warn("sky_file and arx_skyspec both provided. Using arx_skyspec.")
sky_file = None
# Arxiv sky spectrum
if sky_file is not None:
# Load arxiv sky spectrum
msgs.info("Loading the arxiv sky spectrum and computing its spectral FWHM")
arx_skyspec, arx_fwhm_pix = get_archive_spectrum(sky_file, obj_skyspec=obj_skyspec, spec_fwhm_pix=spec_fwhm_pix)
elif arx_fwhm_pix is None:
# get arxiv sky spectrum resolution (FWHM in pixels)
msgs.info("Computing the spectral FWHM for the provided arxiv sky spectrum")
arx_fwhm_pix = autoid.measure_fwhm(arx_skyspec.flux.value, sigdetect=4., fwhm=4.)
if arx_fwhm_pix is None:
msgs.error('Failed to measure the spectral FWHM of the archived sky spectrum. '
'Not enough sky lines detected. Provide a value using arx_fwhm_pix')
# initialize smooth_fwhm_pix
smooth_fwhm_pix = None
# smooth to the same resolution as the object sky spectrum? Yes, if not using a model sky
if sky_file != 'model':
# get gaussian sigma (pixels) for smoothing
smooth_fwhm_pix = get_fwhm_gauss_smooth(arx_skyspec, obj_skyspec, arx_fwhm_pix, spec_fwhm_pix=spec_fwhm_pix)
if smooth_fwhm_pix is None:
# smooth_fwhm_pix is None if spec_fwhm_pix<0, i.e., the wavelength calibration is bad
msgs.warn('No flexure correction could be computed for this slit/object')
return None
if smooth_fwhm_pix > 0:
arx_skyspec = arx_skyspec.gauss_smooth(smooth_fwhm_pix)
# Determine region of wavelength overlap
minwave = 0 if minwave is None else minwave
maxwave = np.inf if maxwave is None else maxwave
min_wave = max(np.amin(arx_skyspec.wavelength.value), np.amin(obj_skyspec.wavelength.value), minwave)
max_wave = min(np.amax(arx_skyspec.wavelength.value), np.amax(obj_skyspec.wavelength.value), maxwave)
# Define wavelengths of overlapping spectra
keep_idx = np.where((obj_skyspec.wavelength.value>=min_wave) &
(obj_skyspec.wavelength.value<=max_wave))[0]
# Rebin both spectra onto overlapped wavelength range
if len(keep_idx) <= 50:
msgs.warn("Not enough overlap between sky spectra")
return None
# rebin onto object ALWAYS
keep_wave = obj_skyspec.wavelength[keep_idx]
arx_skyspec = arx_skyspec.rebin(keep_wave)
obj_skyspec = obj_skyspec.rebin(keep_wave)
# Deal with bad pixels
msgs.work("Need to mask bad pixels")
# Trim edges (rebinning is junk there)
arx_skyspec.data['flux'][0,:2] = 0.
arx_skyspec.data['flux'][0,-2:] = 0.
obj_skyspec.data['flux'][0,:2] = 0.
obj_skyspec.data['flux'][0,-2:] = 0.
# Set minimum to 0. For bad rebinning and for pernicious extractions
obj_skyspec.data['flux'][0,:] = np.maximum(obj_skyspec.data['flux'][0,:], 0.)
arx_skyspec.data['flux'][0,:] = np.maximum(arx_skyspec.data['flux'][0,:], 0.)
# clip too large values (>90%) only in obj_skyspec (assuming arx_skyspec is being vetted before)
# this is used ony for the cross-correlation
obj_skyspec_flux = obj_skyspec.flux.value
_lower, _upper = get_percentile_clipping(obj_skyspec_flux, percent=90.0)
obj_skyspec_flux = np.clip(obj_skyspec_flux, _lower, _upper)
# Normalize spectra to unit average sky count
norm = np.sum(obj_skyspec_flux)/obj_skyspec.npix
norm2 = np.sum(arx_skyspec.flux.value)/arx_skyspec.npix
if norm <= 0:
msgs.warn("Bad normalization of object in flexure algorithm")
msgs.warn("Will try the median")
norm = np.median(obj_skyspec_flux)
if norm <= 0:
msgs.warn("Improper sky spectrum for flexure. Is it too faint??")
return None
if norm2 <= 0:
msgs.warn('Bad normalization of archive in flexure. You are probably using wavelengths '
'well beyond the archive.')
return None
obj_skyspec_flux = obj_skyspec_flux / norm
arx_skyspec.flux = arx_skyspec.flux / norm2
# Subtract continuum and apply a ceiling to the spectra
percent_ceil = 50.
# obj_skyspec
_, obj_ampl, _, _, _, _, obj_sky_flux, _ = arc.detect_lines(obj_skyspec_flux, sigdetect=5.0)
if obj_ampl.size > 0:
obj_lower, obj_upper = get_percentile_clipping(obj_ampl, percent=percent_ceil)
obj_sky_flux = np.clip(obj_sky_flux, obj_lower, obj_upper)
# arx_skyspec
_, arx_ampl, _, _, _, _, arx_sky_flux, _ = arc.detect_lines(arx_skyspec.flux.value, sigdetect=5.0)
if arx_ampl.size > 0:
arx_lower, arx_upper = get_percentile_clipping(arx_ampl, percent=percent_ceil)
arx_sky_flux = np.clip(arx_sky_flux, arx_lower, arx_upper)
#
# # Consider sharpness filtering (e.g. LowRedux)
# msgs.work("Consider taking median first [5 pixel]")
# Cross correlation of spectra
corr = np.correlate(arx_sky_flux, obj_sky_flux, "same")
# Create array around the max of the correlation function for fitting for subpixel max
# Restrict to pixels within maxshift of zero lag
lag0 = corr.size//2
max_corr = np.argmax(corr[lag0-mxshft:lag0+mxshft]) + lag0-mxshft
subpix_grid = np.linspace(max_corr-3., max_corr+3., 7)
# Fit a 2-degree polynomial to peak of correlation function. JFH added this if/else to not crash for bad slits
if np.any(np.isfinite(corr[subpix_grid.astype(int)])):
fit = fitting.PypeItFit(xval=subpix_grid, yval=corr[subpix_grid.astype(int)],
func='polynomial', order=np.atleast_1d(2))
fit.fit()
max_fit = -0.5 * fit.fitc[1] / fit.fitc[2]
shift = float(max_fit) - lag0
# Deal with the case of shifts greater than ``mxshft``
# We need to compare the absolute value of shift to ``mxshft``, since shift can be
# positive or negative, while ``mxshft`` is generally only positive
# We use the int of abs(shift) to avoid to trigger the error/warning for differences <1pixel
# TODO :: I'm not convinced that we need int here...
if int(abs(shift)) > mxshft:
msgs.warn(f"Computed shift {shift:.1f} pix is "
f"larger than specified maximum {mxshft} pix.")
if excess_shft == "crash":
msgs.error(f"Flexure compensation failed for one of your{msgs.newline()}"
f"objects. Either adjust the \"spec_maxshift\"{msgs.newline()}"
f"FlexurePar Keyword, or see the flexure documentation{msgs.newline()}"
f"for information on how to bypass this error using the{msgs.newline()}"
f"\"excessive_shift\" keyword.{msgs.newline()}"
"https://pypeit.readthedocs.io/en/release/flexure.html")
elif excess_shft == "set_to_zero":
msgs.warn("Flexure compensation failed for one of your objects.")
msgs.warn("Setting the flexure correction shift to 0 pixels.")
# Return the usual dictionary, but with a shift == 0
shift = 0.0
elif excess_shft == "continue":
msgs.warn("Applying flexure shift larger than specified max!")
elif excess_shft == "use_median":
msgs.warn("Will try to use a flexure shift from other slit/object. "
"If not available, flexure correction will not be applied.")
return None
else:
msgs.error(f"FlexurePar Keyword excessive_shift = \"{excess_shft}\" "
"not recognized.")
msgs.info(f"Flexure correction of {shift:.3f} pixels")
else:
fit = fitting.PypeItFit(xval=subpix_grid, yval=0.0*subpix_grid,
func='polynomial', order=np.atleast_1d(2))
fit.fit()
msgs.warn('Flexure compensation failed for one of your objects')
return None
return dict(polyfit=fit, shift=shift, subpix=subpix_grid,
corr=corr[subpix_grid.astype(int)], sky_spec=obj_skyspec, arx_spec=arx_skyspec,
corr_cen=lag0, smooth=smooth_fwhm_pix, method=method)
[docs]
def get_percentile_clipping(arr, percent=90.0):
"""
Get the values for clipping based on a percentile
Args:
arr (`numpy.ndarray`_):
Array to clip.
percent (:obj:`float`):
Percentile to clip at. Default is 90.0
Returns:
:obj:`float`: Lower value for clipping
:obj:`float`: Upper value for clipping
"""
lower = np.percentile(arr[arr < 0.0], percent) if np.any(arr < 0.0) else 0.0
upper = np.percentile(arr[arr >= 0.0], percent) if np.any(arr >= 0.0) else 0.0
return lower, upper
[docs]
def get_fwhm_gauss_smooth(arx_skyspec, obj_skyspec, arx_fwhm_pix, spec_fwhm_pix=None):
"""
Args:
arx_skyspec (`linetools.spectra.xspectrum1d.XSpectrum1d`_):
Archived sky spectrum.
obj_skyspec (`linetools.spectra.xspectrum1d.XSpectrum1d`_):
Sky spectrum associated with the science target.
arx_fwhm_pix (:obj:`float`):
Spectral FWHM (in pixels) of the archived sky spectrum.
spec_fwhm_pix (:obj:`float`, optional):
Spectral FWHM (in pixels) of the sky spectrum related to our object.
Returns:
:obj:`float`: FWHM of the smoothing Gaussian in pixels.
"""
# determine object spectral FWHM (in Angstrom) using obj_skyspec
# if spec_fwhm_pix (typically from wave calibration) is None
if spec_fwhm_pix is None:
# pixels
spec_fwhm_pix = autoid.measure_fwhm(obj_skyspec.flux.value, sigdetect=4., fwhm=4.)
msgs.info('Measuring spectral FWHM using the boxcar extracted sky spectrum.')
if spec_fwhm_pix is None:
msgs.warn('Failed to measure the spectral FWHM using the boxcar extracted sky spectrum. '
'Not enough sky lines detected.')
return None
# object sky spectral dispersion (Angstrom/pixel)
obj_disp = np.median(np.diff(obj_skyspec.wavelength.value))
# Angstrom
spec_fwhm = spec_fwhm_pix * obj_disp
# determine arxiv sky spectral FWHM (in Angstrom)
# arxiv sky spectral dispersion (Angstrom/pixel)
arx_disp = np.median(np.diff(arx_skyspec.wavelength.value))
arx_fwhm = arx_fwhm_pix * arx_disp
msgs.info(f"Resolution (FWHM) of Archive={arx_fwhm:.2f} Ang and Observation={spec_fwhm:.2f} Ang")
if spec_fwhm <= 0:
msgs.warn('Negative spectral FWHM, likely due to a bad wavelength calibration.')
return None
# Determine fwhm of the smoothing gaussian
# object sky spectral fwhm (Angstrom)
obj_med_fwhm2 = np.power(spec_fwhm, 2)
# arxiv sky spectral fwhm (Angstrom)
arx_med_fwhm2 = np.power(arx_fwhm, 2)
if obj_med_fwhm2 >= arx_med_fwhm2:
smooth_fwhm = np.sqrt(obj_med_fwhm2-arx_med_fwhm2) # Ang
smooth_fwhm_pix = smooth_fwhm / arx_disp
else:
msgs.warn("Prefer archival sky spectrum to have higher resolution")
smooth_fwhm_pix = 0.
msgs.warn("New Sky has higher resolution than Archive. Not smoothing")
return smooth_fwhm_pix
[docs]
def flexure_interp(shift, wave):
"""
Perform interpolation on wave given a shift in pixels
Args:
shift (float):
Shift in pixels
wave (`numpy.ndarray`_):
extracted wave of size nspec
wavein (`numpy.ndarray`_, optional):
Apply the shift to this array of wavelengths
Returns:
`numpy.ndarray`_: Wavelength scale corrected for spectral flexure
"""
npix = wave.size
x = np.linspace(0., 1., npix)
f = interpolate.interp1d(x, wave, bounds_error=False, fill_value="extrapolate")
twave = f(x + shift / (npix - 1))
return twave
[docs]
def spec_flex_shift_global(slit_specs, islit, sky_file, empty_flex_dict,
return_later_slits, flex_list, keys_to_update, spec_fwhm_pix=None,
mxshft=20, excess_shft="crash", method='slitcen', minwave=None, maxwave=None):
""" Calculate flexure shifts using the sky spectrum extracted at the center of the slit
Args:
slit_specs (:obj:`list`):
A list of `linetools.xspectrum1d`, one for each slit. The spectra stored in
this list are sky spectra, extracted from the center of each slit.
islit (:obj:`int`):
Index of the slit where the sky spectrum related to our object is.
sky_file (`str`):
Name of the archival sky file. If equal to 'model', instead,
a model sky spectrum will be generated using :func:`~pypeit.wavemodel.nearIR_modelsky`
and the spectral resolution of each spectrum from slit_specs.
empty_flex_dict (:obj:`dict`):
Empty dictionary to be filled with flexure results.
return_later_slits (:obj:`list`):
List of slit indexes that failed the shift calcultion and we want to come back to
to assign a value from a different slit.
flex_list (:obj:`list`):
A list of :obj:`dict` objects containing flexure results of each slit.
keys_to_update (:obj:`list`):
List of flexure dictionary keys that we need to update.
spec_fwhm_pix (:obj:`float`, optional):
Spectral FWHM (in pixels) of the sky spectrum related to our object.
mxshft (:obj:`int`, optional):
Maximum allowed shift from flexure. Passed to spec_flex_shift().
excess_shft (:obj:`str`, optional):
Behavior of the code when a measured flexure exceeds ``mxshft``.
Passed to spec_flex_shift()
method (:obj:`str`, optional):
Which method is used for the spectral flexure correction.
Two methods are available: 'boxcar' and 'slitcen' (see spec_flexure_slit()).
Passed to spec_flex_shift().
minwave (:obj:`float`, optional):
Minimum wavelength to use for the correlation. If ``None`` or less than
the minumum wavelength of either this sky or ``sky_spectrum``,
this has no effect. Default is None.
maxwave (:obj:`float`, optional):
Maximum wavelength to use for the correlation. If ``None`` or greater than
the maximum wavelength of either this sky or ``sky_spectrum``,
this has no effect. Default is None.
Returns:
:obj:`list`: A list of :obj:`dict` objects containing flexure
results of each slit. This is filled with a basically empty
dict if the shift calculation failed for the relevant slit.
"""
# Reset the flexure dictionary
flex_dict = copy.deepcopy(empty_flex_dict)
# Calculate the shift
fdict = spec_flex_shift(slit_specs[islit], sky_file=sky_file, mxshft=mxshft, excess_shft=excess_shft,
spec_fwhm_pix=spec_fwhm_pix, method=method, minwave=minwave, maxwave=maxwave)
# Was it successful?
if fdict is not None:
# Update dict
for key in keys_to_update[:-1]:
flex_dict[key].append(fdict[key])
# Interpolate
sky_wave_new = flexure_interp(fdict['shift'], slit_specs[islit].wavelength.value)
flex_dict['sky_spec'].append(xspectrum1d.XSpectrum1D.from_tuple((sky_wave_new, slit_specs[islit].flux.value)))
else:
# No success, come back to it later
return_later_slits.append(islit)
msgs.warn("Flexure shift calculation failed for this slit.")
msgs.info("Will come back to this slit to attempt "
"to use saved estimates from other slits")
# Append flex_dict, which will be an empty dictionary if the flexure failed for the all the slits
flex_list.append(flex_dict.copy())
return flex_list
[docs]
def spec_flex_shift_local(slits, slitord, specobjs, islit, sky_file, empty_flex_dict,
return_later_slits, flex_list, keys_to_update, spec_fwhm_pix=None, mxshft=20,
excess_shft="crash", method='boxcar', minwave=None, maxwave=None):
""" Calculate flexure shifts using the sky spectrum boxcar-extracted at the location of the detected objects
Args:
slits (:class:`~pypeit.slittrace.SlitTraceSet`):
Slit trace set.
slitord (`numpy.ndarray`_):
Array of slit/order numbers.
specobjs (:class:`~pypeit.specobjs.SpecObjs`, optional):
Spectral extractions.
islit (:obj:`int`):
Index of the slit where the sky spectrum related to our object is.
sky_file (`str`):
Name of the archival sky file. If equal to 'model', instead,
a model sky spectrum will be generated using :func:`~pypeit.wavemodel.nearIR_modelsky`
and the spectral resolution of each spectrum in specobjs.
empty_flex_dict (:obj:`dict`):
Empty dictionary to be filled with flexure results.
return_later_slits (:obj:`list`):
List of slit indexes that failed the shift calcultion and we want to come back to
to assign a value from a different slit.
flex_list (:obj:`list`):
A list of :obj:`dict` objects containing flexure results of each slit.
keys_to_update (:obj:`list`):
List of flexure dictionary keys that we need to update.
spec_fwhm_pix (:obj:`float`, optional):
Spectral FWHM (in pixels) of the sky spectrum related to our object.
mxshft (:obj:`int`, optional):
Maximum allowed shift from flexure. Passed to spec_flex_shift().
excess_shft (:obj:`str`, optional):
Behavior of the code when a measured flexure exceeds ``mxshft``.
Passed to spec_flex_shift()
method (:obj:`str`, optional):
Which method is used for the spectral flexure correction.
Two methods are available: 'boxcar' and 'slitcen' (see spec_flexure_slit()).
Passed to spec_flex_shift().
minwave (:obj:`float`, optional):
Minimum wavelength to use for the correlation. If ``None`` or less than
the minumum wavelength of either this sky or ``sky_spectrum``,
this has no effect. Default is None.
maxwave (:obj:`float`, optional):
Maximum wavelength to use for the correlation. If ``None`` or greater than
the maximum wavelength of either this sky or ``sky_spectrum``,
this has no effect. Default is None.
Returns:
:obj:`list`: A list of :obj:`dict` objects containing flexure
results of each slit. This is filled with a basically empty
dict if the shift calculation failed for the relevant slit.
"""
# Reset the flexure dictionary
flex_dict = copy.deepcopy(empty_flex_dict)
# get objects in this slit
i_slitord = slitord[islit]
indx = specobjs.slitorder_indices(i_slitord)
this_specobjs = specobjs[indx]
# if no objects in this slit, append an empty dict
if len(this_specobjs) == 0:
msgs.info('No object extracted in this slit.')
flex_list.append(empty_flex_dict.copy())
return flex_list
# Objects in this slit that failed and we want to come back to
# to assign values from other objects in the same slit (if available)
return_later_sobjs = []
# Loop through objects
for ss, sobj in enumerate(this_specobjs):
if sobj is None or sobj['BOX_WAVE'] is None: # Nothing extracted; only the trace exists
msgs.info(f'Object # {ss} was not extracted.')
# Update dict
for key in keys_to_update:
# append None
flex_dict[key].append(None)
continue
msgs.info(f"Working on spectral flexure for object # {ss} in slit {slits.spat_id[islit]}")
# get 1D spectrum for this object
obj_sky = xspectrum1d.XSpectrum1D.from_tuple((sobj.BOX_WAVE[sobj.BOX_MASK], sobj.BOX_COUNTS_SKY[sobj.BOX_MASK]))
# Calculate the shift
fdict = spec_flex_shift(obj_sky, sky_file=sky_file, mxshft=mxshft, excess_shft=excess_shft,
spec_fwhm_pix=spec_fwhm_pix, method=method, minwave=minwave, maxwave=maxwave)
if fdict is not None:
# Update dict
for key in keys_to_update:
flex_dict[key].append(fdict[key])
else:
# No success, come back to it later
return_later_sobjs.append(ss)
msgs.warn("Flexure shift calculation failed for this spectrum.")
msgs.info("Will come back to this spectrum to attempt "
"to use saved estimates from other slits/objects")
# Check if we need to go back
if (len(return_later_sobjs) > 0) and (len(flex_dict['shift']) > 0):
msgs.warn(f'Flexure shift calculation failed for {len(return_later_sobjs)} '
f'object(s) in slit {slits.spat_id[islit]}')
# get the median shift among all objects in this slit
idx_med_shift = np.where(flex_dict['shift'] == np.percentile(flex_dict['shift'], 50,
method='nearest'))[0][0]
msgs.info(f"Median value of the measured flexure shifts in this slit, equal to "
f"{flex_dict['shift'][idx_med_shift]:.3f} pixels, will be used")
# assign the median shift to the failed objects
for obj_idx in return_later_sobjs:
# Update dict
for key in keys_to_update[:-1]:
# insert the median value at the location of the object that failed the calculation
flex_dict[key].insert(obj_idx, flex_dict[key][idx_med_shift])
# Interpolate
sky_wave_new = flexure_interp(flex_dict['shift'][obj_idx], this_specobjs[obj_idx].BOX_WAVE)
flex_dict['sky_spec'].insert(obj_idx, xspectrum1d.XSpectrum1D.from_tuple(
(sky_wave_new, this_specobjs[obj_idx].BOX_COUNTS_SKY)))
# if flexure failed for every objects in this slit, save for later to use value from other slits
elif (len(return_later_sobjs) > 0) and (len(flex_dict['shift']) == 0):
return_later_slits.append(islit)
# Append flex_dict, which will be an empty dictionary if the flexure failed for the whole slit
flex_list.append(flex_dict.copy())
return flex_list
[docs]
def spec_flexure_slit(slits, slitord, slit_bpm, sky_file, method="boxcar", specobjs=None,
slit_specs=None, wv_calib=None, mxshft=None, excess_shft="crash",
minwave=None, maxwave=None):
"""Calculate the spectral flexure for every slit (global) or object (local)
Args:
slits (:class:`~pypeit.slittrace.SlitTraceSet`):
Slit trace set
slitord (`numpy.ndarray`_):
Array of slit/order numbers
slit_bpm (`numpy.ndarray`_):
True = masked slit
sky_file (:obj:`str`):
Name of the archival sky file. If equal to 'model', instead,
a model sky spectrum will be generated using :func:`~pypeit.wavemodel.nearIR_modelsky`
and the spectral resolution of each spectrum that we want to correct for flexure.
method (:obj:`str`, optional):
Two methods are available:
- 'boxcar': Recommended for object extractions. This
method uses the boxcar extracted sky and wavelength
spectra from the input specobjs
- 'slitcen': Recommended when no objects are being
extracted. This method uses a spectrum (stored in
slitspecs) that is extracted from the center of
each slit.
specobjs (:class:`~pypeit.specobjs.SpecObjs`, optional):
Spectral extractions
slit_specs (:obj:`list`, optional):
A list of linetools.xspectrum1d, one for each slit. The spectra stored in
this list are sky spectra, extracted from the center of each slit.
wv_calib (:class:`pypeit.wavecalib.WaveCalib`):
Wavelength calibration object
mxshft (:obj:`int`, optional):
Passed to spec_flex_shift()
excess_shft (:obj:`str`, optional):
Passed to spec_flex_shift()
minwave (:obj:`float`, optional):
Minimum wavelength to use for the correlation. If ``None`` or less than
the minumum wavelength of either this sky or ``sky_spectrum``,
this has no effect. Default is None.
maxwave (:obj:`float`, optional):
Maximum wavelength to use for the correlation. If ``None`` or greater than
the maximum wavelength of either this sky or ``sky_spectrum``,
this has no effect. Default is None.
Returns:
:obj:`list`: A list of :obj:`dict` objects containing flexure
results of each slit. This is filled with a basically empty
dict if the slit is skipped.
"""
msgs.work("Consider doing 2 passes in flexure as in LowRedux")
# Determine the method
slit_cen = True if (specobjs is None) or (method == "slitcen") else False
# Initialise the flexure list for each slit
flex_list = []
# initiate list of slits to come back to if flexure calculation failed
return_later_slits = []
# empty dict
empty_flex_dict = dict(polyfit=[], shift=[], subpix=[], corr=[],
corr_cen=[], spec_file=sky_file, smooth=[],
arx_spec=[], sky_spec=[], method=[])
# flex dict keys that we need to update through the routine
keys_to_update = ['polyfit', 'shift', 'subpix', 'corr', 'corr_cen', 'smooth', 'method', 'arx_spec', 'sky_spec']
# Loop over slits
# good slits
gdslits = np.where(np.logical_not(slit_bpm))[0]
for islit in range(slits.nslits):
msgs.info(f"Working on spectral flexure of slit: {slits.spat_id[islit]}")
# If no objects on this slit append an empty dictionary
if islit not in gdslits:
flex_list.append(empty_flex_dict.copy())
continue
# get spectral FWHM (in pixels) if available
spec_fwhm_pix = None
if wv_calib is not None:
# Allow for wavelength failures
if wv_calib.fwhm_map is not None:
# Evaluate the spectral FWHM at the centre of the slit (in both the spectral and spatial directions)
spec_fwhm_pix = wv_calib.fwhm_map[islit].eval(slits.nspec/2, 0.5)
if slit_cen:
# global flexure
flex_list = spec_flex_shift_global(slit_specs, islit, sky_file, empty_flex_dict,
return_later_slits, flex_list, keys_to_update,
spec_fwhm_pix=spec_fwhm_pix, mxshft=mxshft, excess_shft=excess_shft,
minwave=minwave, maxwave=maxwave)
else:
# local flexure
flex_list = spec_flex_shift_local(slits, slitord, specobjs, islit, sky_file,
empty_flex_dict, return_later_slits, flex_list, keys_to_update,
spec_fwhm_pix=spec_fwhm_pix, mxshft=mxshft, excess_shft=excess_shft,
minwave=minwave, maxwave=maxwave)
# Check if we need to go back to some failed slits
if len(return_later_slits) > 0:
msgs.warn(f'Flexure shift calculation failed for {len(return_later_slits)} slits')
# take the median value to deal with the cases when there are more than one shift per slit (e.g., local flexure)
saved_shifts = np.array([np.percentile(flex['shift'], 50, method='nearest')
if len(flex['shift']) > 0 else None for flex in flex_list])
if np.all(saved_shifts == None):
# If all the elements in saved_shifts are None means that there are no saved shifts available
msgs.warn(f'No previously saved flexure shift estimates available. '
f'Flexure corrections cannot be performed.')
for islit in range(slits.nslits):
# we append an empty dictionary
flex_list.append(empty_flex_dict.copy())
else:
# get the median shift value among all slit
med_shift = np.percentile(saved_shifts[saved_shifts!= None], 50, method='nearest')
# in which slit the median is?
islit_med_shift = np.where(saved_shifts == med_shift)[0][0]
msgs.info(f"Median value of all the measured flexure shifts, equal to "
f"{saved_shifts[islit_med_shift]:.3f} pixels, will be used")
# global flexure
if slit_cen:
# get the dict where the med shift is
fdict = flex_list[islit_med_shift].copy()
# assign fdict to the failed slits
for sidx in return_later_slits:
# Reset the dict
flex_dict = copy.deepcopy(empty_flex_dict)
# Update dict
for key in keys_to_update[:-1]:
flex_dict[key].append(fdict[key][0])
# Interpolate
sky_wave_new = flexure_interp(fdict['shift'][0], slit_specs[sidx].wavelength.value)
flex_dict['sky_spec'].append(xspectrum1d.XSpectrum1D.from_tuple((sky_wave_new, slit_specs[sidx].flux.value)))
# insert flex_dict in flex_list at the location of the slit that failed the calculation
flex_list[sidx] = flex_dict
# local flexure
else:
# get the dict where the med shift is
idx_med_shift = np.where(flex_list[islit_med_shift]['shift'] == med_shift)[0][0]
fdict = copy.deepcopy(empty_flex_dict)
for key in keys_to_update[:-1]:
fdict[key].append(flex_list[islit_med_shift][key][idx_med_shift])
# assign fdict to the failed object
for sidx in return_later_slits:
# get objects in this slit
i_slitord = slitord[sidx]
indx = specobjs.slitorder_indices(i_slitord)
for i in range(len(specobjs[indx])):
# Reset the dict
flex_dict = copy.deepcopy(empty_flex_dict)
# Update dict
for key in keys_to_update[:-1]:
flex_dict[key].append(fdict[key][0])
# Interpolate
sky_wave_new = flexure_interp(fdict['shift'][0], specobjs[indx][i].BOX_WAVE)
flex_dict['sky_spec'].append(
xspectrum1d.XSpectrum1D.from_tuple((sky_wave_new, specobjs[indx][i].BOX_COUNTS_SKY)))
# insert flex_dict in flex_list at the location of the slit that failed the calculation
flex_list[sidx] = flex_dict
return flex_list
[docs]
def spec_flexure_slit_global(sciImg, waveimg, global_sky, par, slits, slitmask, trace_spat, gd_slits, wv_calib, pypeline, det):
"""Calculate the spectral flexure for every slit
Args:
sciImg (:class:`~pypeit.images.pypeitimage.PypeItImage`):
Science image.
waveimg (`numpy.ndarray`_):
Wavelength image - shape (nspec, nspat)
global_sky (`numpy.ndarray`_):
2D array of the global_sky fit - shape (nspec, nspat)
par (:class:`~pypeit.par.pypeitpar.PypeItPar`):
Parameters of the reduction.
slits (:class:`~pypeit.slittrace.SlitTraceSet`):
Slit trace set
slitmask (`numpy.ndarray`_):
An image with the slit index identified for each pixel (returned from slittrace.slit_img).
trace_spat (`numpy.ndarray`_):
Spatial pixel values (usually the center of each slit) where the sky spectrum will be extracted.
The shape of this array should be (nspec, nslits)
gd_slits (`numpy.ndarray`_):
True = good slit
wv_calib (:class:`pypeit.wavecalib.WaveCalib`):
Wavelength calibration
pypeline (:obj:`str`):
Name of the ``PypeIt`` pipeline method. Allowed options are
MultiSlit, Echelle, or IFU.
det (:obj:`str`):
The name of the detector or mosaic from which the spectrum will be
extracted. For example, DET01.
Returns:
:obj:`list`: A list of :obj:`dict` objects containing flexure
results of each slit. This is filled with a basically empty
dict if the slit is skipped.
"""
# TODO :: Need to think about spatial flexure - is the appropriate spatial flexure already included in trace_spat via left/right slits?
slit_specs = []
# get boxcar radius
box_radius = par['reduce']['extraction']['boxcar_radius']
for ss in range(slits.nslits):
if not gd_slits[ss]:
slit_specs.append(None)
continue
thismask = (slitmask == slits.spat_id[ss])
inmask = sciImg.select_flag(invert=True) & thismask
# Pack
slit_specs.append(get_sky_spectrum(sciImg.image, sciImg.ivar, waveimg, inmask, global_sky,
box_radius, slits, trace_spat[:, ss], pypeline, det))
# Measure flexure
flex_list = spec_flexure_slit(slits, slits.slitord_id, np.logical_not(gd_slits),
par['flexure']['spectrum'],
method=par['flexure']['spec_method'],
mxshft=par['flexure']['spec_maxshift'],
excess_shft=par['flexure']['excessive_shift'],
specobjs=None, slit_specs=slit_specs, wv_calib=wv_calib,
minwave=par['flexure']['minwave'],
maxwave=par['flexure']['maxwave'])
return flex_list
[docs]
def get_archive_spectrum(sky_file, obj_skyspec=None, spec_fwhm_pix=None):
""" Load an archival sky spectrum
Args:
sky_file (:obj:`str`):
Name of the archival sky file. If equal to 'model', instead,
a model sky spectrum will be generated using :func:`~pypeit.wavemodel.nearIR_modelsky`
and the spectral resolution of obj_skyspec. If obj_skyspec is None, then
sky_file cannot be 'model'.
obj_skyspec (`linetools.spectra.xspectrum1d.XSpectrum1d`_, optional):
Sky spectrum associated with the science target. This must be provided if sky_file is 'model'.
spec_fwhm_pix (:obj:`float`, optional):
Spectral FWHM (in pixels) of the sky spectrum related to our object.
Returns:
tuple: The sky spectrum (`linetools.spectra.xspectrum1d.XSpectrum1D`_)
and the FWHM (float) of the sky lines in pixels.
"""
if sky_file != 'model':
# Load Archive. Save the fwhm to avoid the performance hit from calling it on the archive sky spectrum
# multiple times
sky_spectrum = io.load_sky_spectrum(sky_file)
# get arxiv sky spectrum resolution (FWHM in pixels)
arx_fwhm_pix = autoid.measure_fwhm(sky_spectrum.flux.value, sigdetect=4., fwhm=4.)
if arx_fwhm_pix is None:
msgs.error('Failed to measure the spectral FWHM of the archived sky spectrum. '
'Not enough sky lines detected.')
elif obj_skyspec is not None:
if spec_fwhm_pix is None:
# measure spec_fwhm_pix
spec_fwhm_pix = autoid.measure_fwhm(obj_skyspec.flux.value, sigdetect=4., fwhm=4.)
if spec_fwhm_pix is None:
msgs.warn('Failed to measure the spectral FWHM using the boxcar extracted sky spectrum. '
'Choose one of the provided sky files.')
# get the spectral resolution of obj_skyspec
# obj_skyspec spectral dispersion (Angstrom/pixel)
obj_disp = np.median(np.diff(obj_skyspec.wavelength.value))
# FWHM
spec_fwhm = spec_fwhm_pix * obj_disp
# Compute the resolution at the midpoints of the spectrum in the spectral direction
midpix = obj_skyspec.wavelength.value.size // 2
# R = lambda / dlambda
res = obj_skyspec.wavelength.value[midpix] / spec_fwhm
# get model sky spectrum
wave_sky, flux_sky = wavemodel.nearIR_modelsky(res,
(obj_skyspec.wavelength.value.min() / 10000.,
obj_skyspec.wavelength.value.max() / 10000.),
dlam=obj_disp / 10000., flgd=False)
sky_spectrum = xspectrum1d.XSpectrum1D.from_tuple((wave_sky, flux_sky))
arx_fwhm_pix = spec_fwhm_pix
else:
msgs.error('Archived sky spectrum cannot be loaded. ')
return sky_spectrum, arx_fwhm_pix
[docs]
def get_sky_spectrum(sciimg, ivar, waveimg, thismask, global_sky, box_radius, slits, trace_spat, pypeline, det):
""" Obtain a boxcar extraction of the sky spectrum
Args:
sciimg (`numpy.ndarray`_):
Science image - shape (nspec, nspat)
ivar (`numpy.ndarray`_):
Inverse variance of the science image - shape (nspec, nspat)
waveimg (`numpy.ndarray`_):
Wavelength image - shape (nspec, nspat)
thismask (`numpy.ndarray`_):
Good pixel mask (True=good) that indicates the pixels that should be included in the boxcar extraction
global_sky (`numpy.ndarray`_):
2D array of the global_sky fit - shape (nspec, nspat)
box_radius (float):
Radius of the boxcar extraction (in pixels)
slits (:class:`~pypeit.slittrace.SlitTraceSet`):
Slit trace set
trace_spat (`numpy.ndarray`_):
Spatial pixel values (usually the center of each slit) where the sky spectrum will be extracted.
The shape of this array should be (nspec, nslits)
pypeline (:obj:`str`):
Name of the ``PypeIt`` pipeline method. Allowed options are
MultiSlit, Echelle, or IFU.
det (:obj:`str`):
The name of the detector or mosaic from which the spectrum will be
extracted. For example, DET01.
Returns:
(`linetools.spectra.xspectrum1d.XSpectrum1D`_): Sky spectrum
"""
spec = specobj.SpecObj(PYPELINE=pypeline, SLITID=-1, DET=str(det))
spec.trace_spec = np.arange(slits.nspec)
spec.TRACE_SPAT = trace_spat
spec.BOX_RADIUS = box_radius
# Extract
extract.extract_boxcar(sciimg, ivar, thismask, waveimg, global_sky, spec)
slit_wave, slit_sky = spec.BOX_WAVE[spec.BOX_MASK], spec.BOX_COUNTS_SKY[spec.BOX_MASK]
# TODO :: Need to remove this XSpectrum1D dependency - it is required in: flexure.spec_flex_shift
obj_skyspec = xspectrum1d.XSpectrum1D.from_tuple((slit_wave, slit_sky))
return obj_skyspec
[docs]
def spec_flexure_corrQA(ax, this_flex_dict, cntr, name):
# Fit
fit = this_flex_dict['polyfit'][cntr]
if fit is not None:
xval = np.linspace(-10., 10, 100) + this_flex_dict['corr_cen'][cntr] + this_flex_dict['shift'][cntr]
# model = (fit[2]*(xval**2.))+(fit[1]*xval)+fit[0]
model = fit.eval(xval)
# model = utils.func_val(fit, xval, 'polynomial')
mxmod = np.max(model)
ylim_min = np.min(model / mxmod) if np.isfinite(np.min(model / mxmod)) else 0.0
ylim = [ylim_min, 1.3]
ax.plot(xval - this_flex_dict['corr_cen'][cntr], model / mxmod, 'k-')
# Measurements
ax.scatter(this_flex_dict['subpix'][cntr] - this_flex_dict['corr_cen'][cntr],
this_flex_dict['corr'][cntr] / mxmod, marker='o')
# Final shift
ax.plot([this_flex_dict['shift'][cntr]] * 2, ylim, 'g:')
# Label
ax.text(0.5, 0.25, name, transform=ax.transAxes, size='large', ha='center')
ax.text(0.5, 0.15, 'flex_shift = {:g}'.format(this_flex_dict['shift'][cntr]),
transform=ax.transAxes, size='large', ha='center') # , bbox={'facecolor':'white'})
# Axes
ax.set_ylim(ylim)
ax.set_xlabel('Lag')
else:
ax.text(0.5, 0.25, name, transform=ax.transAxes, size='large', ha='center')
ax.text(0.5, 0.15, 'flex_shift calculation failed', transform=ax.transAxes, size='large', ha='center')
# Axes
ax.set_xlabel('Lag')
[docs]
def spec_flexure_qa(slitords, bpm, basename, flex_list,
specobjs=None, out_dir=None):
"""
Generate QA for the spectral flexure calculation
Args:
slitords (`numpy.ndarray`_):
Array of slit/order numbers
bpm (`numpy.ndarray`_):
Boolean mask; True = masked slit
basename (str):
Used to generate the output file name
flex_list (list):
list of :obj:`dict` objects containing the flexure information
specobjs (:class:`~pypeit.specobjs.SpecObjs`, optional):
Spectrally extracted objects
out_dir (str, optional):
Path to the output directory for the QA plots. If None, the current
is used.
"""
plt.rcdefaults()
plt.rcParams['font.family'] = 'serif'
# What type of QA are we doing
slit_cen = False
if specobjs is None: slit_cen = True
# Grab the named of the method
method = inspect.stack()[0][3]
# Mask
gdslits = np.where(np.invert(bpm))[0]
# Loop over slits, and then over objects here
for islit in gdslits:
# Slit/order number
slitord = slitords[islit]
this_flex_dict = flex_list[islit]
# Check that the default was overwritten
if len(this_flex_dict['shift']) == 0 or \
(len(this_flex_dict['shift']) > 0 and np.all([ss is None for ss in this_flex_dict['shift']])):
continue
# Parse and Setup
if slit_cen:
nobj = 1
ncol = 1
else:
indx = specobjs.slitorder_indices(slitord)
this_specobjs = specobjs[indx]
nobj = np.sum(indx)
if nobj == 0:
continue
ncol = min(3, nobj)
nrow = nobj // ncol + ((nobj % ncol) > 0)
# Outfile, one QA file per slit
outfile = qa.set_qa_filename(basename, method + '_corr', slit=slitord, out_dir=out_dir)
plt.figure(figsize=(8, 5.0))
plt.clf()
gs = gridspec.GridSpec(nrow, ncol)
# Correlation QA
if slit_cen:
ax = plt.subplot(gs[0, 0])
spec_flexure_corrQA(ax, this_flex_dict, 0, 'Slit Center')
else:
iplt = 0
for ss, specobj in enumerate(this_specobjs):
if specobj is None or (specobj.BOX_WAVE is None and specobj.OPT_WAVE is None):
continue
ax = plt.subplot(gs[iplt//ncol, iplt % ncol])
spec_flexure_corrQA(ax, this_flex_dict, ss, '{:s}'.format(specobj.NAME))
iplt += 1
# Finish
plt.tight_layout(pad=0.2, h_pad=0.0, w_pad=0.0)
plt.savefig(outfile)#, dpi=400)
plt.close()
# Sky line QA (just one object)
if slit_cen:
iobj = 0
else:
# only show the first object in this slit that does not have None shift
iobj = np.where([ss is not None for ss in this_flex_dict['shift']])[0][0]
specobj = this_specobjs[iobj]
# Repackage
sky_spec = this_flex_dict['sky_spec'][iobj]
arx_spec = this_flex_dict['arx_spec'][iobj]
min_wave = max(np.amin(arx_spec.wavelength.value), np.amin(sky_spec.wavelength.value))*units.AA
max_wave = min(np.amax(arx_spec.wavelength.value), np.amax(sky_spec.wavelength.value))*units.AA
# Sky lines
sky_lines = np.array([3370.0, 3914.0, 4046.56, 4358.34, 5577.338, 6300.304,
7340.885, 7993.332, 8430.174, 8919.610, 9439.660,
10013.99, 10372.88])*units.AA
dwv = 20.*units.AA
gdsky = np.where((sky_lines > min_wave) & (sky_lines < max_wave))[0]
if len(gdsky) == 0:
msgs.warn("No sky lines for Flexure QA")
continue
if len(gdsky) > 6:
idx = np.array([0, 1, len(gdsky)//2, len(gdsky)//2+1, -2, -1])
gdsky = gdsky[idx]
# Outfile
outfile = qa.set_qa_filename(basename, method+'_sky', slit=slitord, out_dir=out_dir)
# Figure
plt.figure(figsize=(8, 5.0))
plt.clf()
nrow, ncol = 2, 3
gs = gridspec.GridSpec(nrow, ncol)
if slit_cen:
plt.suptitle('Sky Comparison for Slit Center', y=0.99)
else:
plt.suptitle('Sky Comparison for {:s}'.format(specobj.NAME), y=0.99)
for ii, igdsky in enumerate(gdsky):
skyline = sky_lines[igdsky]
ax = plt.subplot(gs[ii//ncol, ii % ncol])
# Norm
pix1 = np.where(np.abs(sky_spec.wavelength-skyline) < dwv)[0]
pix2 = np.where(np.abs(arx_spec.wavelength-skyline) < dwv)[0]
f1 = np.sum(sky_spec.flux[pix1])
f2 = np.sum(arx_spec.flux[pix2])
norm = f1/f2
# Plot
ax.plot(sky_spec.wavelength[pix1], sky_spec.flux[pix1], 'k-', label='Obj',
drawstyle='steps-mid')
ax.plot(arx_spec.wavelength[pix2], arx_spec.flux[pix2]*norm, 'r-', label='Arx',
drawstyle='steps-mid')
# Axes
ax.xaxis.set_major_locator(plt.MultipleLocator(dwv.value))
ax.set_xlabel('Wavelength')
ax.set_ylabel('Counts')
# Legend
plt.legend(loc='upper left', scatterpoints=1, borderpad=0.3,
handletextpad=0.3, fontsize='small', numpoints=1)
# Finish
plt.tight_layout(pad=0.2, h_pad=0.0, w_pad=0.0)
plt.savefig(outfile)#, dpi=400)
plt.close()
msgs.info("Wrote spectral flexure QA: {}".format(outfile))
plt.rcdefaults()
[docs]
def calculate_image_phase(imref, imshift, gpm_ref=None, gpm_shift=None, maskval=None):
"""
Perform a masked cross-correlation and optical flow calculation to robustly
estimate the subpixel shifts of two images.
If gpm_ref, gpm_shift, and maskval are all None, no pixels will be masked
This routine (optionally) requires skimage to calculate the image phase. If
skimage is not installed, a standard (unmasked) cross-correlation is used.
Parameters
----------
im_ref : `numpy.ndarray`_
Reference image
imshift : `numpy.ndarray`_
Image that we want to measure the shift of (relative to im_ref)
gpm_ref : `numpy.ndarray`_
Mask of good pixels (True = good) in the reference image
gpm_shift : `numpy.ndarray`_
Mask of good pixels (True = good) in the shifted image
maskval : float, optional
If gpm_ref and gpm_shift are both None, a single value can be specified
and this value will be masked in both images.
Returns
-------
ra_diff : float
Relative shift (in pixels) of image relative to im_ref (x direction).
In order to align image with im_ref, ra_diff should be added to the
x-coordinates of image
dec_diff : float
Relative shift (in pixels) of image relative to im_ref (y direction).
In order to align image with im_ref, dec_diff should be added to the
y-coordinates of image
"""
# Do some checks first
try:
from skimage.registration import optical_flow_tvl1, phase_cross_correlation
except ImportError:
msgs.warn("scikit-image is not installed. Adopting a basic image cross-correlation")
return calculate_image_offset(imref, imshift)
if imref.shape != imshift.shape:
msgs.warn("Input images shapes are not equal. Adopting a basic image cross-correlation")
return calculate_image_offset(imref, imshift)
# Set the masks
if gpm_ref is None:
gpm_ref = np.ones(imref.shape, dtype=bool) if maskval is None else imref != maskval
if gpm_shift is None:
gpm_shift = np.ones(imshift.shape, dtype=bool) if maskval is None else imshift != maskval
# Get a crude estimate of the shift
shift, _, _ = phase_cross_correlation(imref, imshift, reference_mask=gpm_ref, moving_mask=gpm_shift)
shift = shift.astype(int)
# Extract the overlapping portion of the images
exref = imref.copy()
exshf = imshift.copy()
if shift[0] != 0:
if shift[0] < 0:
exref = exref[:shift[0], :]
exshf = exshf[-shift[0]:, :]
else:
exref = exref[shift[0]:, :]
exshf = exshf[:-shift[0], :]
if shift[1] != 0:
if shift[1] < 0:
exref = exref[:, :shift[1]]
exshf = exshf[:, -shift[1]:]
else:
exref = exref[:, shift[1]:]
exshf = exshf[:, :-shift[1]]
# Compute the flow vector for a fine correction to the cross-correlation
v, u = optical_flow_tvl1(exref, exshf)
shift = shift.astype(float)
shift[0] -= np.median(v)
shift[1] -= np.median(u)
# Return the total estimated shift
return shift[0], shift[1]
[docs]
def calculate_image_offset(im_ref, image, nfit=3):
"""Calculate the x,y offset between two images
Args:
im_ref (`numpy.ndarray`_):
Reference image
image (`numpy.ndarray`_):
Image that we want to measure the shift of (relative to im_ref)
nfit (int, optional):
Number of pixels (left and right of the maximum) to include in
fitting the peak of the cross correlation.
Returns:
tuple: Returns two floats, the x and y offset of the image.
- ra_diff -- Relative shift (in pixels) of image relative to im_ref (x direction).
In order to align image with im_ref, ra_diff should be added to the
x-coordinates of image
- dec_diff -- Relative shift (in pixels) of image relative to im_ref (y direction).
In order to align image with im_ref, dec_diff should be added to the
y-coordinates of image
"""
# Subtract median (should be close to zero, anyway)
image -= np.median(image)
im_ref -= np.median(im_ref)
# cross correlate (note, convolving seems faster)
ccorr = scipy.signal.correlate2d(im_ref, image, boundary='fill', mode='same')
#ccorr = scipy.signal.fftconvolve(im_ref, image[::-1, ::-1], mode='same')
# Find the maximum
amax = np.unravel_index(np.argmax(ccorr), ccorr.shape)
# Extract a small region around the maximum, and check the limits
xlo, xhi = amax[0]-nfit, amax[0] + nfit+1
ylo, yhi = amax[1]-nfit, amax[1] + nfit+1
if xlo < 0: xlo = 0
if xhi > ccorr.shape[0]-1: xhi = ccorr.shape[0]-1
if ylo < 0: ylo = 0
if yhi > ccorr.shape[1]-1: yhi = ccorr.shape[1]-1
x = np.arange(xlo, xhi)
y = np.arange(ylo, yhi)
# Setup some initial parameters
initial_guess = (np.max(ccorr), amax[0], amax[1], 3, 3, 0, 0)
xx, yy = np.meshgrid(x, y, indexing='ij')
# Fit the neighborhood of the maximum with a Gaussian to calculate the offset
popt, _ = opt.curve_fit(fitting.twoD_Gaussian, (xx, yy), ccorr[xlo:xhi, ylo:yhi].ravel(), p0=initial_guess)
# Return the RA and DEC shift, in pixels
xoff = 1 - (ccorr.shape[0] % 2) # Need to add 1 for even shaped array
yoff = 1 - (ccorr.shape[1] % 2) # Need to add 1 for even shaped array
return xoff + popt[1] - ccorr.shape[0]//2, yoff+popt[2] - ccorr.shape[1]//2
[docs]
def sky_em_residuals(wave:np.ndarray, flux:np.ndarray,
ivar:np.ndarray, sky_waves:np.ndarray,
plot=False, noff=5., nfit_min=20):
"""Calculate residuals and other metrics for a set of
input sky emission lines
Args:
wave (`numpy.ndarray`_):
Wavelengths (in air!)
flux (`numpy.ndarray`_):
Fluxes
ivar (`numpy.ndarray`_):
Inverse variance
sky_waves (`numpy.ndarray`_):
Skyline wavelengths (in air!)
plot (bool, optional):
If true, plot the residuals
noff (int, optional):
Range in Ang to analyze labout emission line. Defaults to 5.
nfit_min (int, optional):
Minimum number of pixels required to do a fit. Defaults to 20.
Returns:
tuple of `numpy.ndarray`_ -- sky line wavelength of good lines, wavelength offset,
error in wavelength offset, sky line width,
error in sky line width
"""
dwave = []
diff = []
diff_err = []
los = []
los_err= []
good_ivar = ivar > 0
# Loop on known sky lines
for line in sky_waves:
wline = [line-noff,line+noff]
mw = (wave > wline[0]) & (wave < wline[1]) & good_ivar
# Reuire minimum number
if np.sum(mw) <= nfit_min:
continue
p=[0,0,0,0]
# Guess
p0 = list(fitting.guess_gauss(wave[mw], flux[mw]))
# Fit
try:
p, pcov = fitting.fit_gauss(wave[mw], flux[mw], w_out=1./np.sqrt(ivar[mw]),
guesses=p0, nparam=4)
except RuntimeError as e:
msgs.warn('First attempt at Gaussian fit failed, ending with RuntimeError. Original '
f'exception: {e.args[0]} Assuming this is because it hit the maximum '
'number of function evaluations. Trying again with a maximum of 10000.')
# Try again with larger limit on the number of function evaluations
p, pcov = fitting.fit_gauss(wave[mw], flux[mw], w_out=1./np.sqrt(ivar[mw]),
guesses=p0, nparam=4, maxfev=10000)
perr = np.sqrt(np.diag(pcov))
#except:
# p=p0
# p[2] = -99
# perr=p0
# Continue
d = p[2] - line
# For debugging
if plot:
gfit = fitting.gauss_4deg(wave[mw],*p)
plt.figure(figsize=(8,3))
plt.plot(wave[mw],gfit,'g')
plt.plot(wave[mw],flux[mw])
plt.title('{} {:0.2f} diff= {:0.3f}'.format(line,p[3],d))
plt.show()
# Check
if not np.isfinite(perr[2]):
perr[2] = 1000.
# Save
dwave = np.append(dwave,line)
diff = np.append(diff,d)
diff_err = np.append(diff_err,perr[2])
los = np.append(los,p[3])
los_err = np.append(los_err,perr[3])
# Cut on quality
m=(diff_err < 0.1) & (diff_err > 0.0)
# Return
return dwave[m], diff[m], diff_err[m], los[m], los_err[m]
# TODO -- Consider separating the methods from the DataContainer as per calibrations
[docs]
class MultiSlitFlexure(DataContainer):
"""
Class to perform multi-detector flexure analysis.
Based on code written by Marla Geha for DEIMOS.
The datamodel attributes are:
.. include:: ../include/class_datamodel_multislitflexure.rst
"""
# Set the version of this class
version = '1.1.0'
datamodel = {'s1dfile': dict(otype=str, descr='spec1d filename'),
'PYP_SPEC': dict(otype=str, descr='PypeIt spectrograph name'),
'ndet': dict(otype=int, descr='Number of detectors per spectrum'),
'nslits': dict(otype=int, descr='Number of slits'),
'is_msc': dict(otype=np.ndarray, atype=(int, np.integer),
descr='Flag that the "det" is the mosaic ID (ndet, nslits)'),
'det': dict(otype=np.ndarray, atype=(int, np.integer),
descr='Integer identifiers for the detector or mosaic (ndet, nslits)'),
'SN': dict(otype=np.ndarray, atype=np.floating, descr='S/N (ndet, nslits)'),
'slitid': dict(otype=np.ndarray, atype=np.floating, descr='Slit ID (nslits)'),
'mn_wv': dict(otype=np.ndarray, atype=np.floating,
descr='Mininum wavelength of the slit [Ang] (nslits)'),
'indiv_fit_slope': dict(otype=np.ndarray, atype=np.floating,
descr='Fits to each slit individually (nslits)'),
'indiv_fit_b': dict(otype=np.ndarray, atype=np.floating,
descr='Same as above but for b (nslits)'),
'indiv_fit_los': dict(otype=np.ndarray, atype=np.floating,
descr='Same as above but for line width (nslits)'),
'fit_slope': dict(otype=np.ndarray, atype=np.floating,
descr='Fitted slope (nslits)'),
'fit_b': dict(otype=np.ndarray, atype=np.floating,
descr='Fitted b value(nslits)'),
'fit_los': dict(otype=np.ndarray, atype=np.floating,
descr='Fitted line width(nslits)'),
'resid_sky': dict(otype=np.ndarray, atype=np.floating,
descr='Residuals of flexure model on sky lines (nslits)'),
'objra': dict(otype=np.ndarray, atype=np.floating, descr='Object RA (nslits)'),
'objdec': dict(otype=np.ndarray, atype=np.floating, descr='Object DEC (nslits)'),
'maskdef_id': dict(otype=np.ndarray, atype=np.integer, descr='Mask ID (nslits)'),
'rms_arc': dict(otype=np.ndarray, atype=np.floating,
descr='RMS of fit (ndet, nslits)')}
internals = ['flex_par', # Parameters (FlexurePar)
'spectrograph', # spectrograph
'specobjs', # SpecObjs object
'sobj_idx', # (ndet, nslits); Index to specobjs (tuple of arrays)
'sky_table', # Sky line table
# 2D models
'pmodel_m',
'pmodel_b',
'pmodel_l'
]
def __init__(self, s1dfile=None, PYP_SPEC=None, nslits=None, det=None,
SN=None, slitid=None, mn_wv=None, fit_slope=None, fit_b=None,
fit_los=None, objra=None, objdec=None, maskdef_id=None, rms_arc=None,
resid_sky=None, indiv_fit_slope=None, indiv_fit_b=None,
indiv_fit_los=None):
# Setup the DataContainer
args, _, _, values = inspect.getargvalues(inspect.currentframe())
_d = {k: values[k] for k in args[1:]}
# Init
super().__init__(d=_d)
# Load up specobjs
self.specobjs = specobjs.SpecObjs.from_fitsfile(self.s1dfile, chk_version=False)
# Sky lines -- This one is ASCII, so don't use load_sky_spectrum()
sky_file = 'sky_single_mg.dat'
self.sky_table = ascii.read(dataPaths.sky_spec.get_file_path(sky_file))
# NOTE: If you make changes to how this object is bundled into the output
# datamodel, make sure you update the documentation in
# doc/calibrations/flexure.rst!
[docs]
def _bundle(self):
"""
Override the base class method simply to set the HDU extension name.
"""
return super()._bundle(ext='FLEXURE')
[docs]
def init(self, spectrograph, par):
""" Initialize this and that about the slits, par, spectrograph
e.g. RA, DEC, S/N
Args:
spectrograph (:class:`pypeit.spectrographs.spectrograph.Spectrograph`):
The spectrograph instance that sets the instrument used to take
the observations. Used to set :attr:`spectrograph`.
par (:class:`~pypeit.par.pypeitpar.FlexurePar`):
The parameters used for the flexure processing
"""
# Internals
self.spectrograph = spectrograph
self.flex_par = par
# Set
self.PYP_SPEC = self.spectrograph.name
self.sobj_idx = self.spectrograph.spec1d_match_spectra(self.specobjs)
#
self.nslits = len(self.sobj_idx[0])
self.ndet = len(self.sobj_idx)
# Fill in 1D
self['slitid'] = self.specobjs[self.sobj_idx[0]]['SLITID'].astype(float)
self['objra'] = self.specobjs[self.sobj_idx[0]]['RA']
self['objdec'] = self.specobjs[self.sobj_idx[0]]['DEC']
#self['slitname'] = self.specobjs[self.sobj_idx[0]]['MASKDEF_OBJNAME']
self['maskdef_id'] = self.specobjs[self.sobj_idx[0]]['MASKDEF_ID']
# Compile the list of detector *names* once
DETs = self.specobjs.DET
# Find which ones are actually mosaics
is_msc = np.array([Mosaic.name_prefix in d for d in DETs]).astype(np.uint16)
# Use the relevant parser to get the integer identifier
det_msc_num = np.array([Mosaic.parse_name(d) if m else DetectorContainer.parse_name(d)
for d,m in zip(DETs, is_msc)])
# Then assign the attributes
self.is_msc = np.vstack(tuple(is_msc[self.sobj_idx[det]] for det in range(self.ndet)))
self.det = np.vstack(tuple(det_msc_num[self.sobj_idx[det]] for det in range(self.ndet)))
# S/N and mn_wv from the spectra
self['SN'] = np.zeros((self.ndet, self.nslits), dtype=float)
self['mn_wv'] = np.zeros((self.ndet, self.nslits), dtype=float)
for det in range(self.ndet):
self['SN'][det] = [sobj.S2N for sobj in self.specobjs[self.sobj_idx[det]]]
self['mn_wv'][det] = [sobj.mnx_wave[0] for sobj in self.specobjs[self.sobj_idx[det]]]
[docs]
def fit_mask_surfaces(self):
"""
Fit 2D model to linear flexure models from each slit as a function of
RA, DEC.
"""
# Cut on S/N
good_SN = self['SN'] > self.flex_par['multi_min_SN']
good_slit = np.sum(good_SN, axis=0) == self.ndet
# Basic stats
mu = np.median(self['indiv_fit_slope'][good_slit])
sd = np.std(self['indiv_fit_slope'][good_slit])
mu2 = np.median(self['indiv_fit_b'][good_slit])
sd2 = np.std(self['indiv_fit_b'][good_slit])
# Cut down to +/- 2sigma
mgood = (np.abs(self['indiv_fit_slope']-mu) < 2.*sd) \
& ( np.abs(self['indiv_fit_b']-mu2) < 2.*sd2) & good_slit
# Fit me (without additional rejection)
# TODO -- Allow for x,y position instead of RA, DEC
self.pmodel_m = fitting.robust_fit(self['objra'][mgood],
self['indiv_fit_slope'][mgood], (2,2),
function='polynomial2d',
x2=self['objdec'][mgood])
self.pmodel_b = fitting.robust_fit(self['objra'][mgood],
self['indiv_fit_b'][mgood], (2,2),
function='polynomial2d',
x2=self['objdec'][mgood])
self.pmodel_l = fitting.robust_fit(self['objra'][mgood],
self['indiv_fit_los'][mgood], (2,2),
function='polynomial2d',
x2=self['objdec'][mgood])
[docs]
def measure_sky_lines(self):
"""Main method to analyze the sky lines for all the slits
"""
# Init
for key in ['indiv_fit_slope', 'indiv_fit_b', 'indiv_fit_los']:
self[key] = np.zeros(self.nslits)
# Loop on slits
for i in np.arange(0,self.nslits,1):
if (i % 10) == 0:
msgs.info("Working on slit {} of {}".format(i, self.nslits))
if not np.all(self['SN'][:,i] > 1.):
continue
# Loop on detectors
sky_lines, sky_diffs, sky_ediffs, sky_loss = [], [], [], []
for det in range(self.ndet):
sobj = self.specobjs[self.sobj_idx[det][i]]
# Measure em
# The following will break if only boxcar...
# TODO -- Allow for boxcar
sky_line, sky_diff, sky_ediff, los, _ = sky_em_residuals(
sobj['OPT_WAVE'],
sobj['OPT_COUNTS_SKY'],
sobj['OPT_COUNTS_IVAR'],
self.sky_table['Wave'])
# Hold em
sky_lines.append(sky_line)
sky_diffs.append(sky_diff)
sky_ediffs.append(sky_ediff)
sky_loss.append(los)
# Concatenate
sky_lines = np.concatenate(sky_lines)
sky_diffs = np.concatenate(sky_diffs)
sky_ediffs = np.concatenate(sky_ediffs)
sky_loss = np.concatenate(sky_loss)
# FIT SINGLE SLIT SKY LINES WITH A LINE
linear_fit = fitting.robust_fit(sky_lines,
sky_diffs,
weights=1./sky_ediffs**2,
function='polynomial',
order=1,
maxrej=1, # Might increase
lower=3., upper=3.)
# Save
self['indiv_fit_b'][i] = linear_fit.fitc[0]
self['indiv_fit_slope'][i] = linear_fit.fitc[1]
self['indiv_fit_los'][i] = np.median(sky_loss)
[docs]
def update_fit(self):
"""Update fits for each slit based on 2D model
"""
# Do it
self['fit_slope'] = self.pmodel_m.eval(self['objra'],x2=self['objdec'])
self['fit_b'] = self.pmodel_b.eval(self['objra'],x2=self['objdec'])
self['fit_los'] = self.pmodel_l.eval(self['objra'],x2=self['objdec'])
# CALCULATE RESIDUALS FROM FIT
# Only for QA (I think)
resid_sky = []
for i in range(self.nslits):
# Require sufficient S/N in reddest detector
if self['SN'][-1,i] > 0:
# Load up the full spectrum
tmp_wave, all_flux, all_sky, all_ivar = np.ndarray(0), \
np.ndarray(0), np.ndarray(0), np.ndarray(0)
# TODO -- Allow for Boxcar
for det in range(self.ndet):
sobj = self.specobjs[self.sobj_idx[det][i]]
tmp_wave = np.concatenate((tmp_wave, sobj.OPT_WAVE))
all_flux = np.concatenate((all_flux, sobj.OPT_COUNTS))
all_sky = np.concatenate((all_sky, sobj.OPT_COUNTS_SKY))
all_ivar = np.concatenate((all_ivar, sobj.OPT_COUNTS_IVAR))
# Massage
fitwave = self['fit_slope'][i]*tmp_wave + self['fit_b'][i]
all_wave = tmp_wave - fitwave
# TRIM ENDS
all_wave=all_wave[5:-15]
all_flux=all_flux[5:-15]
all_ivar=all_ivar[5:-15]
all_sky=all_sky[5:-15]
# REMOVE CRAZY 500-SIGMA VALUES
cmask = (all_sky > np.percentile(all_sky,0.1)) & (all_sky < np.percentile(all_sky,99.9))
m=np.median(all_sky[cmask])
s=np.std(all_sky[cmask])
mm = (all_sky > 500.*s + m) | (all_sky < m-50.*s)
all_sky[mm] = m
all_ivar[mm] = 1e6
if (np.sum(mm) > 10):
msgs.warn('Removing more than 10 pixels of data')
_,diff,diff_err,_,_ = sky_em_residuals(all_wave, all_sky, all_ivar,
self.sky_table['Wave'])
m = np.isfinite(diff)
sky_mean = np.average(np.abs(diff[m]), weights = 1./diff_err[m]**2)
resid_sky = np.append(resid_sky,sky_mean)
else:
resid_sky = np.append(resid_sky,-1)
self['resid_sky'] = resid_sky
[docs]
def qa_plots(self, plot_dir:str, root:str):
"""Generate QA plots
Args:
plot_dir (str): Top-lvel folder for QA
QA/ is generated beneath this, as needed
root (str): Root for output files
"""
# Generate QA folder as need be
qa_dir = pathlib.Path(plot_dir) / 'QA'
if not qa_dir.is_dir():
qa_dir.mkdir(parents=True)
'''
# Slopes
pdf2 = matplotlib.backends.backend_pdf.PdfPages(os.path.join(qa_dir, 'flex_slits_'+root+'.pdf'))
plt.rcParams.update({'figure.max_open_warning': 0})
for i in np.arange(0,self.nslits,1):
if not np.all(self['SN'][:,i] > 0.):
continue
# SKY LINES FIRST
r_sky_line, r_sky_diff,r_sky_ediff,r_los,r_elos = sky_em_residuals(hdu[r].data['OPT_WAVE'], \
hdu[r].data['OPT_COUNTS_SKY'],\
hdu[r].data['OPT_COUNTS_IVAR'])
b_sky_line, b_sky_diff,b_sky_ediff,b_los,b_elos = sky_em_residuals(hdu[b].data['OPT_WAVE'], \
hdu[b].data['OPT_COUNTS_SKY'],\
hdu[b].data['OPT_COUNTS_IVAR'])
fig, (ax1,ax2) = plt.subplots(1, 2,figsize=(20,4))
ax1.plot(r_sky_line,r_sky_diff,'ro',alpha=0.8,label='Red chip: Sky Emission')
ax1.plot(b_sky_line,b_sky_diff,'bo',alpha=0.8,label='Blue chip: Sky Emission')
ax1.errorbar(b_sky_line,b_sky_diff,yerr=b_sky_ediff,fmt='none',ecolor='b',alpha=0.5)
ax1.errorbar(r_sky_line,r_sky_diff,yerr=r_sky_ediff,fmt='none',ecolor='r',alpha=0.5)
ax1.text(6320,0,'{}'.format(b),fontsize=11)
ax1.text(8500,0,'{}'.format(r),fontsize=11)
ax1.set_ylim(-0.45,0.45)
x=np.arange(6000,9000,1)
l1 = slits['fit_slope'][i]*x + slits['fit_b'][i]
l2 = fslits['fit_slope'][i]*x + fslits['fit_b'][i]
ax1.plot(x,l1,'-')
ax1.plot(x,l2,'--')
ax1.axhline(linewidth=1, color='grey',alpha=0.5)
ax1.set_ylabel('Wavelength offset (AA)')
ax1.set_xlabel('Wavelength (AA)')
ax1.set_xlim(6300,9100)
t = 'Sky Line Fits , resid = {:0.4f} AA, arc = {:0.2f}'.format(slits['resid_sky'][i],0.32*slits['rms_arc_r'][i])
ax1.set_title(t)
sky_diff = np.concatenate((r_sky_diff,b_sky_diff),axis=None)
sky_lines = np.concatenate((r_sky_line,b_sky_line),axis=None)
sky_ediff = np.concatenate((r_sky_ediff,b_sky_ediff),axis=None)
sky_los = np.concatenate((r_los,b_los),axis=None)
ax2.plot(r_sky_line,r_los,'ro',alpha=0.8,label='Red chip: Sky Emission')
ax2.plot(b_sky_line,b_los,'bo',alpha=0.8,label='Blue chip: Sky Emission')
ax2.errorbar(r_sky_line,r_los,yerr=r_elos,fmt='none',ecolor='r',alpha=0.5)
ax2.errorbar(b_sky_line,b_los,yerr=b_elos,fmt='none',ecolor='b',alpha=0.5)
ax2.axhline(fslits['fit_los'][i],linewidth=1, color='grey',alpha=0.5)
ax2.set_title('Line widths')
ax2.set_xlabel('Wavelength (AA)')
ax2.set_ylim(0.3,0.8)
ax2.set_xlim(6300,9100)
pdf2.savefig()
pdf2.close()
plt.close('all')
'''
#########################################################################
# CREATE FULL MASK FITS
pdf = matplotlib.backends.backend_pdf.PdfPages(
plot_dir+'QA/flex_mask_'+root+'.pdf')
xslit = self['objra']
yslit = self['objdec']
t=2.
mu = np.median(self['indiv_fit_slope'])
sd = np.std(self['indiv_fit_slope'])
mu2 = np.median(self['indiv_fit_b'])
sd2 = np.std(self['indiv_fit_b'])
mu3 = np.median(self['indiv_fit_los'])
sd3 = np.std(self['indiv_fit_los'])
# PLOT FITTED VALUES
fig, (ax1,ax2,ax3) = plt.subplots(1, 3,figsize=(22,5))
mm1=-0.00005
mm2=0.00005
print(mu-t*sd,mu+t*sd)
ax1.scatter(xslit,yslit,c=self['indiv_fit_slope'],
cmap="cool",vmin = mm1,vmax=mm2 )# mu-t*sd,vmax=mu+t*sd)
ax1.set_ylabel('Dec [deg]')
ax1.set_xlabel('RA [deg]')
ax1.set_title('Wave MEASURE: line slope')
#cax, _ = matplotlib.colorbar.make_axes(ax1)
#normalize = matplotlib.colors.Normalize(vmin = mu-t*sd,vmax=mu+t*sd)
#cbar = matplotlib.colorbar.ColorbarBase(cax, cmap='cool',norm=normalize)
ax2.scatter(xslit,yslit,c=self['indiv_fit_b'],cmap="summer",
vmin = mu2-t*sd2,vmax=mu2+t*sd2)
ax2.set_ylabel('Dec [deg]')
ax2.set_xlabel('RA [deg]')
ax2.set_title('Wave MEASURE: line intercept')
cax, _ = matplotlib.colorbar.make_axes(ax2)
normalize = matplotlib.colors.Normalize(vmin = mu2-t*sd2,vmax=mu2+t*sd2)
#cbar = matplotlib.colorbar.ColorbarBase(cax, cmap='summer',norm=normalize)
ax3.scatter(xslit,yslit,c=self['indiv_fit_los'],cmap="cool",vmin = mu3-t*sd3,vmax=mu3+t*sd3)
ax3.set_ylabel('Dec [deg]')
ax3.set_xlabel('RA [deg]')
ax3.set_title('Wave MEASURE: line width')
cax, _ = matplotlib.colorbar.make_axes(ax3)
normalize = matplotlib.colors.Normalize(vmin = mu3-t*sd3,vmax=mu3+t*sd3)
#cbar = matplotlib.colorbar.ColorbarBase(cax, cmap='cool',norm=normalize)
pdf.savefig()
#######################
# PLOT MEASURED VALUES
fig, (ax1,ax2,ax3) = plt.subplots(1, 3,figsize=(22,5))
ax1.scatter(xslit,yslit,c=self['fit_slope'],
cmap="cool",vmin = mu-t*sd,vmax=mu+t*sd)
ax1.set_ylabel('Dec [deg]')
ax1.set_xlabel('RA [deg]')
ax1.set_title('Wave fit: line slope')
cax, _ = matplotlib.colorbar.make_axes(ax1)
normalize = matplotlib.colors.Normalize(vmin = mu-t*sd,vmax=mu+t*sd)
#cbar = matplotlib.colorbar.ColorbarBase(cax, cmap='cool',norm=normalize)
ax2.scatter(xslit,yslit,c=self['fit_b'],
cmap="summer",vmin = mu2-t*sd2,vmax=mu2+t*sd2)
ax2.set_ylabel('Dec [deg]')
ax2.set_xlabel('RA [deg]')
ax2.set_title('Wave fit: line intercept')
cax, _ = matplotlib.colorbar.make_axes(ax2)
normalize = matplotlib.colors.Normalize(vmin = mu2-t*sd2,vmax=mu2+t*sd2)
#cbar = matplotlib.colorbar.ColorbarBase(cax, cmap='summer',norm=normalize)
ax3.scatter(xslit,yslit,c=self['fit_los'],
cmap="cool",vmin = mu3-t*sd3,vmax=mu3+t*sd3)
ax3.set_ylabel('Dec [deg]')
ax3.set_xlabel('RA [deg]')
ax3.set_title('Wave fit: line width')
cax, _ = matplotlib.colorbar.make_axes(ax3)
normalize = matplotlib.colors.Normalize(vmin = mu3-t*sd3,vmax=mu3+t*sd3)
#cbar = matplotlib.colorbar.ColorbarBase(cax, cmap='cool',norm=normalize)
pdf.close()