Module for the SpecObj classes

import copy
import inspect
from IPython import embed

import numpy as np

from astropy import units

from linetools.spectra import xspectrum1d

from pypeit import msgs
from pypeit.core import flexure
from pypeit.core import flux_calib
from pypeit.core import parse
from pypeit import utils
from pypeit import datamodel
from pypeit.images.detector_container import DetectorContainer
from pypeit.images.mosaic import Mosaic

[docs] class SpecObj(datamodel.DataContainer): """ Class to handle single spectra from a single exposure. One generates one of these objects for each spectrum in the exposure. They are instantiated by the object finding routine, and then all spectral extraction information for the object are assigned as attributes The datamodel attributes are: .. include:: ../include/class_datamodel_specobj.rst Args: PYPELINE (:obj:`str`): Name of the ``PypeIt`` pipeline method. Allowed options are MultiSlit, Echelle, or SlicerIFU. DET (:obj:`str`): The name of the detector or mosaic from which the spectrum was extracted. For example, DET01. OBJTYPE (:obj:`str`, optional): Object type. For example: 'unknown', 'standard', 'science'. SLITID (:obj:`int`, optional): For multislit and SlicerIFU reductions, this is an identifier for the slit (max=9999). ECH_ORDER (:obj:`int`, optional): Physical order number. ECH_ORDERINDX (:obj:`int`, optional): Running index for the order. """ version = '1.1.11' """ Current datamodel version number. """ datamodel = {'TRACE_SPAT': dict(otype=np.ndarray, atype=float, descr='Object trace along the spec (spatial pixel)'), 'FWHM': dict(otype=float, descr='Spatial FWHM of the object (pixels)'), 'FWHMFIT': dict(otype=np.ndarray, atype=float, descr='Spatial FWHM across the detector (pixels)'), 'SPAT_FWHM': dict(otype=float, descr='Spatial FWHM of the object (arcsec)'), 'smash_peakflux': dict(otype=float, descr='Peak value of the spectral direction collapsed spatial profile'), 'smash_snr': dict(otype=float, descr='Peak S/N ratio of the spectral direction collapsed patial profile'), 'OPT_WAVE': dict(otype=np.ndarray, atype=float, descr='Optimal Wavelengths in vacuum (Angstroms)'), 'OPT_FLAM': dict(otype=np.ndarray, atype=float, descr='Optimal flux (1e-17 erg/s/cm^2/Ang)'), 'OPT_FLAM_SIG': dict(otype=np.ndarray, atype=float, descr='Optimal flux uncertainty (1e-17 erg/s/cm^2/Ang)'), 'OPT_FLAM_IVAR': dict(otype=np.ndarray, atype=float, descr='Optimal flux inverse variance (1e-17 erg/s/cm^2/Ang)^-2'), 'OPT_COUNTS': dict(otype=np.ndarray, atype=float, descr='Optimal flux (counts)'), 'OPT_COUNTS_IVAR': dict(otype=np.ndarray, atype=float, descr='Inverse variance of optimally extracted flux ' 'using modelivar image (counts^2)'), 'OPT_COUNTS_SIG': dict(otype=np.ndarray, atype=float, descr='Optimally extracted noise from IVAR (counts)'), 'OPT_COUNTS_NIVAR': dict(otype=np.ndarray, atype=float, descr='Optimally extracted noise variance, sky+read ' 'noise only (counts^2)'), 'OPT_FLAT': dict(otype=np.ndarray, atype=float, descr='Optimally extracted flatfield spectrum, normalised to the peak value.'), 'OPT_MASK': dict(otype=np.ndarray, atype=np.bool_, descr='Mask for optimally extracted flux. True=good'), 'OPT_FWHM': dict(otype=np.ndarray, atype=float, descr='Spectral FWHM (in Angstroms) at every pixel of the optimally extracted flux.'), 'OPT_COUNTS_SKY': dict(otype=np.ndarray, atype=float, descr='Optimally extracted sky (counts)'), 'OPT_COUNTS_SIG_DET': dict(otype=np.ndarray, atype=float, descr='Optimally extracted detector noise (counts)'), 'OPT_FRAC_USE': dict(otype=np.ndarray, atype=float, descr='Fraction of pixels in the object profile subimage ' 'used for this extraction'), 'OPT_CHI2': dict(otype=np.ndarray, atype=float, descr='Reduced chi2 of the model fit for this spectral pixel'), 'BOX_NPIX': dict(otype=np.ndarray, atype=float, descr='Number of pixels used for the boxcar extraction; can be ' 'fractional'), 'BOX_WAVE': dict(otype=np.ndarray, atype=float, descr='Boxcar Wavelengths in vacuum (Angstroms)'), 'BOX_FLAM': dict(otype=np.ndarray, atype=float, descr='Boxcar flux (erg/s/cm^2/Ang)'), 'BOX_FLAM_SIG': dict(otype=np.ndarray, atype=float, descr='Boxcar flux uncertainty (1e-17 erg/s/cm^2/Ang)'), 'BOX_FLAM_IVAR': dict(otype=np.ndarray, atype=float, descr='Boxcar flux inverse variance (1e-17 erg/s/cm^2/Ang)^-2'), 'BOX_COUNTS': dict(otype=np.ndarray, atype=float, descr='Boxcar flux (counts)'), 'BOX_COUNTS_IVAR': dict(otype=np.ndarray, atype=float, descr='Inverse variance of optimally extracted flux ' 'using modelivar image (counts^2)'), 'BOX_COUNTS_SIG': dict(otype=np.ndarray, atype=float, descr='Boxcar extracted noise from IVAR (counts)'), 'BOX_COUNTS_NIVAR': dict(otype=np.ndarray, atype=float, descr='Boxcar extracted noise variance, sky+read noise ' 'only (counts^2)'), 'BOX_FLAT': dict(otype=np.ndarray, atype=float, descr='Boxcar extracted flatfield spectrum, normalized to the peak value.'), 'BOX_MASK': dict(otype=np.ndarray, atype=np.bool_, descr='Mask for boxcar extracted flux. True=good'), 'BOX_FWHM': dict(otype=np.ndarray, atype=float, descr='Spectral FWHM (in Angstroms) at every pixel of the boxcar extracted flux.'), 'BOX_COUNTS_SKY': dict(otype=np.ndarray, atype=float, descr='Boxcar extracted sky (counts)'), 'BOX_COUNTS_SIG_DET': dict(otype=np.ndarray, atype=float, descr='Boxcar extracted detector noise (counts)'), 'BOX_FRAC_USE': dict(otype=np.ndarray, atype=float, descr='Fraction of pixels in the object profile subimage ' 'used for this extraction'), 'BOX_CHI2': dict(otype=np.ndarray, atype=float, descr='Reduced chi2 of the model fit for this spectral pixel'), 'BOX_RADIUS': dict(otype=float, descr='Size of boxcar radius (pixels)'), 'S2N': dict(otype=float, descr='Median signal to noise ratio of the extracted spectrum' '(OPT if available, otherwise BOX)'), # 'FLEX_SHIFT_GLOBAL': dict(otype=float, descr='Global shift of the spectrum to correct for spectral' 'flexure (pixels). This is based on the sky spectrum at' 'the center of the slit'), 'FLEX_SHIFT_LOCAL': dict(otype=float, descr='Local shift of the spectrum to correct for spectral' 'flexure (pixels). This should be a small correction to' 'the global value, and is based on the sky spectrum' 'extracted near the object'), 'FLEX_SHIFT_TOTAL': dict(otype=float, descr='Total shift of the spectrum to correct for spectral' 'flexure (pixels). This is the sum of the global and' 'local FLEX_SHIFT'), 'VEL_TYPE': dict(otype=str, descr='Type of heliocentric correction (if any)'), 'VEL_CORR': dict(otype=float, descr='Relativistic velocity correction for wavelengths'), # Detector # TODO: Change this to DETNAME? # NOTE: DET (or DETNAME) is needed in the case when DETECTOR is None. 'DET': dict(otype=str, descr='A string identifier for the reduced detector or mosaic.'), 'DETECTOR': dict(otype=(DetectorContainer, Mosaic), descr='Object with the detector or mosaic metadata'), 'PYPELINE': dict(otype=str, descr='Name of the PypeIt pipeline mode'), # TODO: It's unclear if OBJTYPE has to be one among a set of # specific keywords. 'OBJTYPE': dict(otype=str, descr='Object type (e.g., standard, science)'), 'SPAT_PIXPOS': dict(otype=(float, np.floating), descr='Spatial location of the trace on detector (pixel) at half-way'), 'SPAT_FRACPOS': dict(otype=(float, np.floating), descr='Fractional location of the object on the slit'), 'trace_spec': dict(otype=np.ndarray, atype=(int,np.integer), descr='Array of pixels along the spectral direction'), 'maskwidth': dict(otype=(float, np.floating), descr='Size (in units of spatial fwhm) of the region used for local sky subtraction'), # Slit and Object 'WAVE_RMS': dict(otype=(float, np.floating), descr='RMS (pix) for the wavelength solution for this slit.'), 'SLITID': dict(otype=(int, np.integer), descr='PypeIt slit ID (aka SPAT_ID).'), 'OBJID': dict(otype=(int, np.integer), descr='Object ID for multislit data. Each object is given an index ' 'for the slit it appears increasing from from left to right. ' 'These are one based.'), 'NAME': dict(otype=str, descr='Name of the object following the naming model'), 'RA': dict(otype=float, descr='Right Ascension (J2000) decimal degree'), 'DEC': dict(otype=float, descr='Declination (J2000) decimal degree'), 'MASKDEF_ID': dict(otype=(int, np.integer), descr='Slitmask definition ID'), 'MASKDEF_OBJNAME': dict(otype=str, descr='Name of the object from the slitmask definition'), 'MASKDEF_OBJMAG': dict(otype=float, descr='Magnitude of the object from the slitmask definition'), 'MASKDEF_OBJMAG_BAND': dict(otype=str, descr='Magnitude band of the object from the slitmask ' 'definition'), 'MASKDEF_EXTRACT': dict(otype=bool, descr='Boolean indicating if this is a forced extraction ' 'at the expected location from slitmask design. '), 'hand_extract_flag': dict(otype=bool, descr='Boolean indicating if this is a forced extraction ' 'at the location provided by the user. '), # 'ECH_OBJID': dict(otype=(int, np.integer), descr='Object ID for echelle data. Each object is given an ' 'index in the order it appears increasing from from left ' 'to right. These are one based.'), # TODO ECH_ORDERINDX should be purged. It is not reliable for anything given masking. Instead # one needs to use SLITID or ECH_ORDER 'ECH_ORDERINDX': dict(otype=(int, np.integer), descr='Order indx, analogous to SLITID for echelle. ' 'Zero based.'), 'ECH_FRACPOS': dict(otype=(float, np.floating), descr='Synced echelle fractional location of the object on ' 'the slit'), 'ECH_ORDER': dict(otype=(int, np.integer), descr='Physical echelle order'), 'ECH_NAME': dict(otype=str, descr='Name of the object for echelle data. Same as NAME above ' 'but order numbers are omitted giving a unique name per ' 'object.')} """ Defines the current datmodel. """ internals = [# Object finding 'smash_peakflux', 'smash_snr', # Hand 'hand_extract_flag', 'hand_extract_spec', 'hand_extract_spat', 'hand_extract_det', 'hand_extract_fwhm', # Object profile 'prof_nsigma', 'sign', 'min_spat', 'max_spat', # Echelle 'ech_frac_was_fit', 'ech_snr' ] def __init__(self, PYPELINE, DET, OBJTYPE='unknown', SLITID=None, ECH_ORDER=None, ECH_ORDERINDX=None): args, _, _, values = inspect.getargvalues(inspect.currentframe()) _d = dict([(k,values[k]) for k in args[1:]]) super().__init__(d=_d) # Initialize internal values that are not None self.hand_extract_flag = False self.sign = 1. self.FLEX_SHIFT_GLOBAL = 0. self.FLEX_SHIFT_LOCAL = 0. self.FLEX_SHIFT_TOTAL = 0. # Name self.set_name()
[docs] @classmethod def from_arrays(cls, PYPELINE:str, wave:np.ndarray, counts:np.ndarray, ivar:np.ndarray, flat=None, mode='OPT', DET='DET01', SLITID=0, **kwargs): # Instantiate slf = cls(PYPELINE, DET, SLITID=SLITID) # Check the type of the flat field if it's not None if flat is not None: if not isinstance(flat, np.ndarray): msgs.error('Flat must be a numpy array') if flat.shape != counts.shape: msgs.error('Flat and counts must have the same shape') # Add in arrays for item, attr in zip([wave, counts, ivar, flat], ['_WAVE', '_COUNTS', '_COUNTS_IVAR', '_FLAT']): # Check if any of the arrays are None. If so, skip if item is None: continue setattr(slf, mode+attr, item.astype(float)) # Mask. Watch out for places where ivar is infinite due to a divide by 0 slf[mode+'_MASK'] = (slf[mode+'_COUNTS_IVAR'] > 0.) & np.isfinite(slf[mode+'_COUNTS_IVAR']) return slf
[docs] def _validate(self): """ Validate the object. """ pypelines = ['MultiSlit', 'SlicerIFU', 'Echelle'] if self.PYPELINE not in pypelines: msgs.error(f'{self.PYPELINE} is not a known pipeline procedure. Options are: ' f"{', '.join(pypelines)}")
[docs] def _bundle(self, **kwargs): """ Override base class to handle inclusion of :class:`~pypeit.images.detector_container.DetectorContainer` or :class:`~pypeit.images.mosaic.Mosaic` objects for each spectrum. Args: kwargs (:obj:`dict`): Passed directly to the base class method. Returns: :obj:`list`: List of dictionaries used to construct fits extensions. """ # Use the base class for most of the data model _d = super()._bundle(**kwargs) # Separate out the detector into its own HDU if _d[0]['DETECTOR'] is not None: _d.append(dict(detector=_d[0].pop('DETECTOR'))) return _d
[docs] def to_hdu(self, **kwargs): """ Override the base class function to force the main datamodel attributes to be written to an ``_ object. This is identical to the base class method except ``force_to_bintbl`` is always set to True. """ if 'force_to_bintbl' in kwargs and not kwargs['force_to_bintbl']: msgs.warn(f'Writing a {self.__class__.__name__} always requires force_to_bintbl=True') del kwargs['force_to_bintbl'] return super().to_hdu(force_to_bintbl=True, **kwargs)
@property def slit_order(self): if self.PYPELINE == 'Echelle': return self.ECH_ORDER elif self.PYPELINE == 'MultiSlit': return self.SLITID elif self.PYPELINE == 'SlicerIFU': return self.SLITID else: msgs.error("Bad PYPELINE") @property def slit_orderindx(self): if self.PYPELINE == 'Echelle': return self.ECH_ORDERINDX elif self.PYPELINE == 'MultiSlit': return self.SLITID elif self.PYPELINE == 'SlicerIFU': return self.SLITID else: msgs.error("Bad PYPELINE") @property def mnx_wave(self): """Return min, max wavelength of the spectrum Uses OPT_WAVE if present and then BOX_WAVE Returns: tuple: min, max (float) """ mnx = (0., 0.) for pref in ['OPT', 'BOX']: if self[pref+'_WAVE'] is not None: mnx = self[pref+'_WAVE'].min(), self[pref+'_WAVE'].max() if mnx[0] != 0.: break return mnx
[docs] def med_s2n(self): """Return median S/N of the spectrum Uses OPT_COUNTS if present and then BOX_COUNTS Returns: float """ SN = 0. for pref in ['OPT', 'BOX']: if self[pref+'_COUNTS'] is not None and np.any(self[pref+'_MASK']): SN = np.median(self[pref+'_COUNTS'][self[pref+'_MASK']] * np.sqrt(self[pref+'_COUNTS_IVAR'][self[pref+'_MASK']])) if SN != 0.: break return SN
[docs] def med_fwhm(self): """Return median spatial FWHM of the spectrum Returns: float """ FWHM = 0. if self['FWHMFIT'] is not None and self['OPT_COUNTS'] is not None: _, binspatial = parse.parse_binning(self['DETECTOR']['binning']) FWHM = np.median(self['FWHMFIT']) * binspatial * self['DETECTOR']['platescale'] return FWHM
[docs] def set_name(self): """ Construct the ``PypeIt`` name for this object. The ``PypeIt`` name depends on the type of data being processed: - For multislit and SlicerIFU data, the name is ``SPATnnnn-SLITmmmm-{DET}``, where ``nnnn`` is the nearest integer pixel in the spatial direction (at the spectral midpoint) where the object was extracted, ``mmmm`` is the slit identification number, and ``{DET}`` is the string identifier for the detector or mosaic. - For echelle data, the name is ``OBJnnnn-{DET}-ORDERoooo``, where ``nnnn`` is 1000 times the fractional position along the spatial direction rounded to the nearest integer, ``{DET}`` is the string identifier for the detector or mosaic, and ``oooo`` is the echelle order. For echelle data, this modifies both :attr:`ECH_NAME` and :attr:`NAME`; otherwise, only the latter is set. """ naming_model = {} for skey in ['SPAT', 'SLIT', 'SCI', 'OBJ', 'ORDER']: naming_model[skey.lower()] = skey if self.PYPELINE == 'Echelle': # ObjID name = naming_model['obj'] ech_name = naming_model['obj'] if self['ECH_FRACPOS'] is None: name += '----' else: # JFH TODO Why not just write it out with the decimal place. That is clearer than this?? name += '{:04d}'.format(int(np.rint(1000*self.ECH_FRACPOS))) ech_name += '{:04d}'.format(int(np.rint(1000*self.ECH_FRACPOS))) name += f'-{self.DET}' ech_name += f'-{self.DET}' # Order number name += '-'+naming_model['order'] name += '{:04d}'.format(self.ECH_ORDER) self.ECH_NAME = ech_name self.NAME = name elif self.PYPELINE in ['MultiSlit', 'SlicerIFU']: # Spat name = naming_model['spat'] if self['SPAT_PIXPOS'] is None: name += '----' else: name += '{:04d}'.format(int(np.rint(self.SPAT_PIXPOS))) # Slit name += '-'+naming_model['slit'] name += '{:04d}'.format(self.SLITID) name += f'-{self.DET}' self.NAME = name else: msgs.error(f'{self.PYPELINE} is not an understood pipeline.')
[docs] def copy(self): """ Generate a copy of this object Returns: :class:`SpecObj`: """ # Return return copy.deepcopy(self)
[docs] def apply_spectral_flexure(self, shift, sky_spec): """ Apply interpolation with the flexure dict Args: shift (float): additive spectral flexure in pixels sky_spec (`linetools.spectra.xspectrum1d.XSpectrum1D`_): Sky Spectrum Returns: `linetools.spectra.xspectrum1d.XSpectrum1D`_: New sky spectrum (mainly for QA) """ # Simple interpolation to apply # Apply for attr in ['BOX', 'OPT']: if self[attr+'_WAVE'] is not None:"Applying flexure correction to {0:s} extraction for object:".format(attr) + msgs.newline() + "{0:s}".format(str(self.NAME))) self[attr+'_WAVE'] = flexure.flexure_interp(shift, self[attr+'_WAVE']).copy() # Shift sky spec too twave = flexure.flexure_interp(shift, sky_spec.wavelength.value) * units.AA new_sky = xspectrum1d.XSpectrum1D.from_tuple((twave, sky_spec.flux)) # Save - since flexure may have been applied/calculated twice, this needs to be additive self.update_flex_shift(shift, flex_type='local') # Return return new_sky
[docs] def update_flex_shift(self, shift, flex_type='local'): """Store the total spectral flexure shift in pixels Args: shift (float): additive spectral flexure in pixels """ if flex_type == 'global': self.FLEX_SHIFT_GLOBAL = shift elif flex_type == 'local': self.FLEX_SHIFT_LOCAL = shift else: msgs.error("Spectral flexure type must be 'global' or 'local' only") # Now update the total flexure self.FLEX_SHIFT_TOTAL += shift
[docs] def apply_flux_calib(self, wave_zp, zeropoint, exptime, tellmodel=None, extinct_correct=False, airmass=None, longitude=None, latitude=None, extinctfilepar=None, extrap_sens=False): """ Apply a sensitivity function to our spectrum FLAM, FLAM_SIG, and FLAM_IVAR are generated Args: wave_zp (`numpy.ndarray`_): Zeropoint wavelength array zeropoint (`numpy.ndarray`_): zeropoint array exptime (float): Exposure time tellmodel (?): Telluric correction. Note: This is deprecated and will be removed in a future version. extinct_correct (bool, optional): If True, extinction correct airmass (float, optional): Airmass longitude (float, optional): longitude in degree for observatory latitude (float, optional): latitude in degree for observatory Used for extinction correction extinctfilepar (str, optional): [sensfunc][UVIS][extinct_file] parameter Used for extinction correction extrap_sens (bool, optional): Extrapolate the sensitivity function (instead of crashing out) """ # Loop on extraction modes for attr in ['BOX', 'OPT']: if self[attr+'_WAVE'] is None: continue"Fluxing {:s} extraction for:".format(attr) + msgs.newline() + "{}".format(self)) wave = self[attr+'_WAVE'] # Interpolate the sensitivity function onto the wavelength grid of the data sens_factor = flux_calib.get_sensfunc_factor( wave, wave_zp, zeropoint, exptime, tellmodel=tellmodel, extinct_correct=extinct_correct, airmass=airmass, longitude=longitude, latitude=latitude, extinctfilepar=extinctfilepar, extrap_sens=extrap_sens) flam = self[attr+'_COUNTS']*sens_factor flam_sig = sens_factor/np.sqrt(self[attr+'_COUNTS_IVAR']) flam_ivar = self[attr+'_COUNTS_IVAR']/sens_factor**2 # Mask bad pixels" Masking bad pixels") msk = np.zeros_like(sens_factor).astype(bool) msk[sens_factor <= 0.] = True msk[self[attr+'_COUNTS_IVAR'] <= 0.] = True flam[msk] = 0. flam_sig[msk] = 0. flam_ivar[msk] = 0. # TODO JFH We need to update the mask here. I think we need a mask for the counts and a mask for the flam, # since they can in principle be different. We are masking bad sensfunc locations. # Finish self[attr+'_FLAM'] = flam self[attr+'_FLAM_SIG'] = flam_sig self[attr+'_FLAM_IVAR'] = flam_ivar
[docs] def apply_helio(self, vel_corr, refframe): """ Apply a heliocentric correction Wavelength arrays are modified in place Args: vel_corr (float): refframe (str): """ # Apply for attr in ['BOX', 'OPT']: if self[attr+'_WAVE'] is not None:'Applying {0} correction to '.format(refframe) + '{0} extraction for object:'.format(attr) + msgs.newline() + "{0}".format(str(self.NAME))) self[attr+'_WAVE'] *= vel_corr # Record self['VEL_TYPE'] = refframe self['VEL_CORR'] = vel_corr
[docs] def to_arrays(self, extraction='OPT', fluxed=True): """ Convert spectrum into np.ndarray arrays Args: extraction (str): Extraction method to convert fluxed: Use the fluxed tags Returns: tuple: wave, flux, ivar, mask arrays """ swave = extraction+'_WAVE' smask = extraction+'_MASK' if self[swave] is None: msgs.error("This object has not been extracted with extract={}.".format(extraction)) # Fluxed? if fluxed: sflux = extraction+'_FLAM' sivar = extraction+'_FLAM_IVAR' else: sflux = extraction+'_COUNTS' sivar = extraction+'_COUNTS_IVAR' # Return return self[swave], self[sflux], self[sivar], self[smask]
[docs] def to_xspec1d(self, masked=True, extraction='OPT', fluxed=True): """ Create an `XSpectrum1D <linetools.spectra.xspectrum1d.XSpectrum1D>`_ using this spectrum. Args: masked (:obj:`bool`, optional): If True, only unmasked data are included. extraction (str): Extraction method to convert fluxed: Use the fluxed tags Returns: `linetools.spectra.xspectrum1d.XSpectrum1D`_: Spectrum object """ wave, flux, ivar, gpm = self.to_arrays(extraction=extraction, fluxed=fluxed) sig = np.sqrt(utils.inverse(ivar)) wave_gpm = wave > 1.0 wave, flux, sig, gpm = wave[wave_gpm], flux[wave_gpm], sig[wave_gpm], gpm[wave_gpm] if masked: flux = flux*gpm sig = sig*gpm # Create return xspectrum1d.XSpectrum1D.from_tuple((wave, flux, sig))
[docs] def ready_for_extraction(self): """ Simple method to check all the items are filled and ready for skysub and extraction. Returns: bool: True if all checks have passed """ required = ['TRACE_SPAT', 'SPAT_PIXPOS', 'SPAT_FRACPOS', 'trace_spec', 'OBJID', 'FWHM', 'maskwidth', 'NAME', 'smash_peakflux', 'smash_snr', 'SLITID', 'DET', 'PYPELINE', 'OBJTYPE'] if 'Echelle' in self.PYPELINE: required += ['ECH_NAME'] passed = True for key in required: if self[key] is None: msgs.warn("Item {} is missing from SpecObj. Failing vette".format(key)) msgs.warn('{}'.format(self)) passed = False # return passed
[docs] def __repr__(self): """ Over-ride print representation Returns: str: Basics of the Data Container """ repr = '<{:s}: '.format(self.__class__.__name__) # Image rdict = {} for attr in self.datamodel.keys(): if hasattr(self, attr) and getattr(self, attr) is not None: # Special ones if attr in ['DET', 'SLITID', 'SPAT_PIXPOS', 'NAME', 'RA', 'DEC', 'MASKDEF_ID', 'MASKDEF_OBJNAME', 'MASKDEF_EXTRACT', 'MASKDEF_OBJMAG', 'MASKDEF_OBJMAG_BAND']: rdict[attr] = getattr(self,attr) else: rdict[attr] = True else: rdict[attr] = False #repr += ' items={}'.format(rdict) repr += ' items={' for key in rdict.keys(): if rdict[key] is not False: repr += '{}: {}\n'.format(key, rdict[key]) return repr + '>'
[docs] def has_opt_ext(self, fluxed=False): """ Check that all the values of the optimal extraction exist Args: fluxed (:obj:`bool`, optional): Check that the flux-calibrated data exist. Returns: :obj:`bool`: True if all OPT values are available """ flx = 'FLAM' if fluxed else 'COUNTS' return self.check_populated(['OPT_WAVE', f'OPT_{flx}', f'OPT_{flx}_IVAR', 'OPT_MASK'])
[docs] def get_opt_ext(self, fluxed=False): """ Return the optimal extraction values Args: fluxed (:obj:`bool`, optional): Return the flux-calibrated data. Returns: :obj:`tuple`: OPT_WAVE, OPT_COUNTS, OPT_COUNTS_IVAR, OPT_MASK attributes of SpecObj """ if fluxed: # TODO: This does not check if the fluxed data exists! return self.OPT_WAVE, self.OPT_FLAM, self.OPT_FLAM_IVAR, self.OPT_MASK return self.OPT_WAVE, self.OPT_COUNTS, self.OPT_COUNTS_IVAR, self.OPT_MASK
[docs] def has_box_ext(self, fluxed=False): """ Check that all the values of the boxcar extraction exist Args: fluxed (:obj:`bool`, optional): Check that the flux-calibrated data exist. Returns: :obj:`bool`: True if all BOX values are available """ flx = 'FLAM' if fluxed else 'COUNTS' return self.check_populated(['BOX_WAVE', f'BOX_{flx}', f'BOX_{flx}_IVAR', 'BOX_MASK'])
[docs] def get_box_ext(self, fluxed=False): """ Return the boxcar extraction values Args: fluxed (:obj:`bool`, optional): Return the flux-calibrated data. Returns: :obj:`tuple`: BOX_WAVE, BOX_COUNTS, BOX_COUNTS_IVAR, BOX_MASK attributes of SpecObj """ if fluxed: # TODO: This does not check if the fluxed data exists! return self.BOX_WAVE, self.BOX_FLAM, self.BOX_FLAM_IVAR, self.BOX_MASK return self.BOX_WAVE, self.BOX_COUNTS, self.BOX_COUNTS_IVAR, self.BOX_MASK
[docs] def best_ext_match(self, extract=None, fluxed=True): """ Determine the extraction and calibration type that best matches a user request. Precedence is given to the requested extraction and calibration types. Beyond that, optimal extraction takes precedence over boxcar extraction, and flux-calibrated data take precedence over uncalibrated counts. Args: extract (:obj:`str`, optional): The extraction used to produce the spectrum. Must be either None, ``'BOX'`` (for a boxcar extraction), or ``'OPT'`` for optimal extraction. If None, the optimal extraction will be returned, if it exists, otherwise the boxcar extraction will be returned. fluxed (:obj:`bool`, optional): If True, return the flux calibrated spectrum, if it exists. If the flux calibration hasn't been performed or ``fluxed=False``, the spectrum is returned in counts. Returns: :obj:`tuple`: The adjusted extraction type (``'BOX'`` or ``'OPT'``) and the adjusted calibration type (True for flux-calibrated, False for uncalibrated counts). """ # If not set, prefer the optimal extraction over the boxcar one. _extract = 'OPT' if extract is None else extract if _extract not in ['OPT', 'BOX']: msgs.error(f'Extraction type ({_extract}) not understood; must be OPT or BOX.') if _extract == 'OPT': if self.has_opt_ext(fluxed=fluxed): return 'OPT', fluxed # If we make it here, expect that fluxed was True. Try flipping it. if self.has_opt_ext(fluxed=False): return 'OPT', False # If we make it here, either extract was BOX from the start, or none of # the optimal extraction options were available if self.has_box_ext(fluxed=fluxed): return 'BOX', fluxed # If we make it here, assume fluxed was True. Try flipping it. if self.has_box_ext(fluxed=False): return 'BOX', False # If we make it here, we've got a problem! msgs.error('Unable to find a relevant set of data!')