"""
Module for the SpecObjs and SpecObj classes
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import os
from pathlib import Path
import re
from typing import List
from IPython import embed
import numpy as np
from astropy import units
from astropy.io import fits
from astropy.table import Table
from astropy.time import Time
from pypeit import msgs
from pypeit import specobj
from pypeit import io
from pypeit.spectrographs.util import load_spectrograph
from pypeit.core import parse
from pypeit.images.detector_container import DetectorContainer
# NOTE: Mosaic cannot be found in this module explicitly, but it is used in
# statements like: dmodcls = eval(hdu.header['DMODCLS'])
from pypeit.images.mosaic import Mosaic
from pypeit import utils
# TODO: Make this a DataContainer
class SpecObjs:
"""
Object to hold a set of :class:`~pypeit.specobj.SpecObj` objects
Note that this class overloads:
- ``__getitem__`` to allow one to pull an attribute or a portion
of the SpecObjs list
- ``__setattr__`` to force a custom assignment method
- ``__getattr__`` to generate an array of attribute 'k' from the
specobjs.
Args:
specobjs (`numpy.ndarray`_, list, optional):
One or more :class:`~pypeit.specobj.SpecObj` objects
header (`astropy.io.fits.Header`_, optional):
Baseline header to use
Attributes:
summary (`astropy.table.Table`_):
Summary table (?)
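
    Example:
        A minimal, hedged sketch of the overloaded access; the file name is
        hypothetical:

        .. code-block:: python

            sobjs = SpecObjs.from_fitsfile('spec1d_example.fits')
            names = sobjs.NAME                 # array with each SpecObj NAME
            one = sobjs[0]                     # a single SpecObj
            sub = sobjs[sobjs.DET == 'DET01']  # a new SpecObjs subset
            sobjs.RA = 123.4                   # broadcast to every SpecObj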
"""
version = '1.0.0'
#TODO JFH This method only populates some of the underlying specobj attributes, for example RA and DEC are not
# getting set. We should do our best to populate everything that got written out to the file. This is part of having
# a rigid data model.
@classmethod
def from_fitsfile(cls, fits_file, det=None, chk_version=True):
"""
Instantiate from a spec1d FITS file
Also tag on the Header
Args:
fits_file (:obj:`str`):
The name of the fits file to read.
det (:obj:`str`, optional):
The string identifier for a detector or mosaic used to select
the loaded spectra. If None, all spectra are loaded.
chk_version (:obj:`bool`, optional):
If False, allow a mismatch in datamodel to proceed
Returns:
:class:`~pypeit.specobjs.SpecObjs`: The loaded spectra from the
provided fits file.
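
        Example:
            A hedged sketch; the file name is hypothetical:

            .. code-block:: python

                from pypeit.specobjs import SpecObjs

                sobjs = SpecObjs.from_fitsfile('spec1d_example.fits')
                print(sobjs.nobj, sobjs[0].NAME)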
"""
# HDUList
with io.fits_open(fits_file) as hdul:
# Init
slf = cls()
# Add on the header
slf.header = hdul[0].header
# Catch common error of trying to read a OneSpec file
if 'DMODCLS' in hdul[1].header and hdul[1].header['DMODCLS'] == 'OneSpec':
msgs.error('This is a OneSpec file. You are treating it like a SpecObjs file.')
# Load the calibration association into the instance attribute `calibs`
if 'CLBS_DIR' in slf.header:
slf.calibs = {}
slf.calibs['DIR'] = slf.header['CLBS_DIR']
for key in slf.header.keys():
if key.startswith('CLBS_') \
and (Path(slf.calibs['DIR']).absolute() / slf.header[key]).exists():
slf.calibs['_'.join(key.split('_')[1:])] = slf.header[key]
detector_hdus = {}
# Loop for Detectors first as we need to add these to the objects
for hdu in hdul[1:]:
if 'DETECTOR' not in hdu.name:
continue
if 'DMODCLS' not in hdu.header:
msgs.error('HDUs with DETECTOR in the name must have DMODCLS in their header.')
try:
dmodcls = eval(hdu.header['DMODCLS'])
                except Exception:
msgs.error(f"Unknown detector type datamodel class: {hdu.header['DMODCLS']}")
# NOTE: This requires that any "detector" datamodel class has a
# from_hdu method, and the name of the HDU must have a known format
# (e.g., 'DET01-DETECTOR').
_det = hdu.name.split('-')[0]
detector_hdus[_det] = dmodcls.from_hdu(hdu, chk_version=chk_version)
# Now the objects
for hdu in hdul[1:]:
if 'DETECTOR' in hdu.name:
continue
sobj = specobj.SpecObj.from_hdu(hdu, chk_version=chk_version)
# Restrict on det?
if det is not None and sobj.DET != det:
continue
# Check for detector
if sobj.DET in detector_hdus.keys():
sobj.DETECTOR = detector_hdus[sobj.DET]
# Append
slf.add_sobj(sobj)
# Return
return slf
def __init__(self, specobjs=None, header=None):
# Only two attributes are allowed for this Object -- specobjs, header
if specobjs is None:
self.specobjs = np.array([])
else:
if isinstance(specobjs, (list, np.ndarray)):
specobjs = np.array(specobjs)
self.specobjs = specobjs
self.header = header
self.hdul = None
self.calibs = None
# Turn off attributes from here
# Anything else set will be on the individual specobj objects in the specobjs array
self.__initialised = True
def __setattr__(self, item, value):
"""
Define a custom assignment method.
Args:
item (str):
Item to set
value (object):
Value of the item
"""
        if '_SpecObjs__initialised' not in self.__dict__:  # this test allows attributes to be set in the __init__ method
return dict.__setattr__(self, item, value)
elif item in self.__dict__: # any normal attributes are handled normally
dict.__setattr__(self, item, value)
else:
# Special handling when the input is an array/list and the length matches that of the slice
if isinstance(value, (list, np.ndarray)):
if len(self.specobjs) == len(value): # Assume these are to be paired up
for kk, specobj in enumerate(self.specobjs):
setattr(specobj, item, value[kk])
return
#
for specobj in self.specobjs:
setattr(specobj, item, value)
@property
def nobj(self):
"""
Return the number of SpecObj objects
Returns:
int
"""
return len(self.specobjs)
def unpack_object(self, ret_flam=False, log10blaze=False, min_blaze_value=1e-3, extract_type='OPT',
extract_blaze=False, remove_missing=False):
"""
Utility function to unpack the sobjs for one object and
return various numpy arrays describing the spectrum and meta
data. The user needs to already have trimmed the :class:`SpecObjs` to
the relevant indices for the object.
Args:
ret_flam (:obj:`bool`, optional):
If True return the FLAM, otherwise return COUNTS.
log10blaze (:obj:`bool`, optional):
If True return the log10 of the blaze function.
min_blaze_value (:obj:`float`, optional):
Minimum value of the blaze function to consider as good.
extract_type (:obj:`str`, optional):
Extraction type to use. Default is 'OPT'.
extract_blaze (:obj:`bool`, optional):
If True, extract the blaze function. Default is False.
remove_missing (:obj:`bool`, optional):
If True, remove any missing data (i.e. where the flux is None).
Default is False.
Returns:
tuple: Returns the following where all numpy arrays
returned have shape (nspec, norders) for Echelle data and
(nspec,) for Multislit data.
- wave (`numpy.ndarray`_): Wavelength grids
- flux (`numpy.ndarray`_): Flambda or counts
- flux_ivar (`numpy.ndarray`_): Inverse variance (of
Flambda or counts)
- flux_gpm (`numpy.ndarray`_): Good pixel mask.
True=Good
- blaze (`numpy.ndarray`_, None): Blaze function
                - meta_spec (dict): Dictionary containing meta data.
                  The keys are defined by
                  spectrograph.parse_spec_header()
                - header (`astropy.io.fits.Header`_): header from the
                  spec1d file
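
        Example:
            A hedged sketch; the object name is hypothetical and should be one
            of the entries in ``sobjs.NAME``:

            .. code-block:: python

                this = sobjs[sobjs.NAME == 'SPAT0176-SLIT0185-DET01']
                wave, flux, ivar, gpm, blaze, meta_spec, header = this.unpack_object()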
"""
# Prep
flux_attr = 'FLAM' if ret_flam else 'COUNTS'
flux_key = '{}_{}'.format(extract_type, flux_attr)
wave_key = '{}_WAVE'.format(extract_type)
blaze_key = '{}_FLAT'.format(extract_type)
# Check for missing data
none_flux = [f is None for f in getattr(self, flux_key)]
if np.any(none_flux):
other = 'OPT' if extract_type == 'BOX' else 'BOX'
msg = f"{extract_type} extracted flux is not available for all slits/orders. " \
f"Consider trying the {other} extraction."
if not remove_missing:
msgs.error(msg)
else:
msg += f"{msgs.newline()}-- The missing data will be removed --"
msgs.warn(msg)
# Remove missing data
r_indx = np.where(none_flux)[0]
self.remove_sobj(r_indx)
#
norddet = self.nobj
nspec = getattr(self, flux_key)[0].size
# Allocate arrays and unpack spectrum
wave = np.zeros((nspec, norddet))
flux = np.zeros((nspec, norddet))
flux_ivar = np.zeros((nspec, norddet))
flux_gpm = np.zeros((nspec, norddet), dtype=bool)
if extract_blaze:
blaze = np.zeros((nspec, norddet), dtype=float)
detector = [None]*norddet
ech_orders = np.zeros(norddet, dtype=int)
for iorddet in range(norddet):
wave[:, iorddet] = getattr(self, wave_key)[iorddet]
flux_gpm[:, iorddet] = getattr(self, '{}_MASK'.format(extract_type))[iorddet]
detector[iorddet] = self[iorddet].DET
if self[0].PYPELINE == 'Echelle':
ech_orders[iorddet] = self[iorddet].ECH_ORDER
flux[:, iorddet] = getattr(self, flux_key)[iorddet]
flux_ivar[:, iorddet] = getattr(self, flux_key+'_IVAR')[iorddet]
if extract_blaze:
blaze[:, iorddet] = getattr(self, blaze_key)[iorddet]
# Log10 blaze
if extract_blaze:
blaze_function = np.copy(blaze)
if log10blaze:
for iorddet in range(norddet):
blaze_function_smooth = utils.fast_running_median(blaze[:, iorddet], 5)
blaze_function_norm = blaze_function_smooth / blaze_function_smooth.max()
blaze_function[:, iorddet] = np.log10(np.clip(blaze_function_norm, min_blaze_value, None))
else:
blaze_function = None
# Populate meta data
spectrograph = load_spectrograph(self.header['PYP_SPEC'])
meta_spec = spectrograph.parse_spec_header(self.header)
# Add the pyp spec.
        # TODO JFH: Make this an attribute of the specobj by default.
meta_spec['PYP_SPEC'] = self.header['PYP_SPEC']
meta_spec['PYPELINE'] = self[0].PYPELINE
meta_spec['DET'] = np.array(detector)
meta_spec['DISPNAME'] = self.header['DISPNAME']
# Return
if self[0].PYPELINE in ['MultiSlit', 'SlicerIFU'] and self.nobj == 1:
meta_spec['ECH_ORDERS'] = None
blaze_ret = blaze_function.reshape(nspec) if extract_blaze else None
return wave.reshape(nspec), flux.reshape(nspec), flux_ivar.reshape(nspec), \
flux_gpm.reshape(nspec), blaze_ret, meta_spec, self.header
else:
meta_spec['ECH_ORDERS'] = ech_orders
return wave, flux, flux_ivar, flux_gpm, blaze_function, meta_spec, self.header
def get_std(self, multi_spec_det=None):
"""
        Return the standard star from this :class:`SpecObjs`. For MultiSlit
        this is a single SpecObj in a SpecObjs container; for Echelle it
        contains the standard for all of the orders.
Args:
multi_spec_det (list):
If there are multiple detectors arranged in the spectral
direction, return the sobjs for the standard on each detector.
Returns:
SpecObj or SpecObjs or None
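
        Example:
            A hedged sketch; the detector names are illustrative:

            .. code-block:: python

                sobjs_std = sobjs.get_std()
                # or, for detectors arranged along the spectral direction:
                sobjs_std = sobjs.get_std(multi_spec_det=['DET01', 'DET02'])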
"""
# Is this MultiSlit or Echelle
pypeline = (self.PYPELINE)[0]
if 'MultiSlit' in pypeline or 'SlicerIFU' in pypeline:
# Have to do a loop to extract the counts for all objects
if self.OPT_COUNTS[0] is not None:
SNR = np.median(self.OPT_COUNTS * np.sqrt(self.OPT_COUNTS_IVAR), axis=1)
elif self.BOX_COUNTS[0] is not None:
SNR = np.median(self.BOX_COUNTS * np.sqrt(self.BOX_COUNTS_IVAR), axis=1)
else:
return None
# For multiple detectors grab the requested detectors
if multi_spec_det is not None:
# TODO: This is a hack assuming the integers in multi_spec_det
# are for *detectors*, not mosaics.
if any([isinstance(d, int) for d in multi_spec_det]):
_multi_spec_det = [DetectorContainer.get_name(d) if isinstance(d, int) else d
for d in multi_spec_det]
else:
_multi_spec_det = multi_spec_det
sobjs_std = SpecObjs(header=self.header)
# Now append the maximum S/N object on each detector
for idet in _multi_spec_det:
this_det = self.DET == idet
if not np.any(this_det):
unique_det = np.unique(self.DET)
                    msgs.error(f'No matches for {idet} in spec1d file. Unique options found '
                               f"are {', '.join(unique_det)}. Check usage of multi_spec_det.")
istd = SNR[this_det].argmax()
sobjs_std.add_sobj(self[this_det][istd])
else: # For normal multislit take the brightest object
istd = SNR.argmax()
# Return
sobjs_std = SpecObjs(specobjs=[self[istd]], header=self.header)
sobjs_std.header = self.header
return sobjs_std
elif 'Echelle' in pypeline:
uni_objid = np.unique(self.ECH_FRACPOS) # A little risky using floats
uni_order = np.unique(self.ECH_ORDER)
nobj = len(uni_objid)
norders = len(uni_order)
# Build up S/N
SNR = np.zeros((norders, nobj))
for iobj in range(nobj):
for iord in range(norders):
ind = (self.ECH_FRACPOS == uni_objid[iobj]) & (self.ECH_ORDER == uni_order[iord])
spec = self[ind]
# Grab SNR
if spec[0].OPT_COUNTS is not None:
SNR[iord, iobj] = np.median(spec[0].OPT_COUNTS*np.sqrt(spec[0].OPT_COUNTS_IVAR))
elif spec[0].BOX_COUNTS is not None:
SNR[iord, iobj] = np.median(spec[0].BOX_COUNTS * np.sqrt(spec[0].BOX_COUNTS_IVAR))
else:
return None
# Maximize S/N
SNR_all = np.sqrt(np.sum(SNR**2,axis=0))
objid_std = uni_objid[SNR_all.argmax()]
# Finish
indx = self.ECH_FRACPOS == objid_std
# Return
sobjs_std = SpecObjs(specobjs=self[indx], header=self.header)
sobjs_std.header = self.header
return sobjs_std
else:
msgs.error('Unknown pypeline')
def append_neg(self, sobjs_neg):
"""
        Append negative objects and change the sign of their objids for IR
        reductions.

        Args:
            sobjs_neg (SpecObjs):
                The negative objects to append.
"""
if sobjs_neg.nobj == 0:
msgs.warn("No negative objects found...")
return
# Assign the sign and the objids
sobjs_neg.sign = -1.0
if sobjs_neg[0].PYPELINE == 'Echelle':
sobjs_neg.ECH_OBJID = -sobjs_neg.ECH_OBJID
sobjs_neg.OBJID = -sobjs_neg.OBJID
elif sobjs_neg[0].PYPELINE == 'MultiSlit':
sobjs_neg.OBJID = -sobjs_neg.OBJID
elif sobjs_neg[0].PYPELINE == 'SlicerIFU':
sobjs_neg.OBJID = -sobjs_neg.OBJID
else:
msgs.error("The '{0:s}' PYPELINE is not defined".format(self[0].PYPELINE))
self.add_sobj(sobjs_neg)
# Sort objects according to their spatial location. Necessary for the extraction to properly work
if self.nobj > 0:
self.specobjs = self.specobjs[np.argsort(self.SPAT_PIXPOS)]
def purge_neg(self):
"""
Purge negative objects from specobjs for IR reductions
"""
        # Find and remove the negative objects
if self.nobj > 0:
if self[0].PYPELINE == 'Echelle':
index = self.ECH_OBJID < 0
elif self[0].PYPELINE == 'MultiSlit':
index = self.OBJID < 0
elif self[0].PYPELINE == 'SlicerIFU':
index = self.OBJID < 0
else:
msgs.error("The '{0:s}' PYPELINE is not defined".format(self[0].PYPELINE))
self.remove_sobj(index)
def make_neg_pos(self):
"""
        Flip the sign of the extracted counts of negative objects for IR
        reductions.
        """
        # Find the negative objects
if self.nobj > 0:
if self[0].PYPELINE == 'Echelle':
index = self.ECH_OBJID < 0
elif self[0].PYPELINE == 'MultiSlit':
index = self.OBJID < 0
elif self[0].PYPELINE == 'SlicerIFU':
index = self.OBJID < 0
else:
msgs.error("Should not get here")
try:
self[index].OPT_COUNTS *= -1
except (TypeError,ValueError):
pass
try:
self[index].BOX_COUNTS *= -1
except (TypeError,ValueError):
pass
def slitorder_indices(self, slitorder):
"""
        Return the set of indices matching the input slit/order.

        Args:
            slitorder (int):
                Slit/order identifier: matched against ECH_ORDER for Echelle
                reductions and SLITID otherwise.

        Returns:
            `numpy.ndarray`_: Boolean array selecting the matching objects.
"""
if self[0].PYPELINE == 'Echelle':
indx = self.ECH_ORDER == slitorder
elif self[0].PYPELINE == 'MultiSlit':
indx = self.SLITID == slitorder
elif self[0].PYPELINE == 'SlicerIFU':
indx = self.SLITID == slitorder
else:
msgs.error("The '{0:s}' PYPELINE is not defined".format(self[0].PYPELINE))
#
return indx
def name_indices(self, name):
"""
        Return the set of indices matching the input object name.
Args:
name (str): The name of the object
Returns:
`numpy.ndarray`_: Array of indices with the corresponding
name. Shape is (nobj,).
"""
if self[0].PYPELINE == 'Echelle':
indx = self.ECH_NAME == name
elif self[0].PYPELINE == 'MultiSlit':
indx = self.NAME == name
elif self[0].PYPELINE == 'SlicerIFU':
indx = self.NAME == name
else:
msgs.error("The '{0:s}' PYPELINE is not defined".format(self[0].PYPELINE))
return indx
def slitorder_objid_indices(self, slitorder, objid, toler=5):
"""
Return the set of indices matching the input slit/order and the input
objid
Args:
slitorder (int):
Order/Spatial pixel value for slit of interest.
objid (int):
ID value for object of interest.
toler (int, optional):
Tolerance for slit spatial pixel values used for slit
identification. Default = 5
Returns:
            `numpy.ndarray`_: Boolean array selecting the objects that match
            the input slit/order and object ID.
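
        Example:
            A hedged sketch; the slit and object IDs are illustrative:

            .. code-block:: python

                indx = sobjs.slitorder_objid_indices(185, 1)
                matched = sobjs[indx]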
"""
if self[0].PYPELINE == 'Echelle':
indx = (self.ECH_ORDER == slitorder) & (self.ECH_OBJID == objid)
elif self[0].PYPELINE == 'MultiSlit':
indx = (np.abs(self.SLITID - slitorder) <= toler) & (self.OBJID == objid)
elif self[0].PYPELINE == 'SlicerIFU':
indx = (self.SLITID == slitorder) & (self.OBJID == objid)
else:
msgs.error("The '{0:s}' PYPELINE is not defined".format(self[0].PYPELINE))
#
return indx
def set_names(self):
"""
Simple method to (re)set the names of all the SpecObj
"""
for sobj in self.specobjs:
sobj.set_name()
def add_sobj(self, sobj):
"""
Append one or more SpecObj to the existing set.
Args:
sobj (:class:`~pypeit.specobj.SpecObj`, :class:`~pypeit.specobjs.SpecObjs`, :obj:`list`, `numpy.ndarray`_):
One or more SpecObj objects to append. If a list or array, the
elements of these must be instances of
:class:`~pypeit.specobj.SpecObj`.
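
        Example:
            A hedged sketch; the SpecObj constructor arguments are
            illustrative:

            .. code-block:: python

                from pypeit import specobj

                sobjs = SpecObjs()
                sobjs.add_sobj(specobj.SpecObj('MultiSlit', 'DET01', SLITID=185))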
"""
if isinstance(sobj, specobj.SpecObj):
self.specobjs = np.append(self.specobjs, [sobj])
return
if isinstance(sobj, SpecObjs):
# TODO: Possible to instead do np.append(self.specobjs, sobj.specobjs)?
for isobj in sobj:
self.specobjs = np.append(self.specobjs, isobj)
return
if not isinstance(sobj, (np.ndarray, list)):
msgs.error(f'Unable to add {type(sobj)} objects to SpecObjs')
if any([not isinstance(s, specobj.SpecObj) for s in sobj]):
msgs.error('List or arrays of objects to add must all be of type SpecObj.')
self.specobjs = np.append(self.specobjs, sobj)
def remove_sobj(self, index):
"""
Remove one or more SpecObj by index
Args:
index (int, `numpy.ndarray`_):
"""
msk = np.ones(self.specobjs.size, dtype=bool)
msk[index] = False
# Do it
self.specobjs = self.specobjs[msk]
def ready_for_fluxing(self):
        """
        Check that the header and the extractions contain the quantities
        needed for flux calibration.

        Returns:
            :obj:`bool`: True if this object is ready to be fluxed.
        """
        # Fluxing
        required_header = ['EXPTIME', 'AIRMASS']  # These are sufficient to apply a sensitivity function
        required_header += ['DISPNAME', 'PYP_SPEC', 'RA', 'DEC']  # These are needed to generate one
        required_for_fluxing = ['_WAVE', '_COUNTS', '_COUNTS_IVAR']
        chk = True
        # Check header
        for key in required_header:
            chk &= key in self.header
        for sobj in self.specobjs:
            sub_box, sub_opt = True, True
            if sobj is not None:
                # Only one of the two extractions is needed, but all of its
                # items must be set (e.g., 'BOX' + '_WAVE' -> BOX_WAVE)
                for item in required_for_fluxing:
                    sub_box &= getattr(sobj, 'BOX' + item, None) is not None
                    sub_opt &= getattr(sobj, 'OPT' + item, None) is not None
            # chk
            chk &= (sub_box or sub_opt)
return chk
def apply_flux_calib(self, par, spectrograph, sens):
"""
Flux calibrate the object spectra (``sobjs``) using the provided
sensitivity function (``sens``).
Args:
par (:class:`~pypeit.par.pypeitpar.FluxCalibratePar`):
Parset object containing parameters governing the flux calibration.
spectrograph (:class:`~pypeit.spectrographs.spectrograph.Spectrograph`):
PypeIt Spectrograph class
sens (:class:`~pypeit.sensfunc.SensFunc`):
PypeIt Sensitivity function class
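
        Example:
            A hedged sketch; the sensitivity-function file name is
            hypothetical and the parset access pattern is illustrative:

            .. code-block:: python

                from pypeit import sensfunc
                from pypeit.spectrographs.util import load_spectrograph

                spectrograph = load_spectrograph(sobjs.header['PYP_SPEC'])
                par = spectrograph.default_pypeit_par()['fluxcalib']
                sens = sensfunc.SensFunc.from_file('sens_example.fits')
                sobjs.apply_flux_calib(par, spectrograph, sens)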
"""
        _extinct_correct = (sens.algorithm == 'UVIS') if par['extinct_correct'] is None \
                                else par['extinct_correct']
        # TODO: enabling this for now in case someone wants to treat the IFU as a slit spectrograph
        # (not recommended, but useful for quick reductions where you don't want to construct cubes
        # and don't care about DAR).
if spectrograph.pypeline in ['MultiSlit','SlicerIFU']:
for ii, sci_obj in enumerate(self.specobjs):
if sens.wave.shape[1] == 1:
sci_obj.apply_flux_calib(sens.wave[:, 0], sens.zeropoint[:, 0],
self.header['EXPTIME'],
extinct_correct=_extinct_correct,
longitude=spectrograph.telescope['longitude'],
latitude=spectrograph.telescope['latitude'],
extinctfilepar=par['extinct_file'],
extrap_sens=par['extrap_sens'],
airmass=float(self.header['AIRMASS']))
elif sens.wave.shape[1] > 1 and sens.splice_multi_det:
# This deals with the multi detector case where the sensitivity function is spliced. Note that
# the final sensitivity function written to disk is the spliced one. This functionality is only
# used internal to sensfunc.py for fluxing the standard for the QA plot.
sci_obj.apply_flux_calib(sens.wave[:, ii], sens.zeropoint[:, ii],
self.header['EXPTIME'],
extinct_correct=_extinct_correct,
longitude=spectrograph.telescope['longitude'],
latitude=spectrograph.telescope['latitude'],
extinctfilepar=par['extinct_file'],
extrap_sens=par['extrap_sens'],
airmass=float(self.header['AIRMASS']))
else:
msgs.error('This should not happen, there is a problem with your sensitivity function.')
elif spectrograph.pypeline == 'Echelle':
            # Flux calibrate the orders that are mutually in the meta_table
            # and in the sobjs. This allows the flexibility to apply the
            # sensfunc to data that do not include all of its orders, e.g.,
            # X-shooter with the K-band blocking filter.
ech_orders = np.array(sens.sens['ECH_ORDERS']).flatten()
for sci_obj in self.specobjs:
# JFH Is there a more elegant pythonic way to do this without looping over both orders and sci_obj?
indx = np.where(ech_orders == sci_obj.ECH_ORDER)[0]
if indx.size == 1:
sci_obj.apply_flux_calib(sens.wave[:, indx[0]], sens.zeropoint[:, indx[0]],
self.header['EXPTIME'],
extinct_correct=_extinct_correct,
extrap_sens=par['extrap_sens'],
longitude=spectrograph.telescope['longitude'],
latitude=spectrograph.telescope['latitude'],
extinctfilepar=par['extinct_file'],
airmass=float(self.header['AIRMASS']))
elif indx.size == 0:
msgs.info('Unable to flux calibrate order = {:} as it is not in your sensitivity function. '
'Something is probably wrong with your sensitivity function.'.format(sci_obj.ECH_ORDER))
else:
msgs.error('This should not happen')
else:
msgs.error('Unrecognized pypeline: {0}'.format(spectrograph.pypeline))
def copy(self):
"""
Generate a copy of self
Returns:
:class:`SpecObjs`:
"""
sobj_copy = SpecObjs(header=self.header)
for sobj in self.specobjs:
sobj_copy.add_sobj(sobj.copy())
return sobj_copy
def __getitem__(self, item):
"""
Overload to allow one to pull an attribute or a portion of the
SpecObjs list
Args:
item (:obj:`str`, :obj:`int`, :obj:`slice`)
Returns:
The selected items as either an object,
:class:`pypeit.specobj.SpecObj`, or
:class:`pypeit.specobjs.SpecObjs`, depending on the input
item.
"""
if isinstance(item, str):
return self.__getattr__(item)
elif isinstance(item, (int, np.integer)):
return self.specobjs[item] # TODO Is this using pointers or creating new data????
elif (isinstance(item, slice) or # Stolen from astropy.table
isinstance(item, np.ndarray) or
isinstance(item, list) or
isinstance(item, tuple) and all(isinstance(x, np.ndarray) for x in item)):
# here for the many ways to give a slice; a tuple of ndarray
# is produced by np.where, as in t[np.where(t['a'] > 2)]
# For all, a new table is constructed with slice of all columns
return SpecObjs(specobjs=self.specobjs[item], header=self.header)
def __getattr__(self, attr):
"""
Overloaded to generate an array of attribute 'k' from the
:class:`pypeit.specobj.SpecObj` objects.
First attempts to grab data from the Summary table, then the list
"""
if len(self.specobjs) == 0:
raise ValueError('SpecObjs is empty!')
if attr in self.__dict__: # any normal attributes are handled normally
return self.__dict__[attr]
        try:
            getattr(self.specobjs[0], attr)
        except AttributeError:
            raise AttributeError(f'{attr} is not an attribute of SpecObjs or SpecObj.')
return lst_to_array([getattr(specobj, attr) for specobj in self.specobjs])
# Printing
def __repr__(self):
txt = '<{:s}:'.format(self.__class__.__name__)
if self.nobj == 0:
txt += "Empty SpecObjs"
else:
txt += '\n'
for sobj in self.specobjs:
txt += ' {} \n'.format(sobj)
txt += '>'
return txt
def __len__(self):
return len(self.specobjs)
@property
def size(self):
return self.specobjs.size
@property
def shape(self):
return self.specobjs.shape
def write_to_fits(self, subheader, outfile, overwrite=True, update_det=None,
slitspatnum=None, history=None, debug=False):
"""
Write the set of SpecObj objects to one multi-extension FITS file
Args:
subheader (:obj:`dict`):
Dictionary with header keywords and values to be added to the
primary header of the output file.
outfile (str):
Name of the output file
overwrite (bool, optional):
Overwrite the output file if it exists?
update_det (int or list, optional):
If provided, do not clobber the existing file but only update
the indicated detectors. Useful for re-running on a subset of detectors
slitspatnum (:obj:`str` or :obj:`list`, optional):
Restricted set of slits for reduction.
If provided, do not clobber the existing file but only update
the indicated slits. Useful for re-running on a subset of slits
history (:obj:`str`, optional):
String to be added to the header HISTORY keyword.
debug (:obj:`bool`, optional):
If True, run in debug mode.
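
        Example:
            A hedged sketch; the subheader keys are illustrative, and a float
            MJD entry, if given, is used to set the DATETIME card:

            .. code-block:: python

                subheader = {'PYP_SPEC': 'shane_kast_blue', 'MJD': 57162.0}
                sobjs.write_to_fits(subheader, 'spec1d_example.fits')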
"""
if os.path.isfile(outfile) and not overwrite:
            msgs.warn(f'{outfile} exists. Set overwrite=True to overwrite it.')
return
# If the file exists and update_det (and slit_spat_num) is provided, use the existing header
# and load up all the other hdus so that we only over-write the ones
# we are updating
if os.path.isfile(outfile) and (update_det is not None or slitspatnum is not None):
_specobjs = SpecObjs.from_fitsfile(outfile)
mask = np.ones(_specobjs.nobj, dtype=bool)
# Update
if slitspatnum is not None: # slitspatnum
dets, spat_ids = parse.parse_slitspatnum(slitspatnum)
for det, spat_id in zip(dets, spat_ids):
mask[(_specobjs.DET == det) & (_specobjs.SLITID == spat_id)] = False
elif update_det is not None:
# Pop out those with this detector (and slit if slit_spat_num is provided)
for det in np.atleast_1d(update_det):
mask[_specobjs.DET == det] = False
_specobjs = _specobjs[mask]
# Add in the new
# TODO: Is the loop necessary? add_sobj can take many SpecObj objects.
for sobj in self.specobjs:
_specobjs.add_sobj(sobj)
else:
_specobjs = self.specobjs
# Build up the Header
header = io.initialize_header()
for key in subheader.keys():
if key.upper() == 'HISTORY':
if history is None:
for line in str(subheader[key.upper()]).split('\n'):
header[key.upper()] = line
else:
header[key.upper()] = subheader[key]
# Also store the datetime in ISOT format
if key.upper() == 'MJD':
if isinstance(subheader[key], (list, tuple)):
mjdval = subheader[key][0]
elif isinstance(subheader[key], float):
mjdval = subheader[key]
else:
raise ValueError('Header card must be a float or a FITS header tuple')
header['DATETIME'] = (Time(mjdval, format='mjd').isot, "Date and time of the observation in ISOT format")
# Add calibration associations to Header
if self.calibs is not None:
for key, val in self.calibs.items():
header[f'CLBS_{key}'] = val
# Init
prihdu = fits.PrimaryHDU(header=header)
hdus = [prihdu]
# Add class info
prihdu.header['DMODCLS'] = (self.__class__.__name__, 'Datamodel class')
prihdu.header['DMODVER'] = (self.version, 'Datamodel version')
# Add history
if history is not None:
history.write_to_header(prihdu.header)
detector_hdus = {}
nspec, ext = 0, 0
# Loop on the SpecObj objects
for sobj in _specobjs:
if sobj is None:
continue
# HDUs
if debug:
raise NotImplementedError('Debugging for developers only.')
#embed()
#exit()
shdul = sobj.to_hdu()
if len(shdul) not in [1, 2]:
msgs.error('CODING ERROR: SpecObj datamodel changed. to_hdu should return 1 or 2 '
'HDUs. If returned, the 2nd one should be the detector/mosaic.')
if len(shdul) == 2:
detector_hdus[sobj['DET']] = shdul[1]
shdu = [shdul[0]]
else:
shdu = shdul
if len(shdu) != 1 or not isinstance(shdu[0], fits.hdu.table.BinTableHDU):
msgs.error('CODING ERROR: SpecObj datamodel changed.')
# Name
shdu[0].name = sobj.NAME
# Extension
keywd = 'EXT{:04d}'.format(ext)
prihdu.header[keywd] = sobj.NAME
ext += 1
nspec += 1
# Append
hdus += shdu
# Deal with Detectors
for key, item in detector_hdus.items():
prefix = item.header['name']
# Name
if prefix not in item.name: # In case we are re-loading
item.name = f'{prefix}-{item.name}'
# Append
hdus += [item]
# A few more for the header
prihdu.header['NSPEC'] = nspec
# Finish
hdulist = fits.HDUList(hdus)
if debug:
raise NotImplementedError('Debugging for developers only.')
#embed()
#exit()
hdulist.writeto(outfile, overwrite=overwrite)
msgs.info(f'Wrote 1D spectra to {outfile}')
def write_info(self, outfile, pypeline):
"""
Write a summary of items to an ASCII file
Args:
outfile (:obj:`str`): Output filename
pypeline (:obj:`str`): PypeIt pipeline mode
"""
# TODO -- Deal with update_det
# Lists for a Table
slits, names, maskdef_id, objname, objra, objdec, spat_pixpos, spat_fracpos, boxsize, opt_fwhm, s2n = \
[], [], [], [], [], [], [], [], [], [], []
wave_rms = []
maskdef_extract = []
manual_extract = []
# binspectral, binspatial = parse.parse_binning(binning)
for specobj in self.specobjs:
            if specobj is None:
                continue
# Detector items
binspectral, binspatial = parse.parse_binning(specobj.DETECTOR.binning)
platescale = specobj.DETECTOR.platescale
# Append
spat_pixpos.append(specobj.SPAT_PIXPOS)
if pypeline == 'MultiSlit':
spat_fracpos.append(specobj.SPAT_FRACPOS)
slits.append(specobj.SLITID)
names.append(specobj.NAME)
elif pypeline == 'SlicerIFU':
spat_fracpos.append(specobj.SPAT_FRACPOS)
slits.append(specobj.SLITID)
names.append(specobj.NAME)
elif pypeline == 'Echelle':
spat_fracpos.append(specobj.ECH_FRACPOS)
slits.append(specobj.ECH_ORDER)
names.append(specobj.ECH_NAME)
# Wave RMS
wave_rms.append(specobj.WAVE_RMS)
# Boxcar width
if specobj.BOX_RADIUS is not None:
slit_pix = 2.0 * specobj.BOX_RADIUS
# Convert to arcsec
# JFH TODO This should be using the order_platescale for each order. Furthermore, not all detectors
# have the same platescale, i.e. with GNIRS it is the same detector but a different camera hence a
# different attribute. platescale should be a spectrograph attribute determined on the fly.
# boxsize.append(slit_pix*binspatial*spectrograph.detector[specobj.DET-1]['platescale'])
boxsize.append(slit_pix * binspatial * platescale)
else:
boxsize.append(0.)
# Optimal profile (FWHM)
opt_fwhm.append(specobj.SPAT_FWHM)
# S2N -- default to boxcar
# NOTE: Below requires that S2N not be None, otherwise the code will
# fault. If the code gets here and S2N is None, check that 1D
# extractions have been performed.
s2n.append(specobj.S2N)
# Manual extraction?
manual_extract.append(specobj.hand_extract_flag)
# Slitmask info
maskdef_id.append(specobj.MASKDEF_ID)
objname.append(specobj.MASKDEF_OBJNAME)
objra.append(specobj.RA)
objdec.append(specobj.DEC)
maskdef_extract.append(specobj.MASKDEF_EXTRACT)
# Generate the table, if we have at least one source
if len(names) > 0:
obj_tbl = Table()
if pypeline == 'MultiSlit':
obj_tbl['slit'] = slits
obj_tbl['slit'].format = 'd'
elif pypeline == 'SlicerIFU':
obj_tbl['slit'] = slits
obj_tbl['slit'].format = 'd'
elif pypeline == 'Echelle':
obj_tbl['order'] = slits
obj_tbl['order'].format = 'd'
obj_tbl['name'] = names
if not np.all(np.array(maskdef_id) == None):
obj_tbl['maskdef_id'] = maskdef_id
if not np.all(np.array(objname) == None):
obj_tbl['objname'] = objname
if not np.all(np.array(objra) == None):
obj_tbl['objra'] = objra
if None not in objra:
obj_tbl['objra'].format = '.5f'
obj_tbl['objdec'] = objdec
if None not in objdec:
obj_tbl['objdec'].format = '.5f'
obj_tbl['spat_pixpos'] = spat_pixpos
obj_tbl['spat_pixpos'].format = '.1f'
obj_tbl['spat_fracpos'] = spat_fracpos
obj_tbl['spat_fracpos'].format = '.3f'
obj_tbl['box_width'] = boxsize
obj_tbl['box_width'].format = '.2f'
obj_tbl['box_width'].unit = units.arcsec
obj_tbl['opt_fwhm'] = opt_fwhm
obj_tbl['opt_fwhm'].format = '.3f'
obj_tbl['opt_fwhm'].unit = units.arcsec
obj_tbl['s2n'] = s2n
obj_tbl['s2n'].format = '.2f'
# is this a forced extraction at the expected position from slitmask design?
if not np.all(np.array(maskdef_extract) == None):
obj_tbl['maskdef_extract'] = maskdef_extract
# only if any manual extraction exists, print this
if not np.all(np.array(manual_extract) == False):
obj_tbl['manual_extract'] = manual_extract
# Wavelengths
if not np.all(np.array(wave_rms) == None):
obj_tbl['wv_rms'] = wave_rms
if None not in wave_rms:
obj_tbl['wv_rms'].format = '.3f'
# Write
obj_tbl.write(outfile,format='ascii.fixed_width', overwrite=True)
def flexure_diagnostics(self):
"""
Print and return the spectral flexure of a spec1d file.
Returns:
            `astropy.table.Table`_: Table with the spectral flexure.
"""
spec_flex = Table()
spec_flex['NAME'] = self.NAME
spec_flex['global_spec_shift'] = self.FLEX_SHIFT_GLOBAL
if np.all(spec_flex['global_spec_shift'] != None):
spec_flex['global_spec_shift'].format = '0.3f'
spec_flex['local_spec_shift'] = self.FLEX_SHIFT_LOCAL
if np.all(spec_flex['local_spec_shift'] != None):
spec_flex['local_spec_shift'].format = '0.3f'
spec_flex['total_spec_shift'] = self.FLEX_SHIFT_TOTAL
if np.all(spec_flex['total_spec_shift'] != None):
spec_flex['total_spec_shift'].format = '0.3f'
# print the table
spec_flex.pprint_all()
# return the table
return spec_flex
#TODO Should this be a classmethod on specobjs??
def get_std_trace(detname, std_outfile, chk_version=True):
"""
Returns the trace of the standard.
Args:
        detname (:obj:`str`):
            Name of the detector or mosaic (e.g., 'DET01') used to select
            the extracted standard.
        std_outfile (:obj:`str`):
            Filename of the standard star spec1d file. Can be None.
        chk_version (:obj:`bool`, optional):
            If False, allow a mismatch in datamodel to proceed.
Returns:
`astropy.table.Table`_: Table with the trace of the standard star on the input detector.
If this is a MultiSlit reduction, the table will have a single column: `TRACE_SPAT`.
If this is an Echelle reduction, the table will have two columns: `ECH_ORDER` and `TRACE_SPAT`.
Will be None if ``std_outfile`` is None, or if the selected detector/mosaic
is not available in the provided spec1d file, or for SlicerIFU reductions.
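
    Example:
        A hedged sketch; the file name is hypothetical:

        .. code-block:: python

            std_tab = get_std_trace('DET01', 'spec1d_std_example.fits')
            if std_tab is not None:
                trace = std_tab['TRACE_SPAT']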
"""
    if std_outfile is None:
        return None
    sobjs = SpecObjs.from_fitsfile(std_outfile, chk_version=chk_version)
pypeline = sobjs.PYPELINE
# Does the detector match?
    # TODO: Instrument-specific logic could be implemented here with the
    # parset. For example, for LRIS-B or LRIS-R we would use the standard
    # from another detector.
this_det = sobjs.DET == detname
if np.any(this_det):
sobjs_det = sobjs[this_det]
sobjs_std = sobjs_det.get_std()
# No standard extracted on this detector??
if sobjs_std is None:
return None
# create table that contains the trace of the standard
std_tab = Table()
        # flatten the array if this is a MultiSlit reduction
if 'MultiSlit' in pypeline:
std_tab['TRACE_SPAT'] = sobjs_std.TRACE_SPAT
elif 'Echelle' in pypeline:
std_tab['ECH_ORDER'] = sobjs_std.ECH_ORDER
std_tab['TRACE_SPAT'] = sobjs_std.TRACE_SPAT
elif 'SlicerIFU' in pypeline:
std_tab = None
else:
msgs.error('Unrecognized pypeline')
else:
std_tab = None
return std_tab
def lst_to_array(lst, mask=None):
"""
    Simple method to convert a list to an array.

    Allows for a list of Quantity objects.

    Args:
        lst (:obj:`list`):
            Elements should be numbers or Quantities.
mask (`numpy.ndarray`_, optional):
Boolean array used to limit to a subset of the list. True=good
Returns:
`numpy.ndarray`_, `astropy.units.Quantity`_: Converted list
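
    Example:
        A hedged sketch of the supported inputs:

        .. code-block:: python

            import numpy as np
            from astropy import units

            lst_to_array([1. * units.arcsec, 2. * units.arcsec])  # Quantity array
            lst_to_array([np.arange(3), None, np.arange(3)])      # object-dtype array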
"""
_mask = np.ones(len(lst), dtype=bool) if mask is None else mask
# Return a Quantity array
if isinstance(lst[0], units.Quantity):
return units.Quantity(lst)[_mask]
# If all the elements of lst have the same type, np.array(lst)[mask] will work
if len(set(map(type, lst))) == 1:
return np.array(lst)[_mask]
    # Otherwise, we have to set the array type to object.
    # NOTE: The dtype="object" is needed for the case where one element of lst
    # is not a list but None. For example, if trying to unpack SpecObjs OPT fluxes
    # and for one slit/order the OPT extraction failed (but not the BOX extraction),
    # OPT_COUNTS is None for that slit/order, and lst would be something like
    # [array, array, array, None, array], which makes np.array fail with the error
    # "ValueError: setting an array element with a sequence. The requested array has an
    # inhomogeneous shape after 1 dimensions..."
    return np.array(lst, dtype=object)[_mask]