Source code for pypeit.sensfunc

"""
Implements the objects used to construct sensitivity functions.

.. include:: ../include/links.rst
"""
import inspect

from IPython import embed

import numpy as np
import scipy.interpolate
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages

from astropy.io import fits
from astropy import table

from pypeit import msgs
from pypeit import specobjs
from pypeit import utils
from pypeit.core import coadd
from pypeit.core import flux_calib
from pypeit.core import telluric
from pypeit.core import fitting
from pypeit.core.wavecal import wvutils
from pypeit.core import meta
from pypeit.core import flat
from pypeit.core.moment import moment1d

from pypeit.spectrographs.util import load_spectrograph
from pypeit import datamodel
from pypeit import flatfield


# TODO Add the data model up here as a standard thing using DataContainer.

# TODO Standard output location for sensfunc?

# TODO Add some QA plots, and plots to the screen if show is set.

[docs] class SensFunc(datamodel.DataContainer): r""" Base class for generating sensitivity functions from a standard-star spectrum. This class should not be instantated by itself; instead instantiate either :class:`UVISSensFunc` or :class:`IRSensFunc`, depending on the wavelength range of your data (UVIS for :math:`\lambda < 7000` angstrom, IR for :math:`\lambda > 7000` angstrom.) The datamodel attributes are: .. include:: ../include/class_datamodel_sensfunc.rst Args: spec1dfile (:obj:`str`): PypeIt spec1d file for the standard file. sensfile (:obj:`str`): File name for the sensitivity function data. par (:class:`~pypeit.par.pypeitpar.SensFuncPar`): The parameters required for the sensitivity function computation. par_fluxcalib (:class:`~pypeit.par.pypeitpar.FluxCalibratePar`, optional): The parameters required for flux calibration. These are only used for flux calibration of the standard star spectrum for the QA plot. If None, defaults will be used. debug (:obj:`bool`, optional): Run in debug mode, sending diagnostic information to the screen. """ version = '1.0.2' """Datamodel version.""" # TODO: Add this if we want to set the output float type for the np.ndarray # elements of the datamodel... # output_float_dtype = np.float32 # """Regardless of datamodel, output floating-point data have this fixed bit size.""" datamodel = {'PYP_SPEC': dict(otype=str, descr='PypeIt spectrograph name'), 'pypeline': dict(otype=str, descr='PypeIt pipeline reduction path'), 'spec1df': dict(otype=str, descr='PypeIt spec1D file used to for sensitivity function'), 'extr': dict(otype=str, descr='Extraction method used for the standard star (OPT or BOX)'), 'std_name': dict(otype=str, descr='Type of standard source'), 'std_cal': dict(otype=str, descr='File name (or shorthand) with the standard flux data'), # TODO: Is it possible/useful to force the coordinates to always be floats 'std_ra': dict(otype=float, descr='RA of the standard source'), 'std_dec': dict(otype=float, descr='DEC of the standard source'), 'airmass': dict(otype=float, descr='Airmass of the observation'), 'exptime': dict(otype=float, descr='Exposure time'), 'telluric': dict(otype=telluric.Telluric, descr='Telluric model; see ' ':class:`~pypeit.core.telluric.Telluric`'), 'sens': dict(otype=table.Table, descr='Table with the sensitivity function'), 'wave': dict(otype=np.ndarray, atype=float, descr='Wavelength vectors'), 'zeropoint': dict(otype=np.ndarray, atype=float, descr='Sensitivity function zeropoints'), 'throughput': dict(otype=np.ndarray, atype=float, descr='Spectrograph throughput measurements'), 'algorithm': dict(otype=str, descr='Algorithm used for the sensitivity calculation.')} # , # 'wave_splice': dict(otype=np.ndarray, atype=float, # descr='Spliced-together wavelength vector'), # 'zeropoint_splice': dict(otype=np.ndarray, atype=float, # descr='Spliced-together sensitivity function zeropoints'), # 'throughput_splice': dict(otype=np.ndarray, atype=float, # descr='Spliced-together spectrograph throughput ' # 'measurements')} """DataContainer datamodel.""" internals = ['sensfile', 'spectrograph', 'par', 'par_fluxcalib', 'qafile', 'thrufile', 'fstdfile', 'debug', 'sobjs_std', 'wave_cnts', 'counts', 'counts_ivar', 'counts_mask', 'log10_blaze_function', 'nspec_in', 'norderdet', 'wave_splice', 'zeropoint_splice', 'throughput_splice', 'steps', 'splice_multi_det', 'meta_spec', 'std_dict', 'chk_version' ] _algorithm = None """Algorithm used for the sensitivity calculation."""
[docs] @staticmethod def empty_sensfunc_table(norders, nspec, nspec_in, ncoeff=1): """ Construct an empty `astropy.table.Table`_ for the sensitivity function. Args: norders (:obj:`int`): The number of slits/orders on the detector. nspec (:obj:`int`): The number of spectral pixels for the zeropoint arrays. nspec_in (:obj:`int`): The number of spectral pixels on the detector for the input standard star spectrum. ncoeff (:obj:`int`, optional): Number of coefficients for smooth model fit to zeropoints Returns: `astropy.table.Table`_: Instance of the empty sensitivity function table. """ return table.Table(data=[ table.Column(name='SENS_WAVE', dtype=float, length=norders, shape=(nspec,), description='Wavelength vector'), table.Column(name='SENS_COUNTS_PER_ANG', dtype=float, length=norders, shape=(nspec,), description='Flux in counts per angstrom'), table.Column(name='SENS_LOG10_BLAZE_FUNCTION', dtype=float, length=norders, shape=(nspec,), description='Log10 of the blaze function for each slit/order'), table.Column(name='SENS_ZEROPOINT', dtype=float, length=norders, shape=(nspec,), description='Measured sensitivity zero-point data'), table.Column(name='SENS_ZEROPOINT_GPM', dtype=bool, length=norders, shape=(nspec,), description='Good-pixel mask for the measured zero points'), table.Column(name='SENS_ZEROPOINT_FIT', dtype=float, length=norders, shape=(nspec,), description='Best-fit smooth model to the zero points'), table.Column(name='SENS_ZEROPOINT_FIT_GPM', dtype=bool, length=norders, shape=(nspec,), description='Good-pixel mask for the model zero points'), table.Column(name='SENS_COEFF', dtype=float, length=norders, shape=(ncoeff,), description='Coefficients of smooth model fit to zero points'), table.Column(name='ECH_ORDERS', dtype=int, length=norders, description='Echelle order for this specrum (echelle data only)'), table.Column(name='POLYORDER_VEC', dtype=int, length=norders, description='Polynomial order for each slit/echelle (if applicable)'), table.Column(name='WAVE_MIN', dtype=float, length=norders, description='Minimum wavelength included in the fit'), table.Column(name='WAVE_MAX', dtype=float, length=norders, description='Maximum wavelength included in the fit'), table.Column(name='SENS_FLUXED_STD_WAVE', dtype=float, length=norders, shape=(nspec_in,), description='The wavelength array for the fluxed standard star spectrum'), table.Column(name='SENS_FLUXED_STD_FLAM', dtype=float, length=norders, shape=(nspec_in,), description='The F_lambda for the fluxed standard star spectrum'), table.Column(name='SENS_FLUXED_STD_FLAM_IVAR', dtype=float, length=norders, shape=(nspec_in,), description='The inverse variance of F_lambda for the fluxed standard star spectrum'), table.Column(name='SENS_FLUXED_STD_MASK', dtype=bool, length=norders, shape=(nspec_in,), description='The good pixel mask for the fluxed standard star spectrum '), table.Column(name='SENS_STD_MODEL_FLAM', dtype=float, length=norders, shape=(nspec_in,), description='The F_lambda for the standard model spectrum')])
# Superclass factory method generates the subclass instance
[docs] @classmethod def get_instance(cls, spec1dfile, sensfile, par, par_fluxcalib=None, debug=False, chk_version=True): """ Instantiate the relevant subclass based on the algorithm provided in ``par``. """ return next(c for c in cls.__subclasses__() if c.__name__ == f"{par['algorithm']}SensFunc")( spec1dfile, sensfile, par, par_fluxcalib=par_fluxcalib, debug=debug, chk_version=chk_version)
def __init__(self, spec1dfile, sensfile, par, par_fluxcalib=None, debug=False, chk_version=True): # Instantiate as an empty DataContainer super().__init__() # Input and Output files self.spec1df = spec1dfile self.extr = par['extr'] self.sensfile = sensfile self.par = par self.chk_version = chk_version # Spectrograph header = fits.getheader(self.spec1df) self.PYP_SPEC = header['PYP_SPEC'] self.spectrograph = load_spectrograph(self.PYP_SPEC) self.pypeline = self.spectrograph.pypeline # TODO: This line is necessary until we figure out a way to instantiate # spectrograph objects with configuration specific information from # spec1d files. self.spectrograph.dispname = header['DISPNAME'] self.par_fluxcalib = self.spectrograph.default_pypeit_par()['fluxcalib'] \ if par_fluxcalib is None else par_fluxcalib # Set the algorithm in the datamodel self.algorithm = self.__class__._algorithm # QA and throughput plot filenames self.qafile = sensfile.replace('.fits', '') + '_QA.pdf' self.thrufile = sensfile.replace('.fits', '') + '_throughput.pdf' self.fstdfile = sensfile.replace('.fits', '') + '_fluxed_std.pdf' # Other self.debug = debug self.steps = [] # Are we splicing together multiple detectors? self.splice_multi_det = True if self.par['multi_spec_det'] is not None else False # Read in the Standard star data self.sobjs_std = specobjs.SpecObjs.from_fitsfile( self.spec1df, chk_version=self.chk_version).get_std( multi_spec_det=self.par['multi_spec_det']) if self.sobjs_std is None: msgs.error(f'There is a problem with your standard star spec1d file: {self.spec1df}') # Unpack standard wave, counts, counts_ivar, counts_mask, log10_blaze_function, self.meta_spec, header \ = self.sobjs_std.unpack_object(ret_flam=False, log10blaze=True, extract_blaze=par['use_flat'], extract_type=self.extr, remove_missing=True) # Perform any instrument tweaks wave_twk, counts_twk, counts_ivar_twk, counts_mask_twk, log10_blaze_function_twk = \ self.spectrograph.tweak_standard(wave, counts, counts_ivar, counts_mask, self.meta_spec, log10_blaze_function=log10_blaze_function) # Reshape to 2d arrays self.wave_cnts, self.counts, self.counts_ivar, self.counts_mask, self.log10_blaze_function, self.nspec_in, \ self.norderdet = utils.spec_atleast_2d(wave_twk, counts_twk, counts_ivar_twk, counts_mask_twk, log10_blaze_function=log10_blaze_function_twk) # If the user provided RA and DEC use those instead of what is in meta star_ra = self.meta_spec['RA'] if self.par['star_ra'] is None else self.par['star_ra'] star_dec = self.meta_spec['DEC'] if self.par['star_dec'] is None else self.par['star_dec'] # Convert to decimal deg, as needed star_ra, star_dec = meta.convert_radec(star_ra, star_dec) # Read in standard star dictionary self.std_dict = flux_calib.get_standard_spectrum(star_type=self.par['star_type'], star_mag=self.par['star_mag'], ra=star_ra, dec=star_dec)
[docs] def _bundle(self): """ Bundle the object for writing using :func:`~pypeit.datamodel.DataContainer.to_hdu`. """ # All of the datamodel elements that can be written as a header are # written to the SENS extension. d = [] for key in self.keys(): if self[key] is None: continue if isinstance(self[key], datamodel.DataContainer) \ or isinstance(self[key], table.Table): d += [{key: self[key]}] continue if self.datamodel[key]['otype'] == np.ndarray: # TODO: We might want a general solution in # DataContainer that knows to convert (and revert) # boolean arrays to np.uint8 types. Does anyone know a # way around this? It's annoying that ImageHDUs cannot # have boolean type arrays. if issubclass(self[key].dtype.type, (bool, np.bool_)): d += [{key: self[key].astype(np.uint8)}] # elif issubclass(self[key].dtype.type, (float, np.floating)): # d += [{key: self[key].astype(self.output_float_dtype)}] else: d += [{key: self[key]}] # If the spliced arrays have been set, use these to replace the # multislit/echelle wave, zeropoint, and throughput arrays if self.splice_multi_det: # TODO: I added this neurotic check, just to make sure... if self.wave_splice is None or self.zeropoint_splice is None \ or self.throughput_splice is None: msgs.error('CODING ERROR: Assumed if splice_multi_det is True, then the *_splice ' 'arrays have all been defined. Found a case where this is not true!') # Loop through this list of dictionaries for _d in d: if list(_d.keys())[0] not in ['wave', 'zeropoint', 'throughput']: # Keep going if it's not one of the relevant attributes continue # Replace with the spliced version. Warning: This requires that # the relevant attribute names are paired as self.attr and # self.attr_splice attr = list(_d.keys())[0] _d[attr] = getattr(self, f'{attr}_splice') # Add all the non-array, non-DataContainer, non-Table elements to the # 'sens' extension. This is done after the first loop to just to make # sure that the order of extensions matches the order in the datamodel. for _d in d: if list(_d.keys()) != ['sens']: continue for key in self.keys(): if self[key] is None or self.datamodel[key]['otype'] == np.ndarray \ or isinstance(self[key], datamodel.DataContainer) \ or isinstance(self[key], table.Table): continue _d[key] = self[key] return d
[docs] @classmethod def from_hdu(cls, hdu, hdu_prefix=None, chk_version=True): """ Instantiate the object from an HDU extension. This overrides the base-class method, essentially just to handle the fact that the 'TELLURIC' extension is not called 'MODEL'. Args: hdu (`astropy.io.fits.HDUList`_, `astropy.io.fits.ImageHDU`_, `astropy.io.fits.BinTableHDU`_): The HDU(s) with the data to use for instantiation. hdu_prefix (:obj:`str`, optional): Maintained for consistency with the base class but is not used by this method. chk_version (:obj:`bool`, optional): If True, raise an error if the datamodel version or type check failed. If False, throw a warning only. """ # Run the default parser to get most of the data. This correctly parses # everything except for the Telluric.model data table. d, version_passed, type_passed, parsed_hdus = cls._parse(hdu, allow_subclasses=True) # Check cls._check_parsed(version_passed, type_passed, chk_version=chk_version) # Load the telluric model, if it exists if 'TELLURIC' in [h.name for h in hdu]: # Instantiate the Telluric model from the header, if it exists # TODO: I don't like this, but this is the fastest work around d['telluric'] = telluric.Telluric.from_hdu(hdu['TELLURIC'], ext_pseudo='MODEL', chk_version=chk_version) # Return the constructed object return super().from_dict(d=d)
[docs] def compute_zeropoint(self): """ Dummy method overloaded by subclasses """ pass
[docs] def run(self): """ Execute the sensitivity function calculations. """ # Compute the sensitivity function self.compute_zeropoint() # Extrapolate the zeropoint based on par['extrap_blu'], par['extrap_red'] self.wave, self.zeropoint = self.extrapolate(samp_fact=self.par['samp_fact']) if self.splice_multi_det: self.wave_splice, self.zeropoint_splice = self.splice() # Flux the standard star with this sensitivity function and add it to the output table self.flux_std() # Compute the throughput self.throughput, self.throughput_splice = self.compute_throughput() # Write out QA and throughput plots self.write_QA()
[docs] def flux_std(self): """ Flux the standard star and add it to the sensitivity function table """ # Now flux the standard star self.sobjs_std.apply_flux_calib(self.par_fluxcalib, self.spectrograph, self) # TODO assign this to the data model # Unpack the fluxed standard _wave, _flam, _flam_ivar, _flam_mask, _blaze, _, _ \ = self.sobjs_std.unpack_object(ret_flam=True, extract_type=self.extr) # Reshape to 2d arrays wave, flam, flam_ivar, flam_mask, _, _, _ \ = utils.spec_atleast_2d(_wave, _flam, _flam_ivar, _flam_mask) # Store in the sens table self.sens['SENS_FLUXED_STD_WAVE'] = wave.T self.sens['SENS_FLUXED_STD_FLAM'] = flam.T self.sens['SENS_FLUXED_STD_FLAM_IVAR'] = flam_ivar.T self.sens['SENS_FLUXED_STD_MASK'] = flam_mask.T
[docs] def eval_zeropoint(self, wave, iorddet): """ Dummy method, overloaded by subclasses """ pass
[docs] def extrapolate(self, samp_fact=1.5): """ Extrapolates the sensitivity function to cover an extra wavelength range set by the ``extrapl_blu`` and ``extrap_red`` parameters. This is important for making sure that the sensitivity function can be applied to data with slightly different wavelength coverage etc. Parameters ---------- samp_fact : :obj:`float` Parameter governing the sampling of the wavelength grid used for the extrapolation. Returns ------- wave_extrap : `numpy.ndarray`_ Extrapolated wavelength array zeropoint_extrap : `numpy.ndarray`_ Extrapolated sensitivity function """ # Create a new set of oversampled and padded wavelength grids for the # extrapolation wave_extrap_min = self.sens['WAVE_MIN'].data * (1.0 - self.par['extrap_blu']) wave_extrap_max = self.sens['WAVE_MAX'].data * (1.0 + self.par['extrap_red']) nspec_extrap = 0 # Find the maximum size of the wavewlength grids, since we want # everything to have the same for idet in range(self.norderdet): wave = self.wave_cnts if self.wave_cnts.ndim == 1 else self.wave_cnts[:, idet] dwave_data, dloglam_data, resln_guess, pix_per_sigma = wvutils.get_sampling(wave) nspec_now = np.ceil(samp_fact * (wave_extrap_max[idet] - wave_extrap_min[idet]) / dwave_data).astype(int) nspec_extrap = np.max([nspec_now, nspec_extrap]) # Create the wavelength grid wave_extrap = np.outer(np.arange(nspec_extrap), (wave_extrap_max - wave_extrap_min) / (nspec_extrap - 1)) \ + np.outer(np.ones(nspec_extrap), wave_extrap_min) zeropoint_extrap = np.zeros_like(wave_extrap) # Evaluate extrapolated zerpoint for all orders detectors for iorddet in range(self.norderdet): zeropoint_extrap[:, iorddet] = self.eval_zeropoint(wave_extrap[:,iorddet], iorddet) self.steps.append(inspect.stack()[0][3]) return wave_extrap, zeropoint_extrap
[docs] def splice(self): """ Routine to splice together sensitivity functions into one global sensitivity function for spectrographs with multiple detectors extending across the wavelength direction. Returns ------- wave_splice : `numpy.ndarray`_, shape is (nspec_splice, 1) wavelength array zeropoint_splice: `numpy.ndarray`_, shape is (nspec_splice, 1) zero-point array """ msgs.info(f"Merging sensfunc for {self.norderdet} detectors {self.par['multi_spec_det']}") wave_splice_min = self.wave[self.wave > 1.0].min() wave_splice_max = self.wave[self.wave > 1.0].max() wave_splice_1d, _, _ = wvutils.get_wave_grid(waves=self.wave, wave_method='linear', wave_grid_min=wave_splice_min, wave_grid_max=wave_splice_max, spec_samp_fact=1.0) zeropoint_splice_1d = np.zeros_like(wave_splice_1d) for idet in range(self.norderdet): wave_min = self.sens['WAVE_MIN'][idet] wave_max = self.sens['WAVE_MAX'][idet] if idet == 0: # If this is the bluest detector, extrapolate to wave_extrap_min wave_mask_min = wave_splice_min wave_mask_max = wave_max elif idet == (self.norderdet - 1): # If this is the reddest detector, extrapolate to wave_extrap_max wave_mask_min = wave_min wave_mask_max = wave_splice_max else: wave_mask_min = wave_min wave_mask_max = wave_max splice_wave_mask = (wave_splice_1d >= wave_mask_min) & (wave_splice_1d <= wave_mask_max) zeropoint_splice_1d[splice_wave_mask] \ = self.eval_zeropoint(wave_splice_1d[splice_wave_mask], idet) # Interpolate over gaps zeros = zeropoint_splice_1d == 0. if np.any(zeros): msgs.info("Interpolating over gaps (and extrapolating with fill_value=1, if need be)") interp_func = scipy.interpolate.interp1d(wave_splice_1d[np.invert(zeros)], zeropoint_splice_1d[np.invert(zeros)], kind='nearest', fill_value=0., bounds_error=False) # #kind='nearest', fill_value='extrapoloate', bounds_error=False) # extrapolate fails for JXP, even on 1.4.1 zero_values = interp_func(wave_splice_1d[zeros]) zeropoint_splice_1d[zeros] = zero_values nspec_splice = wave_splice_1d.size self.steps.append(inspect.stack()[0][3]) return wave_splice_1d.reshape(nspec_splice,1), zeropoint_splice_1d.reshape(nspec_splice,1)
[docs] def compute_throughput(self): """ Compute the spectroscopic throughput Returns ------- throughput : `numpy.ndarray`_, :obj:`float`, shape is (nspec, norders) Throughput measurements throughput_splice : `numpy.ndarray`_, :obj:`float`, shape is (nspec_splice, norders) Throughput measurements for spliced spectra """ # Set the throughput to be -1 in places where it is not defined. throughput = np.full_like(self.zeropoint, -1.0) for idet in range(self.wave.shape[1]): wave_gpm = (self.wave[:,idet] >= self.sens['WAVE_MIN'][idet]) \ & (self.wave[:,idet] <= self.sens['WAVE_MAX'][idet]) \ & (self.wave[:,idet] > 1.0) throughput[:,idet][wave_gpm] \ = flux_calib.zeropoint_to_throughput(self.wave[:,idet][wave_gpm], self.zeropoint[:,idet][wave_gpm], self.spectrograph.telescope.eff_aperture()) if self.splice_multi_det: wave_gpm = (self.wave_splice >= np.amin(self.sens['WAVE_MIN'])) \ & (self.wave_splice <= np.amax(self.sens['WAVE_MAX'])) \ & (self.wave_splice > 1.0) throughput_splice = np.zeros_like(self.wave_splice) throughput_splice[wave_gpm] \ = flux_calib.zeropoint_to_throughput(self.wave_splice[wave_gpm], self.zeropoint_splice[wave_gpm], self.spectrograph.telescope.eff_aperture()) else: throughput_splice = None return throughput, throughput_splice
[docs] def write_QA(self): """ Write out zeropoint QA files """ utils.pyplot_rcparams() # Plot QA for zeropoint if 'Echelle' in self.spectrograph.pypeline: order_or_det = self.meta_spec['ECH_ORDERS'] order_or_det_str = 'order' else: order_or_det = np.arange(self.norderdet) + 1 order_or_det_str = 'det' spec_str = f' {self.spectrograph.name} {self.spectrograph.pypeline} ' \ f'{self.spectrograph.dispname} ' zp_title = ['PypeIt Zeropoint QA for' + spec_str + order_or_det_str + f'={order_or_det[idet]}' for idet in range(self.norderdet)] thru_title = [order_or_det_str + f'={order_or_det[idet]}' for idet in range(self.norderdet)] is_odd = self.norderdet % 2 != 0 npages = int(np.ceil(self.norderdet/2)) if is_odd else self.norderdet//2 + 1 # TODO: PDF page logic is a bit complicated becauase we want to plot two # plots per page, but the number of pages depends on the number of # order/det. Consider just dumping out a set of plots or revamp once we # have a dashboard. with PdfPages(self.qafile) as pdf: for ipage in range(npages): figure, (ax1, ax2) = plt.subplots(2, figsize=(8.27, 11.69)) if (2 * ipage) < self.norderdet: flux_calib.zeropoint_qa_plot(self.sens['SENS_WAVE'][2*ipage], self.sens['SENS_ZEROPOINT'][2*ipage], self.sens['SENS_ZEROPOINT_GPM'][2*ipage], self.sens['SENS_ZEROPOINT_FIT'][2*ipage], self.sens['SENS_ZEROPOINT_FIT_GPM'][2*ipage], title=zp_title[2*ipage], axis=ax1) if (2*ipage + 1) < self.norderdet: flux_calib.zeropoint_qa_plot(self.sens['SENS_WAVE'][2*ipage+1], self.sens['SENS_ZEROPOINT'][2*ipage+1], self.sens['SENS_ZEROPOINT_GPM'][2*ipage+1], self.sens['SENS_ZEROPOINT_FIT'][2*ipage+1], self.sens['SENS_ZEROPOINT_FIT_GPM'][2*ipage+1], title=zp_title[2*ipage+1], axis=ax2) if self.norderdet == 1: # For single order/det just finish up after the first page ax2.remove() pdf.savefig() plt.close('all') elif (self.norderdet > 1) & (ipage < npages-1): # For multi order/det but not on the last page, finish up. # No need to remove ax2 since there are always 2 plots per # page except on the last page pdf.savefig() plt.close('all') else: # For multi order/det but on the last page, add order/det # summary plot to the final page Deal with even/odd page # logic for axes if is_odd: # add order/det summary plot to axis 2 of current page axis=ax2 else: axis=ax1 ax2.remove() for idet in range(self.norderdet): # define the color rr = (np.max(order_or_det) - order_or_det[idet]) \ / np.maximum(np.max(order_or_det) - np.min(order_or_det), 1) gg = 0.0 bb = (order_or_det[idet] - np.min(order_or_det)) \ / np.maximum(np.max(order_or_det) - np.min(order_or_det), 1) wave_gpm = self.sens['SENS_WAVE'][idet] > 1.0 axis.plot(self.sens['SENS_WAVE'][idet,wave_gpm], self.sens['SENS_ZEROPOINT_FIT'][idet,wave_gpm], color=(rr, gg, bb), linestyle='-', linewidth=2.5, label=thru_title[idet], zorder=5 * idet) _wave_min = np.amin(self.sens['WAVE_MIN']) _wave_max = np.amax(self.sens['WAVE_MAX']) # If we are splicing, overplot the spliced zeropoint if self.splice_multi_det: wave_slice_gpm = (self.wave_splice >= _wave_min) \ & (self.wave_splice <= _wave_max) \ & (self.wave_splice > 1.0) axis.plot(self.wave_splice[wave_slice_gpm].flatten(), self.zeropoint_splice[wave_slice_gpm].flatten(), color='black', linestyle='-', linewidth=2.5, label='Spliced Zeropoint', zorder=30, alpha=0.3) wave_gpm = self.sens['SENS_WAVE'] > 1.0 axis.set_xlim((0.98 * _wave_min, 1.02 * _wave_max)) axis.set_ylim((0.95 * np.amin(self.sens['SENS_ZEROPOINT_FIT'][wave_gpm]), 1.05 * np.amax(self.sens['SENS_ZEROPOINT_FIT'][wave_gpm]))) axis.legend(fontsize=14) axis.set_xlabel('Wavelength (Angstroms)') axis.set_ylabel('Zeropoint (AB mag)') axis.set_title('PypeIt Zeropoints for' + spec_str, fontsize=12) pdf.savefig() plt.close('all') # Plot throughput curve(s) for all orders/det fig = plt.figure(figsize=(12,8)) axis = fig.add_axes([0.1, 0.1, 0.8, 0.8]) for idet in range(self.wave.shape[1]): # define the color rr = (np.max(order_or_det) - order_or_det[idet]) \ / np.maximum(np.max(order_or_det) - np.min(order_or_det), 1) gg = 0.0 bb = (order_or_det[idet] - np.min(order_or_det)) \ / np.maximum(np.max(order_or_det) - np.min(order_or_det), 1) gpm = (self.throughput[:, idet] >= 0.0) axis.plot(self.wave[gpm,idet], self.throughput[gpm,idet], color=(rr, gg, bb), linestyle='-', linewidth=2.5, label=thru_title[idet], zorder=5*idet) if self.splice_multi_det: axis.plot(self.wave_splice[wave_slice_gpm].flatten(), self.throughput_splice[wave_slice_gpm].flatten(), color='black', linestyle='-', linewidth=2.5, label='Spliced Throughput', zorder=30, alpha=0.3) axis.set_xlim((0.98*self.wave[self.throughput >=0.0].min(), 1.02*self.wave[self.throughput >=0.0].max())) axis.set_ylim((0.0, 1.05*self.throughput[self.throughput >=0.0].max())) axis.legend() axis.set_xlabel('Wavelength (Angstroms)') axis.set_ylabel('Throughput') axis.set_title('PypeIt Throughput for' + spec_str) fig.savefig(self.thrufile) # Plot fluxed standard star for all orders/det fig = plt.figure(figsize=(12,8)) axis = fig.add_axes([0.1, 0.1, 0.8, 0.8]) axis.plot(self.std_dict['wave'].value, self.std_dict['flux'].value, color='green',linewidth=3.0, label=self.std_dict['name'], zorder=100, alpha=0.7) for iorddet in range(self.sens['SENS_FLUXED_STD_WAVE'].shape[0]): # define the color rr = (np.max(order_or_det) - order_or_det[iorddet]) \ / np.maximum(np.max(order_or_det) - np.min(order_or_det), 1) gg = 0.0 bb = (order_or_det[iorddet] - np.min(order_or_det)) \ / np.maximum(np.max(order_or_det) - np.min(order_or_det), 1) wave_gpm = self.sens['SENS_FLUXED_STD_WAVE'][iorddet] > 1.0 axis.plot(self.sens['SENS_FLUXED_STD_WAVE'][iorddet][wave_gpm], self.sens['SENS_FLUXED_STD_FLAM'][iorddet][wave_gpm], color=(rr, gg, bb), drawstyle='steps-mid', linewidth=1.0, label=thru_title[iorddet], zorder=idet, alpha=0.7) wave_gpm_global = self.sens['SENS_FLUXED_STD_WAVE'] > 1.0 wave_min = 0.98*(self.sens['SENS_FLUXED_STD_WAVE'][wave_gpm_global]).min() wave_max = 1.02*(self.sens['SENS_FLUXED_STD_WAVE'][wave_gpm_global]).max() pix_wave_std = (self.std_dict['wave'].value >= wave_min) & (self.std_dict['wave'].value <= wave_max) flux_min = -1.0 flux_max = 1.10*self.std_dict['flux'][pix_wave_std].value.max() axis.set_xlim((wave_min, wave_max)) axis.set_ylim((flux_min, flux_max)) axis.legend() axis.set_xlabel('Wavelength (Angstroms)') axis.set_ylabel(r'$f_{{\lambda}}~~~(10^{{-17}}~{{\rm erg~s^{-1}~cm^{{-2}}~\AA^{{-1}}}})$') axis.set_title('Fluxed Std Compared to True Spectrum:' + spec_str) fig.savefig(self.fstdfile) #save the model that was used model_interp_func = scipy.interpolate.interp1d(self.std_dict['wave'].value, self.std_dict['flux'].value, bounds_error=False, fill_value='extrapolate') model_flux_sav = np.zeros_like(self.sens['SENS_FLUXED_STD_FLAM']) for iorddet in range(self.sens['SENS_FLUXED_STD_WAVE'].shape[0]): wave_gpm = self.sens['SENS_FLUXED_STD_WAVE'][iorddet] > 1.0 model_flux_sav[iorddet][wave_gpm] = model_interp_func(self.sens['SENS_FLUXED_STD_WAVE'][iorddet][wave_gpm]) self.sens['SENS_STD_MODEL_FLAM'] = model_flux_sav
[docs] @classmethod def sensfunc_weights(cls, sensfile, waves, ech_order_vec=None, debug=False, extrap_sens=True, chk_version=True): """ Get the weights based on the sensfunc Args: sensfile (str): the name of your fits format sensfile waves (`numpy.ndarray`_): wavelength grid for your output weights. Shape is (nspec, norders, nexp) or (nspec, norders). ech_order_vec (`numpy.ndarray`_, optional): Vector of echelle orders. Only used for echelle data. debug (bool): default=False show the weights QA extrap_sens (bool): default=True Extrapolate the sensitivity function chk_version (:obj:`bool`, optional): When reading in existing files written by PypeIt, perform strict version checking to ensure a valid file. If False, the code will try to keep going, but this may lead to faults and quiet failures. User beware! Returns: `numpy.ndarray`_: sensfunc weights evaluated on the input waves wavelength grid, shape = same as waves """ sens = cls.from_file(sensfile, chk_version=chk_version) if waves.ndim == 2: nspec, norder = waves.shape if ech_order_vec.size != norder: msgs.warn('The number of orders in the wave grid does not match the ' 'number of orders in the unpacked sobjs. Echelle order vector not used.') ech_order_vec = None nexp = 1 waves_stack = np.reshape(waves, (nspec, norder, 1)) elif waves.ndim == 3: nspec, norder, nexp = waves.shape waves_stack = waves elif waves.ndim == 1: nspec = waves.size norder, nexp = 1, 1 waves_stack = np.reshape(waves, (nspec, 1, 1)) else: msgs.error('Unrecognized dimensionality for waves') weights_stack = np.ones_like(waves_stack) if norder != sens.zeropoint.shape[1] and ech_order_vec is None: msgs.error('The number of orders in {:} does not agree with your data. Wrong sensfile?'.format(sensfile)) elif norder != sens.zeropoint.shape[1] and ech_order_vec is not None: msgs.warn('The number of orders in {:} does not match the number of orders in the data. ' 'Using only the matching orders.'.format(sensfile)) # array of order to loop through orders = np.arange(norder) if ech_order_vec is None else ech_order_vec for iord,this_ord in enumerate(orders): if ech_order_vec is None: isens = iord # find the index of the sensfunc for this order elif ech_order_vec is not None and np.any(sens.sens['ECH_ORDERS'].value == this_ord): isens = np.where(sens.sens['ECH_ORDERS'].value == this_ord)[0][0] else: # if the order is not in the sensfunc file, skip it continue for iexp in range(nexp): sensfunc_iord = flux_calib.get_sensfunc_factor(waves_stack[:,iord,iexp], sens.wave[:,isens], sens.zeropoint[:,isens], 1.0, extrap_sens=extrap_sens) weights_stack[:,iord,iexp] = utils.inverse(sensfunc_iord) if debug: coadd.weights_qa(utils.echarr_to_echlist(waves_stack)[0], utils.echarr_to_echlist(weights_stack)[0], utils.echarr_to_echlist(waves_stack > 1.0)[0], title='sensfunc_weights') # Reshape to be the same size/dimension as the input spectrum if waves.ndim == 2: weights_stack = np.reshape(weights_stack, (nspec, norder)) elif waves.ndim == 1: weights_stack = np.reshape(weights_stack, (nspec)) return weights_stack
# TODO Add a method which optionally merges sensfunc using the nsens > 1 logic
[docs] class IRSensFunc(SensFunc): r""" Determine a sensitivity functions from standard-star spectra. Should only be used with NIR spectra (:math:`\lambda > 7000` angstrom). Args: spec1dfile (:obj:`str`): PypeIt spec1d file for the standard file. sensfile (:obj:`str`): File name for the sensitivity function data. par (:class:`~pypeit.par.pypeitpar.SensFuncPar`, optional): The parameters required for the sensitivity function computation. debug (:obj:`bool`, optional): Run in debug mode. """ _algorithm = 'IR' """Algorithm used for the sensitivity calculation."""
[docs] def compute_zeropoint(self): """ Calls routine to compute the sensitivity function. Returns ------- TelObj : :class:`~pypeit.core.telluric.Telluric` Best-fitting telluric model """ self.telluric = telluric.sensfunc_telluric(self.wave_cnts, self.counts, self.counts_ivar, self.counts_mask, self.meta_spec['EXPTIME'], self.meta_spec['AIRMASS'], self.std_dict, self.par['IR']['telgridfile'], log10_blaze_function=self.log10_blaze_function, polyorder=self.par['polyorder'], ech_orders=self.meta_spec['ECH_ORDERS'], resln_guess=self.par['IR']['resln_guess'], resln_frac_bounds=self.par['IR']['resln_frac_bounds'], pix_shift_bounds=self.par['IR']['pix_shift_bounds'], sn_clip=self.par['IR']['sn_clip'], teltype=self.par['IR']['teltype'], tell_npca=self.par['IR']['tell_npca'], mask_hydrogen_lines=self.par['mask_hydrogen_lines'], maxiter=self.par['IR']['maxiter'], lower=self.par['IR']['lower'], upper=self.par['IR']['upper'], delta_coeff_bounds=self.par['IR']['delta_coeff_bounds'], minmax_coeff_bounds=self.par['IR']['minmax_coeff_bounds'], tol=self.par['IR']['tol'], popsize=self.par['IR']['popsize'], recombination=self.par['IR']['recombination'], polish=self.par['IR']['polish'], disp=self.par['IR']['disp'], debug=self.debug, debug_init=self.debug) # Copy the relevant metadata self.std_name = self.telluric.std_name self.std_cal = self.telluric.std_cal self.std_ra = self.telluric.std_ra self.std_dec = self.telluric.std_dec self.airmass = self.telluric.airmass self.exptime = self.telluric.exptime # Instantiate the main output data table self.sens = self.empty_sensfunc_table(self.telluric.norders, self.telluric.wave_grid.size, self.nspec_in, ncoeff=self.telluric.max_ntheta_obj) # For stupid reasons related to how astropy tables will let me store # this data I have to redundantly write out the wavelength grid for each # order. In actuality a variable length wavelength grid is needed which # is a subset of the original fixed grid, but there is no way to write # this to disk. Perhaps the better solution would be a list of of # astropy tables, one for each order, that would save some space, since # we could then write out only the subset used to the table column. I'm # going with this inefficient approach for now, since eventually we will # want to create a data container for these outputs. That said, coming # up with that standarized model is a bit tedious given the somewhat # heterogenous outputs of the two different fluxing algorithms. # Extract and construct the relevant data using the telluric model self.sens['SENS_COEFF'] = self.telluric.model['OBJ_THETA'] self.sens['WAVE_MIN'] = self.telluric.model['WAVE_MIN'] self.sens['WAVE_MAX'] = self.telluric.model['WAVE_MAX'] self.sens['ECH_ORDERS'] = self.telluric.model['ECH_ORDERS'] s = self.telluric.model['IND_LOWER'] e = self.telluric.model['IND_UPPER']+1 self.sens['POLYORDER_VEC'] = self.telluric.model['POLYORDER_VEC'] for i in range(self.norderdet): # Compute and assign the zeropint_data from the input data and the # best-fit telluric model self.sens['SENS_WAVE'][i,s[i]:e[i]] = self.telluric.wave_grid[s[i]:e[i]] if self.log10_blaze_function is not None: log10_blaze_function_iord = self.telluric.log10_blaze_func_arr[s[i]:e[i],i] self.sens['SENS_LOG10_BLAZE_FUNCTION'][i,s[i]:e[i]] = self.telluric.log10_blaze_func_arr[s[i]:e[i],i] else: log10_blaze_function_iord = None self.sens['SENS_ZEROPOINT_GPM'][i,s[i]:e[i]] = self.telluric.mask_arr[s[i]:e[i],i] self.sens['SENS_COUNTS_PER_ANG'][i,s[i]:e[i]] = self.telluric.flux_arr[s[i]:e[i],i] N_lam = self.sens['SENS_COUNTS_PER_ANG'][i,s[i]:e[i]] / self.exptime self.sens['SENS_ZEROPOINT'][i,s[i]:e[i]], _ \ = flux_calib.compute_zeropoint(self.sens['SENS_WAVE'][i,s[i]:e[i]], N_lam, self.sens['SENS_ZEROPOINT_GPM'][i,s[i]:e[i]], self.telluric.obj_dict_list[i]['flam_true'], tellmodel=self.telluric.tellmodel_list[i]) # TODO: func is always 'legendre' because that is what's set by # sensfunc_telluric self.sens['SENS_ZEROPOINT_FIT'][i,s[i]:e[i]] \ = flux_calib.eval_zeropoint( self.sens['SENS_COEFF'][i,:self.sens['POLYORDER_VEC'][i]+2], self.telluric.func, self.sens['SENS_WAVE'][i,s[i]:e[i]], self.sens['WAVE_MIN'][i], self.sens['WAVE_MAX'][i], log10_blaze_func_per_ang=log10_blaze_function_iord) self.sens['SENS_ZEROPOINT_FIT_GPM'][i,s[i]:e[i]] = self.telluric.outmask_list[i]
[docs] def eval_zeropoint(self, wave, iorddet): """ Evaluate the sensitivity function zero-points Parameters ---------- wave : `numpy.ndarray`_, shape is (nspec) Wavelength array iorddet : :obj:`int` Order or detector (0-indexed) Returns ------- zeropoint : `numpy.ndarray`_, shape is (nspec,) """ s = self.telluric.model['IND_LOWER'] e = self.telluric.model['IND_UPPER']+1 # TODO: Not sure what else to do here if self.log10_blaze_function is not None: log10_blaze_function = scipy.interpolate.interp1d( self.sens['SENS_WAVE'][iorddet,s[iorddet]:e[iorddet]], self.sens['SENS_LOG10_BLAZE_FUNCTION'][iorddet,s[iorddet]:e[iorddet]], kind='linear', bounds_error=False, fill_value='extrapolate')(wave) else: log10_blaze_function = None return flux_calib.eval_zeropoint( self.sens['SENS_COEFF'][iorddet,:self.telluric.model['POLYORDER_VEC'][iorddet]+2], self.telluric.func, wave, self.sens['WAVE_MIN'][iorddet], self.sens['WAVE_MAX'][iorddet], log10_blaze_func_per_ang=log10_blaze_function)
[docs] class UVISSensFunc(SensFunc): r""" Determine a sensitivity functions from standard-star spectra. Should only be used with UVIS spectra (:math:`\lambda < 7000` angstrom). Args: spec1dfile (:obj:`str`): PypeIt spec1d file for the standard file. sensfile (:obj:`str`): File name for the sensitivity function data. par (:class:`~pypeit.par.pypeitpar.SensFuncPar`, optional): The parameters required for the sensitivity function computation. debug (:obj:`bool`, optional): Run in debug mode. """ _algorithm = 'UVIS' """Algorithm used for the sensitivity calculation.""" def __init__(self, spec1dfile, sensfile, par, par_fluxcalib=None, debug=False, chk_version=True): super().__init__(spec1dfile, sensfile, par, par_fluxcalib=par_fluxcalib, debug=debug, chk_version=chk_version) # Add some cards to the meta spec. These should maybe just be added # already in unpack object self.meta_spec['LATITUDE'] = self.spectrograph.telescope['latitude'] self.meta_spec['LONGITUDE'] = self.spectrograph.telescope['longitude']
[docs] def compute_zeropoint(self): """ Calls routine to compute the sensitivity function. """ meta_table, out_table = flux_calib.sensfunc(self.wave_cnts, self.counts, self.counts_ivar, self.counts_mask, self.meta_spec['EXPTIME'], self.meta_spec['AIRMASS'], self.std_dict, self.meta_spec['LONGITUDE'], self.meta_spec['LATITUDE'], self.par['UVIS']['extinct_file'], self.meta_spec['ECH_ORDERS'], polyorder=self.par['polyorder'], hydrogen_mask_wid=self.par['hydrogen_mask_wid'], mask_hydrogen_lines=self.par['mask_hydrogen_lines'], mask_helium_lines=self.par['mask_helium_lines'], nresln=self.par['UVIS']['nresln'], resolution=self.par['UVIS']['resolution'], trans_thresh=self.par['UVIS']['trans_thresh'], polycorrect=self.par['UVIS']['polycorrect'], polyfunc=self.par['UVIS']['polyfunc'], debug=self.debug) # Copy the relevant metadata self.std_name = meta_table['STD_NAME'][0] self.std_cal = meta_table['CAL_FILE'][0] self.std_ra = meta_table['STD_RA'][0] self.std_dec = meta_table['STD_DEC'][0] self.airmass = meta_table['AIRMASS'][0] self.exptime = meta_table['EXPTIME'][0] norder, nspec = out_table['SENS_ZEROPOINT'].shape # Instantiate the main output data table self.sens = self.empty_sensfunc_table(norder, nspec, self.nspec_in) # Copy the relevant data # NOTE: SENS_COEFF is empty! self.sens['SENS_WAVE'] = out_table['SENS_WAVE'] self.sens['SENS_COUNTS_PER_ANG'] = out_table['SENS_COUNTS_PER_ANG'] self.sens['SENS_ZEROPOINT'] = out_table['SENS_ZEROPOINT'] self.sens['SENS_ZEROPOINT_GPM'] = out_table['SENS_ZEROPOINT_GPM'] self.sens['SENS_ZEROPOINT_FIT'] = out_table['SENS_ZEROPOINT_FIT'] self.sens['SENS_ZEROPOINT_FIT_GPM'] = out_table['SENS_ZEROPOINT_FIT_GPM'] if self.meta_spec['ECH_ORDERS'] is not None: self.sens['ECH_ORDERS'] = self.meta_spec['ECH_ORDERS'] self.sens['POLYORDER_VEC'] = np.full(norder, self.par['polyorder']) self.sens['WAVE_MIN'] = out_table['WAVE_MIN'] self.sens['WAVE_MAX'] = out_table['WAVE_MAX']
[docs] def eval_zeropoint(self, wave, iorddet): """ Evaluate the sensitivity function zero-points Parameters ---------- wave : `numpy.ndarray`_, shape is (nspec) Wavelength array iorddet : :obj:`int` Order or detector (0-indexed) Returns ------- zeropoint : `numpy.ndarray`_, shape is (nspec,) """ # This routine can extrapolate return scipy.interpolate.interp1d(self.sens['SENS_WAVE'][iorddet,:], self.sens['SENS_ZEROPOINT_FIT'][iorddet,:], bounds_error=False, fill_value='extrapolate')(wave)