"""
Module for performing two-dimensional coaddition of spectra.
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
from pathlib import Path
import os
import copy
from IPython import embed
import numpy as np
from scipy import ndimage
from matplotlib import pyplot as plt
from astropy.table import Table, vstack
from astropy.io import fits
from pypeit import msgs
from pypeit import utils
from pypeit import specobjs
from pypeit import slittrace
from pypeit import extraction
from pypeit import find_objects
from pypeit.images import pypeitimage
from pypeit.core import findobj_skymask
from pypeit.core.wavecal import wvutils
from pypeit.core import coadd
from pypeit.core import parse
from pypeit import spec2dobj
from pypeit.core.moment import moment1d
from pypeit.manual_extract import ManualExtractionObj
#TODO We should decide which parameters go in through the parset
# and which parameters are passed in to the method as arguments
[docs]
class CoAdd2D:
"""
Main routine to run the extraction for 2d coadds.
Algorithm steps are as follows:
- Fill this in.
This performs 2d coadd specific tasks, and then also performs some
of the tasks analogous to the pypeit.extract_one method. Docs coming
soon....
"""
# Superclass factory method generates the subclass instance
[docs]
@classmethod
def get_instance(cls, spec2dfiles, spectrograph, par, det=1,
only_slits=None, exclude_slits=None,
sn_smooth_npix=None, bkg_redux=False, find_negative=False, show=False,
show_peaks=False, debug_offsets=False, debug=False):
"""
Instantiate the subclass appropriate for the provided spectrograph.
The class to instantiate must match the ``pypeline``
attribute of the provided ``spectrograph``, and must be a
subclass of :class:`CoAdd2D`; see the parent class
instantiation for parameter descriptions.
Returns:
:class:`CoAdd2D`: One of the subclasses with
:class:`CoAdd2D` as its base.
"""
return next(c for c in cls.__subclasses__()
if c.__name__ == (spectrograph.pypeline + 'CoAdd2D'))(
spec2dfiles, spectrograph, par, det=det,
only_slits=only_slits, exclude_slits=exclude_slits,
sn_smooth_npix=sn_smooth_npix, bkg_redux=bkg_redux, find_negative=find_negative,
show=show, show_peaks=show_peaks, debug_offsets=debug_offsets, debug=debug)
def __init__(self, spec2d, spectrograph, par, det=1,
only_slits=None, exclude_slits=None,
sn_smooth_npix=None, bkg_redux=False, find_negative=False, show=False,
show_peaks=False, debug_offsets=False, debug=False):
"""
Args:
spec2d_files (:obj:`list`):
List of spec2d files or a list of
:class:`~pypeit.spec2dobj.Spec2dObj` objects.
spectrograph (:class:`~pypeit.spectrographs.spectrograph.Spectrograph`):
The instrument used to collect the data to be reduced.
par (:class:`~pypeit.par.parset.ParSet`):
Processing parameters.
det (:obj:`int`, :obj:`tuple`, optional):
The 1-indexed detector number(s) to process. If a tuple, it
must include detectors viable as a mosaic for the provided
spectrograph; see
:func:`~pypeit.spectrographs.spectrograph.Spectrograph.allowed_mosaics`.
only_slits (:obj:`list`, optional):
List of slits to coadd. It must be `slitord_id`.
exclude_slits (:obj:`list`, optional):
List of slits to exclude from coaddition. It must be `slitord_id`.
sn_smooth_npix (:obj:`int`, optional):
Number of pixels to median filter by when computing S/N used to
decide how to scale and weight spectra. If set to None, the code
will simply take 10% of the image size in the spectral
direction. TODO: for truncated echelle orders we should be
doing something more intelligent.
bkg_redux (:obj:`bool`, optional):
If True, the sciImg has been subtracted by
a background image (e.g. standard treatment in the IR)
This parameter is passed to pypeit.reduce for determining the reduction steps.
find_negative (:obj:`bool`, optional):
Do the images have negative trace as would be generated by
differencing two science frames? This parameter is passed to
pypeit.reduce for determining the reduction steps. If True, then
find and mask negative object traces.
show (:obj:`bool`, optional):
Show results in ginga
show_peaks (:obj:`bool`, optional):
Plot the QA for the object finding algorithm peak finding to the
screen.
debug_offset (:obj:`bool`, optional):
Plot QA for debugging the automatic determination of offsets to
the screen.
debug (:obj:`bool`, optional):
Show QA for debugging.
"""
# Use Cases:
# offsets
# 1) offsets = 'auto' -- auto compute offsets from brightest object (if exists)
# 2) offsets not 'auto' (i.e. a list) - use them
# -------------- only for Multislit --------------
# 3) offsets = 'maskdef_offsets' - use `maskdef_offset` saved in SlitTraceSet
# 4) offsets = 'header' - use the dither offsets recorded in the header
# ===============================================================================
# weights
# 1) weights = 'auto' -- if brightest object exists auto compute weights,
# otherwise use uniform weights
# 2) weights = 'uniform' -- use uniform weights
# 3) weights is a list - use them
self.spec2d = spec2d
self.spectrograph = spectrograph
self.par = par
# This can be a single integer for a single detector or a tuple for
# multiple detectors placed in a mosaic.
self.det = det
# This is the string name of the detector or mosaic used when saving the
# processed data to PypeIt's main output files
self.detname = self.spectrograph.get_det_name(self.det)
self.bkg_redux = bkg_redux
self.find_negative = find_negative
self.show = show
self.show_peaks = show_peaks
self.debug_offsets = debug_offsets
self.debug = debug
self.offsets = None
self.stack_dict = None
self.pseudo_dict = None
# Brightest object attributes used for both MultislitCoAdd2D and EchelleCoAdd2D
# Array with shape = (nexp,) containing spat_pixpos_id (MultiSlit) or
# ech_fracpos_id (Echelle) of the brightest object in each exposure
self.obj_id_bri = None
# Array with shape = (nexp,) containing the S/N of the brightest object in each exposure
self.snr_bar_bri = None
# This is a list of length self.nexp that is assigned by the compute_weights method
self.use_weights = None
self.wave_grid = None
self.good_slits = None
self.maskdef_offset = None
# Load the stack_dict
self.stack_dict = self.load_coadd2d_stacks(self.spec2d)
self.pypeline = self.spectrograph.pypeline
self.nexp = len(self.spec2d)
# Check that there are the same number of slits on every exposure
nslits_list = [slits.nslits for slits in self.stack_dict['slits_list']]
if not len(set(nslits_list)) == 1:
msgs.error('Not all of your exposures have the same number of slits. Check your inputs')
# This is the number of slits of the single (un-coadded) frames
self.nslits_single = nslits_list[0]
# Check that nspec is the same for all the exposures
self.nspec_array = np.array([slits.nspec for slits in self.stack_dict['slits_list']])
self.nspec_max = self.nspec_array.max()
# Check that binning is the same for all the exposures
binspec_list = [slits.binspec for slits in self.stack_dict['slits_list']]
binspat_list = [slits.binspat for slits in self.stack_dict['slits_list']]
if not len(set(binspec_list)) == 1:
msgs.error('Not all of your exposures have the same spectral binning. Check your inputs')
if not len(set(binspat_list)) == 1:
msgs.error('Not all of your exposures have the same spatial binning. Check your inputs')
self.binning = np.array([self.stack_dict['slits_list'][0].binspec,
self.stack_dict['slits_list'][0].binspat])
self.spat_ids = self.stack_dict['slits_list'][0].spat_id
# If smoothing is not input, smooth by 10% of the maximum spectral dimension
self.sn_smooth_npix = sn_smooth_npix if sn_smooth_npix is not None else 0.1*self.nspec_max
# coadded frame parameters
# get slit index that indicates which slits are good for coadding
self.good_slits = self.good_slitindx(only_slits=only_slits, exclude_slits=exclude_slits)
# get the number of slits that are going to be coadded
self.nslits_coadded = self.good_slits.size
# effective exposure time
self.exptime_coadd = self.stack_dict['exptime_coadd']
# define the wavelength grid for the 2d coadd
self.wave_grid, self.wave_grid_mid, self.dsamp = self.get_wave_grid()
# Handle the reference object
self.handle_reference_obj()
# get self.use_weights
self.compute_weights()
# get self.offsets
self.compute_offsets()
[docs]
@staticmethod
def default_par(spectrograph, inp_cfg=None, det=None, only_slits=None, exclude_slits=None):
"""
Get the default 2D coadding parameters.
Args:
spectrograph (:obj:`str`):
The PypeIt-specific name of the spectrograph used to collect the
data.
inp_cfg (:obj:`dict`, optional):
An existing set of parameters to add to.
det (:obj:`list`, :obj:`str`, :obj:`tuple`, optional):
Limit the coadding to this (set of) detector(s)/detector mosaic(s)
only_slits (:obj:`list`, :obj:`str`, optional):
Limit the coadding to this (set of) slit(s). Only_slits and exclude_slits
are mutually exclusive. If both are set, only_slits takes precedence.
exclude_slits (:obj:`list`, :obj:`str`, optional):
Exclude this (set of) slit(s) from the coadding. Only_slits and exclude_slits
are mutually exclusive. If both are set, only_slits takes precedence.
Returns:
:obj:`dict`: The default set of parameters.
"""
cfg = dict(rdx=dict(spectrograph=spectrograph))
if inp_cfg is not None:
cfg = utils.recursive_update(cfg, dict(inp_cfg))
if only_slits is not None and det is not None:
msgs.warn('only_slits and det are mutually exclusive. Ignoring det.')
_det = None
else:
_det = det
if det is not None:
cfg['rdx']['detnum'] = _det
if only_slits is not None and exclude_slits is not None:
msgs.warn('only_slits and exclude_slits are mutually exclusive. Ignoring exclude_slits.')
_exclude_slits = None
else:
_exclude_slits = exclude_slits
if only_slits is not None:
utils.add_sub_dict(cfg, 'coadd2d')
cfg['coadd2d']['only_slits'] = only_slits
if _exclude_slits is not None:
utils.add_sub_dict(cfg, 'coadd2d')
cfg['coadd2d']['exclude_slits'] = _exclude_slits
# TODO: Heliocentric for coadd2d needs to be thought through. Currently
# turning it off.
utils.add_sub_dict(cfg, 'calibrations')
utils.add_sub_dict(cfg['calibrations'], 'wavelengths')
cfg['calibrations']['wavelengths']['refframe'] = 'observed'
# TODO: Flexure correction for coadd2d needs to be thought through.
# Currently turning it off.
utils.add_sub_dict(cfg, 'flexure')
cfg['flexure']['spec_method'] = 'skip'
# TODO: This is currently the default for 2d coadds, but we need a way
# to toggle it on/off
utils.add_sub_dict(cfg, 'reduce')
utils.add_sub_dict(cfg['reduce'], 'findobj')
cfg['reduce']['findobj']['skip_skysub'] = True
return cfg
[docs]
@staticmethod
def default_basename(spec2d_files):
"""
Construct the base name of the output spec2d file produced by coadding.
Args:
spec2d_files (:obj:`list`):
The list of PypeIt spec2d files to be coadded.
Returns:
:obj:`str`: The root base name for the output coadd2d spec2d file.
"""
# Get the output basename
frsthdr = fits.getheader(spec2d_files[0])
lasthdr = fits.getheader(spec2d_files[-1])
if 'FILENAME' not in frsthdr:
msgs.error(f'Missing FILENAME keyword in {spec2d_files[0]}. Set the basename '
'using the command-line option.')
if 'FILENAME' not in lasthdr:
msgs.error(f'Missing FILENAME keyword in {spec2d_files[-1]}. Set the basename '
'using the command-line option.')
if 'TARGET' not in frsthdr:
msgs.error(f'Missing TARGET keyword in {spec2d_files[0]}. Set the basename '
'using the command-line option.')
return f"{frsthdr['FILENAME'].split('.fits')[0]}-" \
f"{lasthdr['FILENAME'].split('.fits')[0]}-{frsthdr['TARGET']}"
[docs]
@staticmethod
def output_paths(spec2d_files, par, coadd_dir=None):
"""
Construct the names and ensure the existence of the science and QA output directories.
Args:
spec2d_files (:obj:`list`):
The list of PypeIt spec2d files to be coadded. The top-level
directory for the coadd2d output directories is assumed to be
same as used by the basic reductions. For example, if one of
the spec2d files is
``/path/to/reductions/Science/spec2d_file.fits``, the parent
directory for the coadd2d directories is
``/path/to/reductions/``.
par (:class:`~pypeit.par.pypeitpar.PypeItPar`):
Full set of parameters. The only used parameters are
``par['rdx']['scidir']`` and ``par['rdx']['qadir']``. WARNING:
This also *alters* the value of ``par['rdx']['qadir']``!!
coadd_dir (:obj:`str`, optional):
Path to the directory to use for the coadd2d output.
If None, the parent of the science directory is used.
Returns:
:obj:`tuple`: Two strings with the names of (1) the science output
directory and (2) the QA output directory. The function also
creates both directories if they do not exist.
"""
# Science output directory
if coadd_dir is not None:
pypeit_scidir = Path(coadd_dir).absolute() / 'Science'
else:
pypeit_scidir = Path(spec2d_files[0]).parent
coadd_scidir = pypeit_scidir.parent / f"{par['rdx']['scidir']}_coadd"
if not coadd_scidir.exists():
coadd_scidir.mkdir(parents=True)
# QA directory
par['rdx']['qadir'] += '_coadd'
qa_path = pypeit_scidir.parent / par['rdx']['qadir'] / 'PNGs'
if not qa_path.exists():
qa_path.mkdir(parents=True)
return str(coadd_scidir), str(qa_path)
[docs]
def good_slitindx(self, only_slits=None, exclude_slits=None):
"""
This provides an array of index of slits in the un-coadded frames that are considered good for 2d coadding.
A bitmask common to all the un-coadded frames is used to determine which slits are good. Also,
If the `only_slits` parameter is provided only those slits are considered good for 2d coadding.
Args:
only_slits (:obj:`list`, optional):
List of slits to combine. It must be `slitord_id`.
Only_slits and exclude_slits are mutually exclusive.
If both are provided, only_slits takes precedence.
exclude_slits (:obj:`list`, optional):
List of slits to exclude. It must be `slitord_id`.
Only_slits and exclude_slits are mutually exclusive.
If both are provided, only_slits takes precedence.
Returns:
`numpy.ndarray`_: array of index of good slits in the un-coadded frames
"""
if exclude_slits is not None and only_slits is not None:
msgs.warn('Both `only_slits` and `exclude_slits` are provided. They are mutually exclusive. '
'Using `only_slits` and ignoring `exclude_slits`')
_exclude_slits = None
else:
_exclude_slits = exclude_slits
# This creates a unified bpm common to all frames
slits0 = self.stack_dict['slits_list'][0]
# bpm for the first frame
reduce_bpm = slits0.bitmask.flagged(slits0.mask,
and_not=slits0.bitmask.exclude_for_reducing)
for i in range(1, self.nexp):
# update bpm with the info from the other frames
slits = self.stack_dict['slits_list'][i]
reduce_bpm |= slits.bitmask.flagged(slits.mask,
and_not=slits.bitmask.exclude_for_reducing)
# these are the good slit index according to the bpm mask
good_slitindx = np.where(np.logical_not(reduce_bpm))[0]
# If we want to coadd all the good slits
if only_slits is None and _exclude_slits is None:
return good_slitindx
# If instead we want to coadd only a selected (by the user) number of slits
if only_slits is not None:
# these are the `slitord_id` of the slits that we want to coadd
_only_slits = np.atleast_1d(only_slits)
# create an array of slit index that are selected by the user and are also good slits
good_onlyslits = np.array([], dtype=int)
msgs.info('Coadding only the following slits:')
for islit in _only_slits:
if islit not in slits0.slitord_id[good_slitindx]:
# Warnings for the slits that are selected by the user but NOT good slits
msgs.warn('Slit {} cannot be coadd because masked'.format(islit))
else:
msgs.info(f'Slit {islit}')
indx = np.where(slits0.slitord_id[good_slitindx] == islit)[0]
good_onlyslits = np.append(good_onlyslits, good_slitindx[indx])
return good_onlyslits
# if we want to exclude some slits (selected by the user) from coadding
# these are the `slitord_id` of the slits that we want to exclude
_exclude_slits = np.atleast_1d(_exclude_slits)
# create an array of slit index that are excluded by the user
exclude_slitindx = np.array([], dtype=int)
msgs.info('Excluding the following slits:')
for islit in _exclude_slits:
if islit in slits0.slitord_id[good_slitindx]:
msgs.info(f'Slit {islit}')
exclude_slitindx = np.append(exclude_slitindx,
np.where(slits0.slitord_id[good_slitindx] == islit)[0][0])
# these are the good slit index excluding the slits that are selected by the user
return np.delete(good_slitindx, exclude_slitindx)
[docs]
def optimal_weights(self, uniq_obj_id, order=None, weight_method='auto'):
"""
Determine optimal weights for 2d coadds. This script grabs the information from SpecObjs list for the
object with specified uniq_obj_id and passes to coadd.sn_weights to determine the optimal weights for
each exposure.
Parameters
----------
uniq_obj_id : `numpy.ndarray`_
Array of unique object IDs with shape = (nexp,) of the
brightest object whose S/N will be used to determine the
weight for each frame.
order : `int`, optional
The order of the object. Needed to accomodate echelle.
weight_method : `str`, optional
Weight method to be used in :func:`~pypeit.coadd.sn_weights`.
Options are ``'auto'``, ``'constant'``, ``'relative'``, or
``'ivar'``. The default is ``'auto'``. Behavior is as follows:
- ``'auto'``: Use constant weights if rms_sn < 3.0, otherwise
use wavelength dependent.
- ``'constant'``: Constant weights based on rms_sn**2
- ``'uniform'``: Uniform weighting.
- ``'wave_dependent'``: Wavelength dependent weights will be
used irrespective of the rms_sn ratio. This option will not
work well at low S/N ratio although it is useful for objects
where only a small fraction of the spectral coverage has high
S/N ratio (like high-z quasars).
- ``'relative'``: Calculate weights by fitting to the ratio of
spectra? Note, relative weighting will only work well when
there is at least one spectrum with a reasonable S/N, and a
continuum. RJC note - This argument may only be better when
the object being used has a strong continuum + emission lines.
The reference spectrum is assigned a value of 1 for all
wavelengths, and the weights of all other spectra will be
determined relative to the reference spectrum. This is
particularly useful if you are dealing with highly variable
spectra (e.g. emission lines) and require a precision better
than ~1 per cent.
- ``'ivar'``: Use inverse variance weighting. This is not well
tested and should probably be deprecated.
Returns
-------
rms_sn : `numpy.ndarray`_
Array of root-mean-square S/N value for each input spectra. Shape = (nexp,)
weights : list
List of len(nexp) containing the signal-to-noise squared weights to be
applied to the spectra. This output is aligned with the vector (or
vectors) provided in waves which is read in by this routine, i.e. it is a
list of arrays of type `numpy.ndarray`_ with the same shape as those in waves.
"""
# Grab the traces, flux, wavelength and noise for this uniq_obj_id.
waves, fluxes, ivars, gpms = [], [], [], []
for iexp, sobjs in enumerate(self.stack_dict['specobjs_list']):
ithis = sobjs.slitorder_uniq_id_indices(uniq_obj_id[iexp], order=order)
if not np.any(ithis):
msgs.error(f'Object {uniq_obj_id[iexp]} provided not valid. Optimal weights cannot be determined.')
order_str = f' on slit/order {order}' if order is not None else ''
# check if OPT_COUNTS is available
if sobjs[ithis][0].has_opt_ext() and np.any(sobjs[ithis][0].OPT_MASK):
wave_iexp, flux_iexp, ivar_iexp, gpm_iexp = sobjs[ithis][0].get_opt_ext()
waves.append(wave_iexp)
fluxes.append(flux_iexp)
ivars.append(ivar_iexp)
gpms.append(gpm_iexp)
# check if BOX_COUNTS is available
elif sobjs[ithis][0].has_box_ext() and np.any(sobjs[ithis][0].BOX_MASK):
wave_iexp, flux_iexp, ivar_iexp, gpm_iexp = sobjs[ithis][0].get_box_ext()
waves.append(wave_iexp)
fluxes.append(flux_iexp)
ivars.append(ivar_iexp)
gpms.append(gpm_iexp)
msgs.warn(f'Optimal extraction not available for object '
f'{uniq_obj_id[iexp]} {order_str} in exp {iexp}. Using box extraction.')
else:
msgs.error(f'Optimal weights cannot be determined because '
f'flux not available for object = {uniq_obj_id[iexp]} {order_str} in exp {iexp}. ')
# TODO For now just use the zero as the reference for the wavelengths? Perhaps we should be rebinning the data though?
rms_sn, weights = coadd.sn_weights(fluxes, ivars, gpms, sn_smooth_npix=self.sn_smooth_npix, weight_method=weight_method)
return rms_sn, weights
[docs]
def coadd(self, interp_dspat=True):
"""
Construct a 2d co-add of a stack of PypeIt spec2d reduction outputs.
This method calls loops over slits/orders and performs the 2d-coadd by
calling coadd.compute.coadd2d, which 'rectifies' images by coadding them
about the reference_trace_stack.
Parameters
----------
interp_dspat : bool, optional
Interpolate in the spatial coordinate image to faciliate running
through core.extract.local_skysub_extract. Default=True
Returns
-------
coadd_list : list
List of dictionaries, one for each slit, containing the 2d stack.
# TODO Make this a PypeIt object, with data model yada-yada.
"""
coadd_list = []
for slit_idx in self.good_slits:
_slitord_id = self.stack_dict['slits_list'][0].slitord_id
msgs.info(f'Performing 2D coadd for slit/order {_slitord_id[slit_idx]} ({slit_idx + 1}/{self.nslits_single})')
# mask identifying the current slit in each exposure
thismask_stack = [np.abs(slitmask - self.spat_ids[slit_idx]) <= self.par['coadd2d']['spat_toler']
for slitmask in self.stack_dict['slitmask_stack']]
# check if the slit is found in every exposure
if not np.all([np.any(thismask) for thismask in thismask_stack]):
msgs.warn(f'Slit/order {_slitord_id[slit_idx]} was not found in every exposures. '
f'2D coadd cannot be performed on this slit. Try increasing the parameter spat_toler')
continue
# reference trace
ref_trace_stack = self.reference_trace_stack(slit_idx, offsets=self.offsets, uniq_obj_id=self.obj_id_bri)
# Perform the 2d coadd
# NOTE: mask_stack is a gpm, and this is called inmask_stack in
# compute_coadd2d, and outmask in coadd_dict is also a gpm
mask_stack = [mask == 0 for mask in self.stack_dict['mask_stack']]
coadd_dict = coadd.compute_coadd2d(ref_trace_stack, self.stack_dict['sciimg_stack'],
self.stack_dict['sciivar_stack'],
self.stack_dict['skymodel_stack'],
mask_stack,
thismask_stack,
self.stack_dict['waveimg_stack'],
self.wave_grid, self.par['coadd2d']['spat_samp_fact'],
maskdef_dict=self.get_maskdef_dict(slit_idx, ref_trace_stack),
weights=self._get_weights(indx=slit_idx), interp_dspat=interp_dspat)
coadd_list.append(coadd_dict)
if len(coadd_list) == 0:
msgs.error("All the slits were missing in one or more exposures. 2D coadd cannot be performed")
return coadd_list
[docs]
def create_pseudo_image(self, coadd_list):
"""
..todo.. see below
THIS UNDOCUMENTED CODE PROBABLY SHOULD GENERATE AND RETURN
STANDARD PYPEIT OBJCTS INSTEAD OF SOME UNDEFINED DICT"""
# Check that self.nslit is equal to len(coadd_list)
if self.nslits_coadded != len(coadd_list):
msgs.error('Wrong number of slits for the 2d coadded frame')
nspec_vec = np.zeros(self.nslits_coadded,dtype=int)
nspat_vec = np.zeros(self.nslits_coadded,dtype=int)
for islit, cdict in enumerate(coadd_list):
nspec_vec[islit]=cdict['nspec']
nspat_vec[islit]=cdict['nspat']
# Determine the size of the pseudo image
nspat_pad = 10
nspec_pseudo = nspec_vec.max()
nspat_pseudo = int(np.sum(nspat_vec) + (self.nslits_coadded + 1)*nspat_pad) # Cast for SlitTraceSet
spec_vec_pseudo = np.arange(nspec_pseudo)
shape_pseudo = (nspec_pseudo, nspat_pseudo)
imgminsky_pseudo = np.zeros(shape_pseudo)
sciivar_pseudo = np.zeros(shape_pseudo)
waveimg_pseudo = np.zeros(shape_pseudo)
waveimg_mid_pseudo = np.zeros(shape_pseudo)
tilts_pseudo = np.zeros(shape_pseudo)
spat_img_pseudo = np.zeros(shape_pseudo)
nused_pseudo = np.zeros(shape_pseudo, dtype=int)
inmask_pseudo = np.zeros(shape_pseudo, dtype=bool)
wave_mid = np.zeros((nspec_pseudo, self.nslits_coadded))
wave_mask = np.zeros((nspec_pseudo, self.nslits_coadded),dtype=bool)
wave_min = np.zeros((nspec_pseudo, self.nslits_coadded))
wave_max = np.zeros((nspec_pseudo, self.nslits_coadded))
dspat_mid = np.zeros((nspat_pseudo, self.nslits_coadded))
spat_left = nspat_pad
slit_left = np.zeros((nspec_pseudo, self.nslits_coadded))
slit_righ = np.zeros((nspec_pseudo, self.nslits_coadded))
spec_min1 = np.zeros(self.nslits_coadded)
spec_max1 = np.zeros(self.nslits_coadded)
# maskdef info
all_maskdef_ids = np.array([cc['maskdef_id'] for cc in coadd_list])
if None not in all_maskdef_ids:
maskdef_id = np.zeros(self.nslits_coadded, dtype=int)
maskdef_objpos = np.zeros(self.nslits_coadded)
maskdef_slitcen = np.zeros((nspec_pseudo, self.nslits_coadded))
maskdef_designtab = Table()
else:
maskdef_id = None
maskdef_objpos = None
maskdef_slitcen = None
maskdef_designtab = None
nspec_grid = self.wave_grid_mid.size
for islit, coadd_dict in enumerate(coadd_list):
spat_righ = spat_left + nspat_vec[islit]
ispec = slice(0,nspec_vec[islit])
ispat = slice(spat_left,spat_righ)
imgminsky_pseudo[ispec, ispat] = coadd_dict['imgminsky']
sciivar_pseudo[ispec, ispat] = coadd_dict['sciivar']
waveimg_pseudo[ispec, ispat] = coadd_dict['waveimg']
# NOTE: inmask is a gpm
inmask_pseudo[ispec, ispat] = coadd_dict['outmask']
image_temp = (coadd_dict['dspat'] - coadd_dict['dspat_mid'][0] + spat_left) #*coadd_dict['outmask']
# spat_img_pseudo is the sub-pixel image position on the rebinned pseudo image
spat_img_pseudo[ispec, ispat] = image_temp
nused_pseudo[ispec, ispat] = coadd_dict['nused']
wave_min[ispec, islit] = coadd_dict['wave_min']
wave_max[ispec, islit] = coadd_dict['wave_max']
wave_mid[ispec, islit] = coadd_dict['wave_mid']
# waveimg_mid_pseudo image containing the bin centers that the data was rebinned onto
waveimg_mid_pseudo[ispec, ispat] = np.repeat(wave_mid[ispec, islit][:, np.newaxis], nspat_vec[islit], axis=1)
# Patch locations where the waveimg is zero with the midpoints of the grid. This prevents discontinuities
# in the wavelength image. This means howver that the 2d wavelength image has wavelengths with
# two different meanings, i.e. where unmasked they are averaged rebinned wavelengths, but where masked
# it is the original grid.
# TODO THink about whether we should just use the fixed grid wavelengths throughout as the waveimg rather than
# have this hybrid defintion.
waveimg_pseudo[ispec, ispat][np.logical_not(inmask_pseudo[ispec, ispat])] = \
waveimg_mid_pseudo[ispec, ispat][np.logical_not(inmask_pseudo[ispec, ispat])]
wave_mask[ispec, islit] = True
tilts_pseudo[ispec, ispat] = (waveimg_pseudo[ispec, ispat] - coadd_dict['wave_min'][0])/(coadd_dict['wave_max'][-1] - coadd_dict['wave_min'][0])
# Fill in the rest of the wave_mid with the corresponding points in the wave_grid
#wave_this = wave_mid[wave_mask[:,islit], islit]
#ind_upper = np.argmin(np.abs(self.wave_grid_mid - wave_this.max())) + 1
#if nspec_vec[islit] != nspec_pseudo:
# wave_mid[nspec_vec[islit]:, islit] = self.wave_grid_mid[ind_upper:ind_upper + (nspec_pseudo-nspec_vec[islit])]
dspat_mid[ispat, islit] = coadd_dict['dspat_mid']
slit_left[:,islit] = np.full(nspec_pseudo, spat_left)
slit_righ[:,islit] = np.full(nspec_pseudo, spat_righ)
spec_max1[islit] = nspec_vec[islit]-1
spat_left = spat_righ + nspat_pad
# maskdef info
if None not in all_maskdef_ids:
maskdef_id[islit] = coadd_dict['maskdef_id']
maskdef_objpos[islit] = coadd_dict['maskdef_objpos']
maskdef_slitcen[:, islit] = np.full(nspec_pseudo, coadd_dict['maskdef_slitcen'] + slit_left[:,islit])
if coadd_dict['maskdef_designtab'] is not None:
maskdef_designtab = vstack([maskdef_designtab, coadd_dict['maskdef_designtab']])
slits_pseudo \
= slittrace.SlitTraceSet(slit_left, slit_righ, self.pypeline, detname=self.detname,
nspat=nspat_pseudo, PYP_SPEC=self.spectrograph.name,
specmin=spec_min1, specmax=spec_max1,
maskdef_id=maskdef_id, maskdef_objpos=maskdef_objpos,
maskdef_offset=0., maskdef_slitcen=maskdef_slitcen,
maskdef_designtab=maskdef_designtab)
# change value of spat_id in maskdef_designtab
# needs to be done here because spat_id is computed in slittrace
if maskdef_designtab is not None:
slits_pseudo.maskdef_designtab['SPAT_ID'] = slits_pseudo.spat_id
# assign ech_order if exist
slits_pseudo.ech_order = self.stack_dict['slits_list'][0].ech_order[self.good_slits] \
if self.stack_dict['slits_list'][0].ech_order is not None else None
slitmask_pseudo = slits_pseudo.slit_img()
# This is a kludge to deal with cases where bad wavelengths result in large regions where the slit is poorly sampled,
# which wreaks havoc on the local sky-subtraction
min_slit_frac = 0.70
spec_min = np.zeros(self.nslits_coadded)
spec_max = np.zeros(self.nslits_coadded)
for islit in range(self.nslits_coadded):
spat_id = slits_pseudo.spat_id[islit]
slit_width = np.sum(inmask_pseudo & (slitmask_pseudo == spat_id), axis=1)
slit_width_img = np.outer(slit_width, np.ones(nspat_pseudo))
med_slit_width = np.median(slit_width_img[slitmask_pseudo == spat_id])
# TODO -- need inline docs
nspec_eff = np.sum(slit_width > min_slit_frac*med_slit_width)
nsmooth = int(np.fmax(np.ceil(nspec_eff*0.02),10))
slit_width_sm = ndimage.filters.median_filter(slit_width, size=nsmooth, mode='reflect')
igood = (slit_width_sm > min_slit_frac*med_slit_width)
# TODO -- need inline docs
spec_min[islit] = spec_vec_pseudo[igood].min()
spec_max[islit] = spec_vec_pseudo[igood].max()
bad_pix = (slit_width_img < min_slit_frac*med_slit_width) & (slitmask_pseudo == spat_id)
inmask_pseudo[bad_pix] = False
# Update slits_pseudo
slits_pseudo.specmin = spec_min
slits_pseudo.specmax = spec_max
return dict(nspec=nspec_pseudo, nspat=nspat_pseudo, imgminsky=imgminsky_pseudo,
sciivar=sciivar_pseudo, inmask=inmask_pseudo, tilts=tilts_pseudo,
waveimg=waveimg_pseudo, waveimg_mid=waveimg_mid_pseudo, spat_img=spat_img_pseudo, slits=slits_pseudo,
wave_mask=wave_mask, wave_mid=wave_mid, wave_min=wave_min, wave_max=wave_max)
[docs]
def reduce(self, pseudo_dict, show=False, clear_ginga=True, show_peaks=False, show_skysub_fit=False, basename=None):
"""
Method to run the reduction on coadd2d pseudo images
Args:
pseudo_dict (dict):
Dictionary containing coadd2d pseudo images
show (bool):
If True, show the outputs to ginga and the screen analogous to run_pypeit with the -s option
show_peaks (bool):
If True, plot the object finding QA to the screen.
basename (str):
The basename for the spec2d output files.
Returns:
"""
show = self.show if show is None else show
show_peaks = self.show_peaks if show_peaks is None else show_peaks
# NOTE: inmask is a gpm
sciImage = pypeitimage.PypeItImage(pseudo_dict['imgminsky'], ivar=pseudo_dict['sciivar'],
bpm=np.logical_not(pseudo_dict['inmask']))
sciImage.detector = self.stack_dict['detectors'][0]
#
slitmask_pseudo = pseudo_dict['slits'].slit_img()
sciImage.build_mask(slitmask=slitmask_pseudo)
# Make changes to parset specific to 2d coadds
parcopy = copy.deepcopy(self.par)
# Enforce low order traces since we are rectified
parcopy['reduce']['findobj']['trace_npoly'] = int(np.clip(parcopy['reduce']['findobj']['trace_npoly'],None,3))
# Manual extraction.
manual_obj = None
if self.par['coadd2d']['manual'] is not None and len(self.par['coadd2d']['manual']) > 0:
manual_obj = ManualExtractionObj.by_fitstbl_input('None', self.par['coadd2d']['manual'], self.spectrograph)
# Get bpm mask. There should not be any masked slits because we excluded those already
# before the coadd, but we need to pass a bpm to FindObjects and Extract
slits = pseudo_dict['slits']
# Initiate FindObjects object
objFind = find_objects.FindObjects.get_instance(sciImage, pseudo_dict['slits'], self.spectrograph, parcopy,
'science_coadd2d', tilts=pseudo_dict['tilts'],
bkg_redux=self.bkg_redux, manual=manual_obj,
find_negative=self.find_negative, basename=basename,
clear_ginga=clear_ginga, show=show)
if show:
gpm = sciImage.select_flag(invert=True)
objFind.show('image', image=pseudo_dict['imgminsky']*gpm.astype(float), chname='imgminsky', slits=True)
global_sky_pseudo, sobjs_obj = objFind.run(show_peaks=show or show_peaks, show_skysub_fit=show_skysub_fit)
# maskdef stuff
if parcopy['reduce']['slitmask']['assign_obj'] and slits.maskdef_designtab is not None:
# Get pixel scale, binned and resampled (if requested), i.e., pixel scale of the pseudo image
resampled_pixscale = parse.parse_binning(sciImage.detector.binning)[1]*sciImage.detector.platescale*self.par['coadd2d']['spat_samp_fact']
# Assign slitmask design information to detected objects
slits.assign_maskinfo(sobjs_obj, resampled_pixscale, None, TOLER=parcopy['reduce']['slitmask']['obj_toler'])
if parcopy['reduce']['slitmask']['extract_missing_objs'] is True:
# Set the FWHM for the extraction of missing objects
fwhm = slits.get_maskdef_extract_fwhm(sobjs_obj, resampled_pixscale,
parcopy['reduce']['slitmask']['missing_objs_fwhm'],
parcopy['reduce']['findobj']['find_fwhm'])
# Assign undetected objects
sobjs_obj = slits.mask_add_missing_obj(sobjs_obj, None, fwhm,
parcopy['reduce']['slitmask']['missing_objs_boxcar_rad']/resampled_pixscale)
# Initiate Extract object
exTract = extraction.Extract.get_instance(sciImage, pseudo_dict['slits'], sobjs_obj, self.spectrograph, parcopy,
'science_coadd2d', global_sky=None, tilts=pseudo_dict['tilts'],
waveimg=pseudo_dict['waveimg'], bkg_redux=self.bkg_redux,
basename=basename, show=show)
skymodel_pseudo, _, objmodel_pseudo, ivarmodel_pseudo, outmask_pseudo, sobjs, _, _, _ = exTract.run(
model_noise=False, spat_pix=pseudo_dict['spat_img'])
# Add the rest to the pseudo_dict
pseudo_dict['skymodel'] = skymodel_pseudo
pseudo_dict['objmodel'] = objmodel_pseudo
pseudo_dict['ivarmodel'] = ivarmodel_pseudo
pseudo_dict['outmask'] = outmask_pseudo
pseudo_dict['sobjs'] = sobjs
self.pseudo_dict=pseudo_dict
return pseudo_dict['imgminsky'], pseudo_dict['sciivar'], skymodel_pseudo, \
objmodel_pseudo, ivarmodel_pseudo, outmask_pseudo, sobjs, sciImage.detector, slits, \
pseudo_dict['tilts'], pseudo_dict['waveimg']
[docs]
@staticmethod
def offsets_report(offsets, pixscale, offsets_method):
"""
Print out a report on the offsets of the frames to be coadded
Args:
offsets (`numpy.ndarray`_)
Array of offsets
pixscale (float):
The (binned) pixelscale in arcsec/pixel.
offsets_method (str):
A string describing the method used to determine the offsets
"""
if offsets_method is not None and offsets is not None:
msg_string = msgs.newline() + '---------------------------------------------------------------------------------'
msg_string += msgs.newline() + ' Summary of offsets from {} '.format(offsets_method)
msg_string += msgs.newline() + '---------------------------------------------------------------------------------'
msg_string += msgs.newline() + ' exp# offset (pixels) offset (arcsec)'
for iexp, off in enumerate(offsets):
msg_string += msgs.newline() + ' {:2d} {:6.2f} {:6.3f}'.format(iexp, off, off*pixscale)
msg_string += msgs.newline() + '---------------------------------------------------------------------------------'
msgs.info(msg_string)
[docs]
def offset_slit_cen(self, slitid, offsets):
"""
Offset the slit centers of the slit designated by slitid by the provided offsets
Args:
slitid (int):
ID of the slit that is being offset
offsets (list, `numpy.ndarray`_):
A list or array of offsets that are being applied to the slit center
Returns:
:obj:`list`: A list of reference traces for the 2d coadding that
have been offset.
"""
return [slits.center[:,slitid] - offsets[iexp]
for iexp, slits in enumerate(self.stack_dict['slits_list'])]
[docs]
def get_wave_grid(self):
"""
Routine to create a wavelength grid for 2d coadds using all of the
wavelengths of the extracted objects. Calls
:func:`~pypeit.core.wavecal.wvutils.get_wave_grid`.
Returns:
tuple: Returns the following:
- wave_grid (`numpy.ndarray`_): New wavelength grid, not
masked
- wave_grid_mid (`numpy.ndarray`_): New wavelength grid
evaluated at the centers of the wavelength bins, that
is this grid is simply offset from wave_grid by
dsamp/2.0, in either linear space or log10 depending
on whether linear or (log10 or velocity) was
requested. For iref or concatenate the linear
wavelength sampling will be calculated.
- dsamp (float): The pixel sampling for wavelength grid
created.
"""
nobjs_tot = int(np.array([len(spec) for spec in self.stack_dict['specobjs_list']]).sum())
# TODO: Do we need this flag since we can determine whether or not we have specobjs from nobjs_tot?
# This all seems a bit hacky
if self.par['coadd2d']['use_slits4wvgrid'] or nobjs_tot==0:
nslits_tot = np.sum([slits.nslits for slits in self.stack_dict['slits_list']])
waves, gpms = [], []
box_radius = 3.
#indx = 0
# Loop on the exposures
for iexp, (waveimg, slitmask, slits) in enumerate(zip(self.stack_dict['waveimg_stack'],
self.stack_dict['slitmask_stack'],
self.stack_dict['slits_list'])):
slits_left, slits_righ, _ = slits.select_edges()
row = np.arange(slits_left.shape[0])
# Loop on the slits
for kk, spat_id in enumerate(slits.spat_id):
mask = slitmask == spat_id
# Create apertures at 5%, 50%, and 95% of the slit width to cover full range of wavelengths
# on this slit
trace_spat = slits_left[:, kk][:,np.newaxis] + np.outer((slits_righ[:,kk] - slits_left[:,kk]),[0.05,0.5,0.95])
box_denom = moment1d(waveimg * mask > 0.0, trace_spat, 2 * box_radius, row=row)[0]
wave_box = moment1d(waveimg * mask, trace_spat, 2 * box_radius,
row=row)[0] / (box_denom + (box_denom == 0.0))
gpm_box = box_denom > 0.
waves += [wave for (wave, gpm) in zip(wave_box.T, gpm_box.T) if np.any(gpm)]
gpms += [(wave > 0.) & gpm for (wave, gpm) in zip(wave_box.T, gpm_box.T) if np.any(gpm)]
else:
waves, gpms = [], []
for iexp, spec_this in enumerate(self.stack_dict['specobjs_list']):
for spec in spec_this:
# NOTE: BOX extraction usage needed for quicklook
good_opt_ext = spec.has_opt_ext() and np.any(spec.OPT_MASK)
good_box_ext = spec.has_box_ext() and np.any(spec.BOX_MASK)
if good_opt_ext or good_box_ext:
waves.append(spec.OPT_WAVE if good_opt_ext else spec.BOX_WAVE)
gpms.append(spec.OPT_MASK if good_opt_ext else spec.BOX_MASK)
# TODO -- OPT_MASK is likely to become a bpm with int values
#gpm[:self.nspec_array[iexp], indx] = spec.OPT_MASK
#indx += 1
return wvutils.get_wave_grid(waves=waves, gpms=gpms, wave_method=self.wave_method(),
spec_samp_fact=self.par['coadd2d']['spec_samp_fact'])
[docs]
def load_coadd2d_stacks(self, spec2d, chk_version=False):
"""
Routine to read in required images for 2d coadds given a list of spec2d files.
Args:
spec2d_files: list
List of spec2d filenames
det: int
detector in question
Returns:
dict: Dictionary containing all the images and keys required
for perfomring 2d coadds.
"""
redux_path = os.getcwd()
# Grab the files
#head2d_list = []
# Image stacks
sciimg_stack = []
waveimg_stack = []
skymodel_stack = []
sciivar_stack = []
mask_stack = []
slitmask_stack = []
exptime_stack = []
#tilts_stack = []
# Object stacks
specobjs_list = []
slits_list = []
nfiles =len(spec2d)
detectors_list = []
maskdef_designtab_list = []
spat_flexure_list = []
for ifile, f in enumerate(spec2d):
if isinstance(f, spec2dobj.Spec2DObj):
# If spec2d is a list of objects
s2dobj = f
else:
# If spec2d is a list of files, option to also use spec1ds
s2dobj = spec2dobj.Spec2DObj.from_file(f, self.detname, chk_version=chk_version)
spec1d_file = f.replace('spec2d', 'spec1d')
if os.path.isfile(spec1d_file):
sobjs = specobjs.SpecObjs.from_fitsfile(spec1d_file, chk_version=chk_version)
this_det = sobjs.DET == self.detname
specobjs_list.append(sobjs[this_det])
# TODO the code should run without a spec1d file, but we need to implement that
slits_list.append(s2dobj.slits)
detectors_list.append(s2dobj.detector)
maskdef_designtab_list.append(s2dobj.maskdef_designtab)
spat_flexure_list.append(s2dobj.sci_spat_flexure)
sciimg_stack.append(s2dobj.sciimg)
exptime_stack.append(s2dobj.head0['EXPTIME'])
waveimg_stack.append(s2dobj.waveimg)
skymodel_stack.append(s2dobj.skymodel)
sciivar_stack.append(s2dobj.ivarmodel)
mask_stack.append(s2dobj.bpmmask.mask)
slitmask_stack.append(s2dobj.slits.slit_img(flexure=s2dobj.sci_spat_flexure))
# check if exptime is consistent for all images
exptime_coadd = np.percentile(exptime_stack, 50., method='higher')
isclose_exptime = np.isclose(exptime_stack, exptime_coadd, atol=1.)
if not np.all(isclose_exptime):
msgs.warn('Exposure time is not consistent (within 1 sec) for all frames being coadded! '
f'Scaling each image by the median exposure time ({exptime_coadd} s) before coadding.')
exp_scale = exptime_coadd / exptime_stack
for iexp in range(nfiles):
if not isclose_exptime[iexp]:
sciimg_stack[iexp] *= exp_scale[iexp]
skymodel_stack[iexp] *= exp_scale[iexp]
sciivar_stack[iexp] /= exp_scale[iexp]**2
return dict(specobjs_list=specobjs_list, slits_list=slits_list,
slitmask_stack=slitmask_stack,
sciimg_stack=sciimg_stack,
sciivar_stack=sciivar_stack,
skymodel_stack=skymodel_stack, mask_stack=mask_stack,
waveimg_stack=waveimg_stack,
exptime_stack=exptime_stack,
exptime_coadd=exptime_coadd,
redux_path=redux_path,
detectors=detectors_list,
spectrograph=self.spectrograph.name,
pypeline=self.spectrograph.pypeline,
maskdef_designtab_list=maskdef_designtab_list,
spat_flexure_list=spat_flexure_list)
# tilts_stack=tilts_stack, waveimg_stack=waveimg_stack,
[docs]
def compute_offsets(self):
"""
Determine self.offsets, the offset of the frames to be coadded with respect to the first frame.
This is partially overloaded by the child methods.
"""
msgs.info('Get Offsets')
# binned pixel scale of the frames to be coadded
pixscale = parse.parse_binning(self.stack_dict['detectors'][0].binning)[1]*self.stack_dict['detectors'][0].platescale
# 1) offsets are provided in the header of the spec2d files
if self.par['coadd2d']['offsets'] == 'header':
msgs.info('Using offsets from header')
dithoffs = [self.spectrograph.get_meta_value(f, 'dithoff') for f in self.spec2d]
if None in dithoffs:
msgs.error('Dither offsets keyword not found for one or more spec2d files. '
'Choose another option for `offsets`')
dithoffs_pix = - np.array(dithoffs) / pixscale
self.offsets = dithoffs_pix[0] - dithoffs_pix
self.offsets_report(self.offsets, pixscale, 'header keyword')
elif self.obj_id_bri is None and self.par['coadd2d']['offsets'] == 'auto':
msgs.error('Offsets cannot be computed because no unique reference object '
'with the highest S/N was found. To continue, provide offsets in `Coadd2DPar`')
# 2) a list of offsets is provided by the user (no matter if we have a bright object or not)
elif isinstance(self.par['coadd2d']['offsets'], (list, np.ndarray)):
msgs.info('Using user input offsets')
# use them
self.offsets = self.check_input(self.par['coadd2d']['offsets'], 'offsets')
self.offsets_report(self.offsets, pixscale, 'user input')
# 3) parset `offsets` is = 'maskdef_offsets' (no matter if we have a bright object or not)
elif self.par['coadd2d']['offsets'] == 'maskdef_offsets':
self.maskdef_offset = np.array([slits.maskdef_offset for slits in self.stack_dict['slits_list']])
# Check if maskdef_offset is actually recoded in the SlitTraceSet
if np.any(self.maskdef_offset == None):
msgs.error('maskdef_offsets are not recoded in the SlitTraceSet '
'for one or more exposures. They cannot be used.')
# the offsets computed during the main reduction (`run_pypeit`) are used
msgs.info('Determining offsets using maskdef_offset recoded in SlitTraceSet')
self.offsets = self.maskdef_offset[0] - self.maskdef_offset
self.offsets_report(self.offsets, pixscale, 'maskdef_offset')
# 4) parset `offsets` = 'auto' but we have a bright object
elif self.par['coadd2d']['offsets'] == 'auto' and self.obj_id_bri is not None:
# see child method
pass
else:
msgs.error('Invalid value for `offsets`')
[docs]
def compute_weights(self):
"""
Determine the weights to be used in the coadd2d.
This is partially overloaded by the child methods.
This method sets the internal :attr:`use_weights`. Documentation on the
form of self.use_weights needs to be written.
"""
msgs.info('Get Weights')
# 1) User input weight
if isinstance(self.par['coadd2d']['weights'], (list, np.ndarray)):
# use those inputs
self.use_weights = self.check_input(self.par['coadd2d']['weights'], 'weights')
msgs.info('Using user input weights')
# 2) No bright object and parset `weights` is 'auto' or 'uniform',
# or Yes bright object but the user wants to still use uniform weights
elif ((self.obj_id_bri is None) and (self.par['coadd2d']['weights'] in ['auto', 'uniform'])) or \
((self.obj_id_bri is not None) and (self.par['coadd2d']['weights'] == 'uniform')):
if self.par['coadd2d']['weights'] == 'auto':
# TODO maybe better behavior here would be to crash out to force the user to change the weight method explicitly
# to 'uniform'. What I don't like here is that we are using uniform weights even though the user requested 'auto'
# and they might miss the warning. Its debatable though.
# warn if the user had put `auto` in the parset
msgs.warn('Weights cannot be computed because no unique reference object '
'with the highest S/N was found. Using uniform weights instead.')
elif self.par['coadd2d']['weights'] == 'uniform':
msgs.info('Using uniform weights')
# use uniform weights
self.use_weights = (np.ones(self.nexp) / float(self.nexp)).tolist()
# 3) Bright object exists and parset `weights` is equal to 'auto'
elif (self.obj_id_bri is not None) and (self.par['coadd2d']['weights'] == 'auto'):
# see child method
pass
else:
msgs.error('Invalid value for `weights`')
[docs]
def _get_weights(self, indx=None):
"""
Method to select the correct weights for the selected slit/order.
This is partially overloaded by the child methods
Args:
indx (:obj:`int`, optional):
Index of the slit/order for which the weights are to be returned.
Not used in the base class but can be used in child classes.
Returns:
:obj:`list`: List of weights to be used for 2D coadd. The length of the list
is equal to the number of exposures.
"""
return self.use_weights
[docs]
@staticmethod
def unpack_specobj(spec, spatord_id=None):
"""
Utility routine to unpack flux, ivar, and gpm from a single SpecObj
object.
Args:
spec (:class:`~pypeit.specobj.SpecObj`):
SpecObj object to unpack.
spatord_id (:obj:`int`, optional):
Slit/order ID to unpack. If None, the Slit/order ID
of the SpecObj object is used.
Returns:
:obj:`tuple`: Returns the following: flux (`numpy.ndarray`_), ivar
(`numpy.ndarray`_), gpm (`numpy.ndarray`_).
"""
# Get the slit/order ID if not provided
if spatord_id is None:
spatord_id = spec.ECH_ORDER if spec.ECH_ORDER is not None else spec.SLITID
# get OBJID, which is different for Echelle and MultiSlit
objid = spec.ECH_FRACPOS_ID if spec.ECH_FRACPOS_ID is not None else spec.SPAT_PIXPOS_ID
# check if OPT_COUNTS is available
if spec.has_opt_ext() and np.any(spec.OPT_MASK):
_, flux, ivar, gpm = spec.get_opt_ext()
# check if BOX_COUNTS is available
elif spec.has_box_ext() and np.any(spec.BOX_MASK):
_, flux, ivar, gpm = spec.get_box_ext()
msgs.warn(f'Optimal extraction not available for obj_id {objid} '
f'in slit/order {spatord_id}. Using box extraction.')
else:
msgs.warn(f'Optimal and Boxcar extraction not available for obj_id {objid} in slit/order {spatord_id}.')
_, flux, ivar, gpm = None, None, None, None
return flux, ivar, gpm
[docs]
def get_brightest_obj(self, specobjs_list, spat_ids):
"""
Dummy method to identify the brightest object. Overloaded by child methods.
Parameters
----------
specobjs_list
spat_ids
Returns
-------
"""
msgs.error('The get_brightest_obj() method should be overloaded by the child class.')
[docs]
def handle_reference_obj(self):
"""
Dummy method to handle the reference object. Overloaded by child methods.
"""
msgs.error('The handle_reference_obj() method should be overloaded by the child class.')
[docs]
def reference_trace_stack(self, slitid, offsets=None, uniq_obj_id=None):
"""
Dummy method to obtain the stack of reference traces. Overloaded by child methods.
Parameters
----------
slitid: int
The slit/order ID for which the reference traces are to be obtained.
offsets: list, `numpy.ndarray`_, optional
List or array of offsets to apply to the reference traces.
It must have the same length as the number of exposures. Optional.
uniq_obj_id: list, `numpy.ndarray`_, optional
List or array of object IDs to use for the reference traces.
It must have the same length as the number of exposures. Optional.
Returns
-------
ref_trace_stack: list
List of reference traces for the slit/order specified by slitid.
"""
msgs.error('The reference_trace_stack() method should be overloaded by the child class.')
[docs]
def get_maskdef_dict(self, slit_idx, ref_trace_stack):
"""
Dummy method to get maskdef info. Overloaded by child methods.
Args:
slit_idx (:obj:`int`):
index of a slit in the uncoadded frames
ref_trace_stack (`numpy.ndarray`_):
Stack of reference traces about which the images are rectified
and coadded. It is the slitcen appropriately shifted according
the frames offsets. Shape is (nspec, nimgs).
Returns:
:obj:`dict`: Dictionary containing all the maskdef info. The
quantities saved are: maskdef_id, maskdef_objpos, maskdef_slitcen,
maskdef_designtab. To learn what they are see
:class:`~pypeit.slittrace.SlitTraceSet` datamodel.
"""
return dict(maskdef_id=None, maskdef_objpos=None, maskdef_slitcen=None, maskdef_designtab=None)
[docs]
def wave_method(self):
"""
Get the wavelength method to be used in the coadd2d. This is a dummy method since
it is overloaded by child classes.
Returns:
str: The wavelength method to be used in the coadd2d.
"""
msgs.error('The wave_method() method should be overloaded by the child class.')
# Multislit can coadd with:
# 1) input offsets or if offsets is None, it will find the brightest trace and compute them
# 2) specified weights, or if weights is None and auto_weights=True, it will compute weights using the brightest object
# Echelle can either stack with:
# 1) input offsets or if offsets is None, it will find the objid of brightest trace and stack all orders relative to the trace of this object.
# 2) specified weights, or if weights is None and auto_weights=True,
# it will use wavelength dependent weights determined from the spectrum of the brightest objects objid on each order
[docs]
class MultiSlitCoAdd2D(CoAdd2D):
"""
Child of Coadd2d for Multislit and Longslit reductions. For documentation see CoAdd2d parent class above.
# Multislit can coadd with:
# 1) input offsets or if offsets is None, it will find the brightest trace and compute them
# 2) specified weights, or if weights is None and auto_weights=True, it will compute weights using the brightest object
"""
def __init__(self, spec2d_files, spectrograph, par, det=1,
only_slits=None, exclude_slits=None, sn_smooth_npix=None,
bkg_redux=False, find_negative=False, show=False, show_peaks=False, debug_offsets=False, debug=False):
# Attributes specifically used by MultislitCoAdd2D
# This is an integer, which is the spatial slit id of the slit with the brightest object.
# Used for both offsets (if offsets='auto') and weights (if weights='auto').
# Can be user specified if user_obj_ids is provided
self.spatid_bri = None
# This will be an array of the object spatial pixel positions used for auto weights in each exposure
self.spat_pixpos_id_weights = None
super().__init__(spec2d_files, spectrograph, det=det,
only_slits=only_slits, exclude_slits=exclude_slits,
sn_smooth_npix=sn_smooth_npix, bkg_redux=bkg_redux, find_negative=find_negative, par=par,
show=show, show_peaks=show_peaks, debug_offsets=debug_offsets,
debug=debug)
[docs]
def handle_reference_obj(self):
"""
Method to handle the syntrax for the reference object to be used for offsets and weights.
"""
# otherwise, find if there is a bright object we could use
if len(self.stack_dict['specobjs_list']) > 0 and (self.par['coadd2d']['offsets'] == 'auto' or self.par['coadd2d']['weights'] == 'auto'):
# If the user passed in user_obj_ids, we will use these for the brighest object to
# be optionally used for offsets and weights.
if self.par['coadd2d']['user_obj_ids'] is not None:
if self.par['coadd2d']['weights'] != 'auto':
msgs.error('Parameter `user_obj_ids` can only be used if weights are set to `auto`.')
if len(self.par['coadd2d']['user_obj_ids']) != self.nexp:
msgs.error('Parameter `user_obj_ids` must have the same number of elements as exposures.')
user_obj_exist = np.zeros(self.nexp, dtype=bool)
# get the flux, ivar, gpm, and spatial pixel position of the user object
fluxes, ivars, gpms, spatids, spat_pixpos = [], [], [], [], []
self.spat_pixpos_id_bri = np.array(self.par['coadd2d']['user_obj_ids'])
for i, sobjs in enumerate(self.stack_dict['specobjs_list']):
user_idx = sobjs.slitorder_uniq_id_indices(self.spat_pixpos_id_bri[i])
if np.any(user_idx):
this_sobj = sobjs[user_idx][0]
flux_iobj, ivar_iobj, gpm_iobj = self.unpack_specobj(this_sobj)
if flux_iobj is not None and ivar_iobj is not None and gpm_iobj is not None:
fluxes.append(flux_iobj)
ivars.append(ivar_iobj)
gpms.append(gpm_iobj)
spat_pixpos.append(this_sobj.SPAT_PIXPOS)
spatids.append(this_sobj.SLITID)
user_obj_exist[i] = True
# check if the user object exists in all the exposures
if not np.all(user_obj_exist):
msgs.error('Not all of the spat_pixpos_ids provided through `user_obj_ids` exist in all of the exposures.')
# Check that all spatids are within the spat_toler of each other
if not np.all(np.abs(spatids - np.mean(spatids[0])) <= self.par['coadd2d']['spat_toler']):
msgs.error('Not all spatial IDs are within spat_toler of each other')
self.spatid_bri = int(np.rint(np.mean(spatids)))
self.spat_pixpos_bri = np.array(spat_pixpos)
self.snr_bar_bri, _ = coadd.calc_snr(fluxes, ivars, gpms)
self.obj_id_bri = self.spat_pixpos_id_bri
else:
# Otherwise, find the brightest object in the stack and obtain the relevant information
self.spat_pixpos_id_bri, self.spat_pixpos_bri, self.spatid_bri, self.snr_bar_bri = self.get_brightest_obj(self.stack_dict['specobjs_list'], self.spat_ids)
self.obj_id_bri = self.spat_pixpos_id_bri
# TODO When we run multislit, we actually compute the rebinned images twice. Once here to compute the offsets
# and another time to weighted_combine the images in compute2d. This could be sped up
# TODO The reason we rebin the images for the purposes of computing the offsets is to deal with combining
# data that are dithered in the spectral direction. In these situations you cannot register the two dithered
# reference objects into the same frame without first rebinning them onto the same grid.
[docs]
def compute_offsets(self):
"""
Determine self.offsets, the offset of the frames to be coadded with respect to the first frame
Args:
offsets (:obj:`list` or :obj:`str`):
Value that guides the determination of the offsets.
It could be a list of offsets, or a string, or None. If equal to 'maskdef_offsets' the
offsets computed during the slitmask design matching will be used.
"""
super().compute_offsets()
# adjustment for multislit to case 4) parset `offsets` = 'auto' but we have a bright object
if self.par['coadd2d']['offsets'] == 'auto' and self.obj_id_bri is not None:
# Compute offsets using the bright object
if self.par['coadd2d']['user_obj_ids'] is not None:
offsets_method = 'user object on slitid = {:d}'.format(self.spatid_bri)
else:
offsets_method = 'brightest object found on slit: {:d} with avg SNR={:5.2f}'.format(self.spatid_bri,np.mean(self.snr_bar_bri))
msgs.info(f'Determining offsets using {offsets_method}')
thismask_stack = [np.abs(slitmask - self.spatid_bri) <= self.par['coadd2d']['spat_toler'] for slitmask in self.stack_dict['slitmask_stack']]
slitidx_bri = np.where(np.abs(self.spat_ids - self.spatid_bri) <= self.par['coadd2d']['spat_toler'])[0][0]
# TODO Need to think abbout whether we have multiple tslits_dict for each exposure or a single one
trace_stack_bri = [slits.center[:, slitidx_bri]
for slits in self.stack_dict['slits_list']]
# Determine the wavelength grid that we will use for the current slit/order
## TODO: Should the spatial and spectral samp_facts here match those of the final coadded data, or she would
## compute offsets at full resolution??
wave_bins = coadd.get_wave_bins(thismask_stack, self.stack_dict['waveimg_stack'], self.wave_grid)
dspat_bins, dspat_stack = coadd.get_spat_bins(thismask_stack, trace_stack_bri)
sci_list = [[sciimg - skymodel for sciimg, skymodel in zip(self.stack_dict['sciimg_stack'], self.stack_dict['skymodel_stack'])]]
var_list = [[utils.inverse(sciivar) for sciivar in self.stack_dict['sciivar_stack']]]
msgs.info('Rebinning Images')
mask_stack = [mask == 0 for mask in self.stack_dict['mask_stack']]
sci_list_rebin, var_list_rebin, norm_rebin_stack, nsmp_rebin_stack = coadd.rebin2d(
wave_bins, dspat_bins, self.stack_dict['waveimg_stack'], dspat_stack, thismask_stack, mask_stack, sci_list, var_list)
thismask = np.ones_like(sci_list_rebin[0][0,:,:],dtype=bool)
nspec_pseudo, nspat_pseudo = thismask.shape
slit_left = np.full(nspec_pseudo, 0.0)
slit_righ = np.full(nspec_pseudo, nspat_pseudo)
inmask = norm_rebin_stack > 0
traces_rect = np.zeros((nspec_pseudo, self.nexp))
user_obj_dspats = []
for iexp in range(self.nexp):
sobjs_exp = findobj_skymask.objs_in_slit(
sci_list_rebin[0][iexp,:,:], utils.inverse(var_list_rebin[0][iexp,:,:]), thismask, slit_left, slit_righ,
inmask=inmask[iexp,:,:], fwhm=self.par['reduce']['findobj']['find_fwhm'],
trim_edg=self.par['reduce']['findobj']['find_trim_edge'],
maxdev=self.par['reduce']['findobj']['find_maxdev'],
numiterfit=self.par['reduce']['findobj']['find_numiterfit'],
ncoeff=3, snr_thresh=self.par['reduce']['findobj']['snr_thresh'],
nperslit=1 if self.par['coadd2d']['user_obj_ids'] is None else None,
find_min_max=self.par['reduce']['findobj']['find_min_max'],
show_trace=self.debug_offsets, show_peaks=self.debug_offsets)
if len(sobjs_exp) == 0:
msgs.error(f'No objects found in the rebinned image for exposure {iexp} '
f'(used to compute the offsets). '
f'Check `FindObjPar` parameters and try to adjust `snr_thresh`')
if self.par['coadd2d']['user_obj_ids'] is not None:
left_edge_orig = self.stack_dict['slits_list'][iexp].select_edges(flexure=self.stack_dict['spat_flexure_list'][iexp])[0]
idx_orig = self.stack_dict['specobjs_list'][iexp].slitorder_uniq_id_indices(self.par['coadd2d']['user_obj_ids'][iexp])
trace_orig = self.stack_dict['specobjs_list'][iexp][idx_orig].TRACE_SPAT
# Compute the mean median offset betweeh the original trace and the left edge of the slit
dist_to_left = np.median(trace_orig - left_edge_orig)
# Identify the trace in the sobjs_exp from the rebinned image that is closest to the original trace taking this offset into account
dspat_exp_orig = np.abs(np.median(sobjs_exp.TRACE_SPAT - dist_to_left, axis=1))
dspat_ex_orig_min = dspat_exp_orig.min()
if dspat_ex_orig_min < self.par['coadd2d']['spat_toler']:
traces_rect[:, iexp] = sobjs_exp[np.argmin(dspat_exp_orig)].TRACE_SPAT
user_obj_dspats.append(dspat_ex_orig_min)
else:
msgs.error(f'Could not identify an object in the rebinned image corresponding '
f'to the trace for the user object {self.par["coadd2d"]["user_obj_ids"][iexp]} '
f'in exposure {iexp+1} within the specified spatial '
f'tolerance ={self.par["coadd2d"]["spat_toler"]}')
else:
traces_rect[:, iexp] = sobjs_exp.TRACE_SPAT
if self.par['coadd2d']['user_obj_ids'] is not None:
msgs.info(f'The median distance between the original traces and those in the '
f'rebinned image for the user_obj_ids is {np.median(user_obj_dspats):.2f} pixels')
# Now deterimine the offsets. Arbitrarily set the zeroth trace to the reference
med_traces_rect = np.median(traces_rect,axis=0)
offsets = med_traces_rect[0] - med_traces_rect
# TODO create a QA with this
if self.debug_offsets:
for iexp in range(self.nexp):
plt.plot(traces_rect[:, iexp], linestyle='--', label='original trace')
plt.plot(traces_rect[:, iexp] + offsets[iexp], label='shifted traces')
plt.legend()
plt.show()
self.offsets = offsets
# binned pixel scale of the frames to be coadded
pixscale = parse.parse_binning(self.stack_dict['detectors'][0].binning)[1]*self.stack_dict['detectors'][0].platescale
self.offsets_report(self.offsets, pixscale, offsets_method)
[docs]
def compute_weights(self):
"""
Determine the weights to be used in the coadd2d.
This method sets the internal :attr:`use_weights`. Documentation on the
form of self.use_weights needs to be written.
Args:
weights (:obj:`list`, :obj:`str`):
Value that guides the determination of the weights. It could be
a list of weights or a string. If equal to 'auto', the weight
will be computed using the brightest trace, if 'uniform' uniform
weights will be used.
"""
super().compute_weights()
# adjustment for multislit to case 3) Bright object exists and parset `weights` is equal to 'auto'
if (self.obj_id_bri is not None) and (self.par['coadd2d']['weights'] == 'auto'):
# compute weights using bright object
_, self.use_weights = self.optimal_weights(self.obj_id_bri, weight_method='constant')
if self.par['coadd2d']['user_obj_ids'] is not None:
msgs.info(f'Weights computed using a unique reference object in slit={self.spatid_bri} provided by the user')
else:
msgs.info(f'Weights computed using a unique reference object in slit={self.spatid_bri} with the highest S/N')
self.snr_report(self.spatid_bri, self.spat_pixpos_bri, self.snr_bar_bri)
[docs]
def get_brightest_obj(self, specobjs_list, slit_spat_ids):
"""
Utility routine to find the brightest reference object in each exposure given a specobjs_list
for MultiSlit reductions.
Args:
specobjs_list: list
List of SpecObjs objects.
slit_spat_ids (`numpy.ndarray`_):
Returns:
tuple: Returns the following:
- spat_pixpos_id: ndarray, int, shape=(len(specobjs_list),):
Array of object spat_pixpos_ids representing the brightest reference object
in each exposure
- spat_pixpos: ndarray, float, shape=(len(specobjs_list),):
Array of spatial pixel positions of the brightest reference object
in each exposure
- spat_id (int):
The SPAT_ID for the slit that the highest S/N ratio object is on
- snr_bar: ndarray, float, shape (len(list),): RMS S/N computed for this brightest reference object in each exposure
"""
msgs.info('Finding brightest object')
nexp = len(specobjs_list)
nslits = slit_spat_ids.size
slit_snr_max = np.zeros((nslits, nexp), dtype=float)
bpm = np.ones(slit_snr_max.shape, dtype=bool)
spat_pixpos_id_max = np.zeros((nslits, nexp), dtype=int)
spat_pixpos_max = np.zeros((nslits, nexp), dtype=float)
# Loop over each exposure, slit, find the brightest object on that slit for every exposure
for iexp, sobjs in enumerate(specobjs_list):
msgs.info("Working on exposure {}".format(iexp))
for islit, spat_id in enumerate(slit_spat_ids):
if len(sobjs) == 0:
continue
ithis = np.abs(sobjs.SLITID - spat_id) <= self.par['coadd2d']['spat_toler']
if np.any(ithis):
spat_pixpos_id_this = sobjs[ithis].SPAT_PIXPOS_ID
spat_pixpos_this = sobjs[ithis].SPAT_PIXPOS
fluxes, ivars, gpms = [], [], []
for spec in sobjs[ithis]:
flux_iobj, ivar_iobj, gpm_iobj = self.unpack_specobj(spec, spatord_id=spat_id)
if flux_iobj is not None and ivar_iobj is not None and gpm_iobj is not None:
fluxes.append(flux_iobj)
ivars.append(ivar_iobj)
gpms.append(gpm_iobj)
# if there are objects on this slit left, we can proceed with computing rms_sn
if len(fluxes) > 0:
rms_sn, _ = coadd.calc_snr(fluxes, ivars, gpms)
imax = np.argmax(rms_sn)
slit_snr_max[islit, iexp] = rms_sn[imax]
spat_pixpos_id_max[islit, iexp] = spat_pixpos_id_this[imax]
spat_pixpos_max[islit, iexp] = spat_pixpos_this[imax]
bpm[islit, iexp] = False
# If a slit has bpm = True for some exposures and not for others, set bpm = True for all exposures
# Find the rows where any of the bpm values are True
bpm_true_idx = np.array([np.any(b) for b in bpm])
if np.any(bpm_true_idx):
# Flag all exposures in those rows
bpm[bpm_true_idx, :] = True
# Find the highest snr object among all the slits
if np.all(bpm):
msgs.warn('You do not appear to have a unique reference object that was traced as the highest S/N '
'ratio on the same slit of every exposure. Try increasing the parameter `spat_toler`')
return None, None, None, None
else:
# mask the bpm
slit_snr_max_masked = np.ma.array(slit_snr_max, mask=bpm)
slit_snr = np.mean(slit_snr_max_masked, axis=1)
slitid = np.argmax(slit_snr)
snr_bar_mean = slit_snr[slitid]
snr_bar = slit_snr_max[slitid, :]
spat_pix_pos_id = spat_pixpos_id_max[slitid, :]
spat_pixpos = spat_pixpos_max[slitid, :]
return spat_pix_pos_id, spat_pixpos, slit_spat_ids[slitid], snr_bar
[docs]
def snr_report(self, slitid, spat_pixpos, snr_bar):
"""
Print out a SNR report for the reference object used to compute the weights for multislit 2D coadds.
Args:
slitid (:obj:`int`):
The SPAT_ID of the slit that the reference object is on
spat_pixpos (:obj:`numpy.ndarray`):
Array of spatial pixel position of the reference object in the slit for each exposure shape = (nexp,)
snr_bar (:obj:`numpy.ndarray`):
Array of average S/N ratios for the reference object in each exposure, shape = (nexp,)
Returns:
"""
# Print out a report on the SNR
msg_string = msgs.newline() + '-------------------------------------'
msg_string += msgs.newline() + ' Summary for highest S/N object'
msg_string += msgs.newline() + ' found on slitid = {:d} '.format(slitid)
msg_string += msgs.newline() + '-------------------------------------'
msg_string += msgs.newline() + ' exp# spat_pixpos S/N'
msg_string += msgs.newline() + '-------------------------------------'
for iexp, (spat,snr) in enumerate(zip(spat_pixpos, snr_bar)):
msg_string += msgs.newline() + ' {:2d} {:7.1f} {:5.2f}'.format(iexp, spat, snr)
msg_string += msgs.newline() + '-------------------------------------'
msgs.info(msg_string)
# TODO add an option here to actually use the reference trace for cases where they are on the same slit and it is
# single slit???
[docs]
def reference_trace_stack(self, slitid, offsets=None, uniq_obj_id=None):
"""
Method to obtain the stack of reference traces for Multislit reductions.
Parameters
----------
slitid: int
The slit ID for which the reference traces are to be obtained.
offsets: list, `numpy.ndarray`_, optional
List or array of offsets to apply to the reference traces.
It must have the same length as the number of exposures. Optional.
uniq_obj_id: list, `numpy.ndarray`_, optional
List or array of object IDs to use for the reference traces.
Not used in Multislit reductions.
Returns
-------
ref_trace_stack: list
List of reference traces for the slit specified by slitid. Each element in the list corresponds
to a different exposure and contains the reference trace for that exposure.
"""
return self.offset_slit_cen(slitid, offsets)
[docs]
def get_maskdef_dict(self, slit_idx, ref_trace_stack):
"""
Args:
slit_idx (:obj:`int`):
index of a slit in the uncoadded frames
ref_trace_stack (`numpy.ndarray`_):
Stack of reference traces about which the images are rectified
and coadded. It is the slitcen appropriately shifted according
the frames offsets. Shape is (nspec, nimgs).
Returns:
:obj:`dict`: Dictionary containing all the maskdef info. The
quantities saved are: maskdef_id, maskdef_objpos, maskdef_slitcen,
maskdef_designtab. To learn what they are see
:class:`~pypeit.slittrace.SlitTraceSet` datamodel.
"""
# maskdef info
if self.par['calibrations']['slitedges']['use_maskdesign'] and \
self.stack_dict['slits_list'][0].maskdef_id is not None and \
self.stack_dict['slits_list'][0].maskdef_objpos is not None and \
self.stack_dict['maskdef_designtab_list'][0] is not None and \
self.par['coadd2d']['offsets'] == 'maskdef_offsets':
# maskdef_designtab info for only this slit
this_idx = self.stack_dict['maskdef_designtab_list'][0]['SPAT_ID'] == self.spat_ids[slit_idx]
this_maskdef_designtab = self.stack_dict['maskdef_designtab_list'][0][this_idx]
# remove columns that are irrelevant in the coadd2d frames
this_maskdef_designtab.remove_columns(['TRACEID', 'TRACESROW', 'TRACELPIX', 'TRACERPIX',
'SLITLMASKDEF', 'SLITRMASKDEF'])
this_maskdef_designtab.meta['MASKRMSL'] = 0.
this_maskdef_designtab.meta['MASKRMSR'] = 0.
# maskdef_id for this slit
imaskdef_id = self.stack_dict['slits_list'][0].maskdef_id[slit_idx]
# maskdef_slitcen (slit center along the spectral direction) and
# maskdef_objpos (expected position of the target, as distance from left slit edge) for this slit
# These are the binned maskdef_slitcen positions w.r.t. the center of the slit in ref_trace_stack
slit_cen_dspat_vec = np.zeros(self.nexp)
# These are the binned maskdef_objpos positions w.r.t. the center of the slit in ref_trace_stack
objpos_dspat_vec = np.zeros(self.nexp)
for iexp in range(self.nexp):
# get maskdef_slitcen
mslitcen_pixpos = self.stack_dict['slits_list'][iexp].maskdef_slitcen
if mslitcen_pixpos.ndim < 2:
mslitcen_pixpos = mslitcen_pixpos[:, None]
maskdef_slitcen_pixpos = mslitcen_pixpos[self.nspec_array[0]//2, slit_idx] + self.maskdef_offset[iexp]
# get maskdef_objpos
# find left edge
slits_left, _, _ = \
self.stack_dict['slits_list'][iexp].select_edges(flexure=self.stack_dict['spat_flexure_list'][iexp])
# targeted object spat pix
maskdef_obj_pixpos = \
self.stack_dict['slits_list'][iexp].maskdef_objpos[slit_idx] + self.maskdef_offset[iexp] \
+ slits_left[slits_left.shape[0]//2, slit_idx]
# change reference system
ref_trace = ref_trace_stack[iexp]
nspec_this = ref_trace.shape[0]
slit_cen_dspat_vec[iexp] = (maskdef_slitcen_pixpos - ref_trace[nspec_this // 2]) / self.par['coadd2d']['spat_samp_fact']
objpos_dspat_vec[iexp] = (maskdef_obj_pixpos - ref_trace[nspec_this // 2]) / self.par['coadd2d']['spat_samp_fact']
imaskdef_slitcen_dspat = np.mean(slit_cen_dspat_vec)
imaskdef_objpos_dspat = np.mean(objpos_dspat_vec)
else:
this_maskdef_designtab = None
imaskdef_id = None
imaskdef_slitcen_dspat = None
imaskdef_objpos_dspat = None
return dict(maskdef_id=imaskdef_id, maskdef_objpos=imaskdef_objpos_dspat,
maskdef_slitcen=imaskdef_slitcen_dspat, maskdef_designtab=this_maskdef_designtab)
[docs]
def wave_method(self):
"""
Return the wavelength method used for the coadd2d.
Returns:
:obj:`str`: The wavelength method used for the coadd2d.
"""
return self.par['coadd2d']['wave_method'] if self.par['coadd2d']['wave_method'] is not None else 'linear'
[docs]
class EchelleCoAdd2D(CoAdd2D):
"""
Coadd Echelle reductions.
For documentation see :class:`CoAdd2D`.
Echelle can either stack with:
- input ``offsets`` or if ``offsets`` is None, it will find
the ``obj_id`` of brightest trace and stack all orders
relative to the trace of this object.
- specified ``weights``, or if ``weights`` is None and
``auto_weights`` is True, it will use wavelength dependent
weights determined from the spectrum of the brightest
objects ``obj_id`` on each order
"""
def __init__(self, spec2d_files, spectrograph, par, det=1,
only_slits=None, exclude_slits=None, sn_smooth_npix=None,
bkg_redux=False, find_negative=False, show=False, show_peaks=False, debug_offsets=False, debug=False):
super().__init__(spec2d_files, spectrograph, det=det,
only_slits=only_slits, exclude_slits=exclude_slits,
sn_smooth_npix=sn_smooth_npix, bkg_redux=bkg_redux, find_negative=find_negative,
par=par, show=show, show_peaks=show_peaks, debug_offsets=debug_offsets,
debug=debug)
[docs]
def handle_reference_obj(self):
"""
Method to handle the syntrax for the reference object to be used for offsets and weights.
"""
# If a user-input object to compute offsets and weights is provided, check if it exists and get the needed info
if len(self.stack_dict['specobjs_list']) > 0 and self.par['coadd2d']['user_obj_ids'] is not None:
if len(self.par['coadd2d']['user_obj_ids']) != self.nexp:
msgs.error(f'Parameter `user_obj_ids` {self.par["coadd2d"]["user_obj_ids"]} must have the same number '
f'of elements as exposures {self.nexp}.')
else:
# does it exists?
user_obj_exist = np.zeros((self.nexp,self.nslits_single), dtype=bool)
orders= self.stack_dict['slits_list'][0].slitord_id
for iexp, sobjs in enumerate(self.stack_dict['specobjs_list']):
for iord, ord in enumerate(orders):
# check if the object exists in this exposure
ind = sobjs.slitorder_uniq_id_indices(self.par['coadd2d']['user_obj_ids'][iexp], order=ord)
if (len(ind) == 0) or (not np.any(ind)):
msgs.error(f'Object with user_obj_id {self.par["coadd2d"]["user_obj_ids"][iexp]} does not exist in exposure {iexp+1} for order {ord}.')
flux, ivar, mask = self.unpack_specobj(sobjs[ind][0])
if flux is not None and ivar is not None and mask is not None:
user_obj_exist[iexp, iord] = True
if not np.all(user_obj_exist):
msgs.error('Object provided through `user_obj_ids` does not exist in all the exposures.')
# get the needed info about the user object
self.obj_id_bri = np.array(self.par['coadd2d']['user_obj_ids'])
elif len(self.stack_dict['specobjs_list']) > 0 and (self.par['coadd2d']['offsets'] == 'auto' or self.par['coadd2d']['weights'] == 'auto'):
self.obj_id_bri, self.snr_bar_bri = \
self.get_brightest_obj(self.stack_dict['specobjs_list'], self.stack_dict['slits_list'][0].slitord_id)
[docs]
def compute_offsets(self):
"""
Determine self.offsets, the offset of the frames to be coadded with respect to the first frame
Args:
offsets (:obj:`list` or :obj:`str`):
Value that guides the determination of the offsets.
It could be a list of offsets, or a string, or None.
"""
super().compute_offsets()
# adjustment for echelle to case 2): a list of offsets is provided by the user
if isinstance(self.offsets, (list, np.ndarray)):
self.obj_id_bri = None
# adjustment for echelle to case 4) parset `offsets` = 'auto' but we have a bright object
elif self.par['coadd2d']['offsets'] == 'auto' and self.obj_id_bri is not None:
# offsets are not determined, but the bright object is used to construct
# a reference trace (this is done in coadd using method `reference_trace_stack`)
self.offsets = None
if self.par['coadd2d']['user_obj_ids'] is not None:
msgs.info('Reference trace about which 2d coadd is performed is computed using user object')
else:
msgs.info('Reference trace about which 2d coadd is performed is computed using the brightest object')
[docs]
def compute_weights(self):
"""
Determine self.use_weights, the weights to be used in the coadd2d
Args:
weights (:obj:`list` or :obj:`str`):
Value that guides the determination of the weights.
It could be a list of weights or a string. If 'auto' the weight will be computed using
the brightest trace, if 'uniform' uniform weights will be used.
"""
super().compute_weights()
# adjustment for echelle to case 3) Bright object exists and parset `weights` is equal to 'auto'
if (self.obj_id_bri is not None) and (self.par['coadd2d']['weights'] == 'auto'):
# computing a list of weights for all the slitord_ids that we than parse in coadd
slitord_ids = self.stack_dict['slits_list'][0].slitord_id
self.use_weights = []
for order in slitord_ids:
_, iweights = self.optimal_weights(self.obj_id_bri, order=order)
self.use_weights.append(iweights)
if self.par['coadd2d']['user_obj_ids'] is not None:
msgs.info('Weights computed using a unique reference object provided by the user')
# TODO: implement something here to print out the snr_report
else:
msgs.info('Weights computed using a unique reference object with the highest S/N')
self.snr_report(self.snr_bar_bri)
[docs]
def _get_weights(self, indx=None):
"""
Method to select the correct weights for the selected slit/order.
This is partially overloaded by the child methods
Args:
indx (:obj:`int`, optional):
Index of the slit/order for which the weights are to be returned.
Returns:
:obj:`list`: List of weights to be used for 2D coadd. The length of the list
is equal to the number of exposures.
"""
# if this is echelle data and the parset 'weights' is set to 'auto',
# then the weights are computed per order, i.e., every order has a
# different set of weights in each exposure (len(self.use_weights[indx]) = nexp)
if self.par['coadd2d']['weights'] == 'auto' and indx is None:
msgs.error('The index of the slit/order must be provided when using auto weights for Echelle data.')
return self.use_weights[indx] if self.par['coadd2d']['weights'] == 'auto' else super()._get_weights()
[docs]
def get_brightest_obj(self, specobjs_list, orders):
"""
Utility routine to find the brightest object in each exposure given a specobjs_list for Echelle reductions.
Args:
specobjs_list: list
List of SpecObjs objects.
orders: `numpy.ndarray`_
Array of order ids for which the brightest object is to be found.
Returns:
tuple: Returns the following:
- fracpos_id: ndarray, int, shape (len(specobjs_list),):
Array of object ids representing the brightest object
in each exposure
- snr_bar: ndarray, float, shape (len(list),): Average
S/N over all the orders for this object
"""
msgs.info('Finding brightest object')
nexp = len(specobjs_list)
fracpos_id = np.zeros(nexp, dtype=int)
snr_bar = np.zeros(nexp)
for iexp, sobjs in enumerate(specobjs_list):
msgs.info("Working on exposure {}".format(iexp))
uni_fracpos_id = np.unique(sobjs.ECH_FRACPOS_ID)
nobjs = len(uni_fracpos_id)
order_snr = np.zeros((orders.size, nobjs), dtype=float)
bpm = np.ones((orders.size, nobjs), dtype=bool)
for iord, ord in enumerate(orders):
for iobj in range(nobjs):
ind = sobjs.slitorder_uniq_id_indices(uni_fracpos_id[iobj], order=ord)
flux, ivar, mask = self.unpack_specobj(sobjs[ind][0], spatord_id=sobjs[ind][0].ECH_ORDER)
if flux is not None and ivar is not None and mask is not None:
rms_sn, _ = coadd.calc_snr([flux], [ivar], [mask])
order_snr[iord, iobj] = rms_sn[0]
bpm[iord, iobj] = False
# If there are orders that have bpm = True for some objs and not for others, set bpm = True for all objs
# Find the rows where any of the bpm values are True
bpm_true_idx = np.array([np.any(b) for b in bpm])
if np.any(bpm_true_idx):
# Flag all objs in those rows
bpm[bpm_true_idx, :] = True
# Compute the average SNR and find the brightest object
if not np.all(bpm):
# mask the bpm
order_snr_masked = np.ma.array(order_snr, mask=bpm)
snr_bar_vec = np.mean(order_snr_masked, axis=0)
fracpos_id[iexp] = uni_fracpos_id[snr_bar_vec.argmax()]
snr_bar[iexp] = snr_bar_vec[snr_bar_vec.argmax()]
if 0 in snr_bar:
msgs.warn('You do not appear to have a unique reference object that was traced as the highest S/N '
'ratio for every exposure')
return None, None
return fracpos_id, snr_bar
[docs]
def snr_report(self, snr_bar):
"""
Printo out a SNR report for echelle 2D coadds.
Args:
snr_bar (:obj:`numpy.ndarray`):
Array of average S/N ratios for the brightest object in each exposure. Shape = (nexp,)
Returns:
"""
# Print out a report on the SNR
msg_string = msgs.newline() + '-------------------------------------'
msg_string += msgs.newline() + ' Summary for highest S/N object'
msg_string += msgs.newline() + '-------------------------------------'
msg_string += msgs.newline() + ' exp# S/N'
for iexp, snr in enumerate(snr_bar):
msg_string += msgs.newline() + ' {:d} {:5.2f}'.format(iexp, snr)
msg_string += msgs.newline() + '-------------------------------------'
msgs.info(msg_string)
[docs]
def reference_trace_stack(self, slitid, offsets=None, uniq_obj_id=None):
"""
Method to obtain the stack of reference traces for Echelle reductions.
There are two modes of operation to determine the reference
trace for the 2d coadd of a given slit/order:
#. ``offsets``: We stack about the center of the slit for
the slit in question with the input offsets added
#. ``uniq_obj_id``: We stack about the trace of a reference
object for this slit given for each exposure by the
input user_obj_ids
Either offsets or uniq_obj_id must be provided, but the code will
raise an exception if both are provided.
Parameters
----------
slitid: int
The slit/order ID for which the reference traces are to be obtained.
offsets: list, `numpy.ndarray`_, optional
List or array of offsets to apply to the reference traces.
It must have the same length as the number of exposures. Optional.
uniq_obj_id: list, `numpy.ndarray`_, optional
List or array of object IDs to use for the reference traces.
It must have the same length as the number of exposures.
This is the ``ECH_FRACPOS_ID`` attribute of SpecObj for echelle reductions. Optional.
Returns
-------
ref_trace_stack: list
List of reference traces for the slit/order specified by slitid. Each element in the list corresponds
to a different exposure and contains the reference trace for that exposure.
"""
# check inputs
if offsets is not None and uniq_obj_id is not None:
msgs.error('You can only input offsets or an uniq_obj_id, but not both')
if offsets is None and uniq_obj_id is None:
msgs.error('You must input either offsets or a uniq_obj_id to determine the stack of '
'reference traces')
# if offset is provided, we stack about the center of the slit
if isinstance(offsets, (list, np.ndarray)):
return self.offset_slit_cen(slitid, offsets)
# if uniq_obj_id is provided, we stack about the trace of the object
orders = self.stack_dict['slits_list'][0].slitord_id
specobjs_list = self.stack_dict['specobjs_list']
ref_trace_stack = []
for iexp, sobjs in enumerate(specobjs_list):
ithis = sobjs.slitorder_uniq_id_indices(uniq_obj_id[iexp], order=orders[slitid])
ref_trace_stack.append(sobjs[ithis][0].TRACE_SPAT)
return ref_trace_stack
[docs]
def wave_method(self):
"""
Returns the wavelength method used for the Echelle coadd2d.
Returns:
:obj:`str`: The wavelength method used for the Echelle coadd2d.
"""
return 'log10' if self.par['coadd2d']['wave_method'] is None else self.par['coadd2d']['wave_method']