Source code for pypeit.multislit_flexure

""" Module for flexure routines

.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst

"""
import inspect
import pathlib

from astropy.io import ascii
from IPython import embed
import matplotlib
from matplotlib import pyplot as plt
import numpy as np

from pypeit import dataPaths
from pypeit import log
from pypeit import specobjs
from pypeit.core import fitting
from pypeit.datamodel import DataContainer
from pypeit.images.detector_container import DetectorContainer
from pypeit.images.mosaic import Mosaic


[docs] def sky_em_residuals( wave:np.ndarray, flux:np.ndarray, ivar:np.ndarray, sky_waves:np.ndarray, plot=False, noff=5., nfit_min=20 ): """ Calculate residuals and other metrics for a set of input sky emission lines. Args: wave (`numpy.ndarray`_): Wavelengths (in air!) flux (`numpy.ndarray`_): Fluxes ivar (`numpy.ndarray`_): Inverse variance sky_waves (`numpy.ndarray`_): Skyline wavelengths (in air!) plot (bool, optional): If true, plot the residuals noff (int, optional): Range in Ang to analyze labout emission line. Defaults to 5. nfit_min (int, optional): Minimum number of pixels required to do a fit. Defaults to 20. Returns: tuple of `numpy.ndarray`_ -- sky line wavelength of good lines, wavelength offset, error in wavelength offset, sky line width, error in sky line width """ dwave = [] diff = [] diff_err = [] los = [] los_err= [] good_ivar = ivar > 0 # Loop on known sky lines for line in sky_waves: wline = [line-noff,line+noff] mw = (wave > wline[0]) & (wave < wline[1]) & good_ivar # Reuire minimum number if np.sum(mw) <= nfit_min: continue p=[0,0,0,0] # Guess p0 = list(fitting.guess_gauss(wave[mw], flux[mw])) # Fit try: p, pcov = fitting.fit_gauss(wave[mw], flux[mw], w_out=1./np.sqrt(ivar[mw]), guesses=p0, nparam=4) except RuntimeError as e: log.warning('First attempt at Gaussian fit failed, ending with RuntimeError. Original ' f'exception: {e.args[0]} Assuming this is because it hit the maximum ' 'number of function evaluations. Trying again with a maximum of 10000.') # Try again with larger limit on the number of function evaluations p, pcov = fitting.fit_gauss(wave[mw], flux[mw], w_out=1./np.sqrt(ivar[mw]), guesses=p0, nparam=4, maxfev=10000) perr = np.sqrt(np.diag(pcov)) #except: # p=p0 # p[2] = -99 # perr=p0 # Continue d = p[2] - line # For debugging if plot: gfit = fitting.gauss_4deg(wave[mw],*p) plt.figure(figsize=(8,3)) plt.plot(wave[mw],gfit,'g') plt.plot(wave[mw],flux[mw]) plt.title('{} {:0.2f} diff= {:0.3f}'.format(line,p[3],d)) plt.show() # Check if not np.isfinite(perr[2]): perr[2] = 1000. # Save dwave = np.append(dwave,line) diff = np.append(diff,d) diff_err = np.append(diff_err,perr[2]) los = np.append(los,p[3]) los_err = np.append(los_err,perr[3]) # Cut on quality m=(diff_err < 0.1) & (diff_err > 0.0) # Return return dwave[m], diff[m], diff_err[m], los[m], los_err[m]
# TODO -- Consider separating the methods from the DataContainer as per calibrations
[docs] class MultiSlitFlexure(DataContainer): """ Class to perform multi-detector flexure analysis. Based on code written by Marla Geha for DEIMOS. The datamodel attributes are: .. include:: ../include/class_datamodel_multislitflexure.rst """ # Set the version of this class version = '1.1.0' datamodel = {'s1dfile': dict(otype=str, descr='spec1d filename'), 'PYP_SPEC': dict(otype=str, descr='PypeIt spectrograph name'), 'ndet': dict(otype=int, descr='Number of detectors per spectrum'), 'nslits': dict(otype=int, descr='Number of slits'), 'is_msc': dict(otype=np.ndarray, atype=(int, np.integer), descr='Flag that the "det" is the mosaic ID (ndet, nslits)'), 'det': dict(otype=np.ndarray, atype=(int, np.integer), descr='Integer identifiers for the detector or mosaic (ndet, nslits)'), 'SN': dict(otype=np.ndarray, atype=np.floating, descr='S/N (ndet, nslits)'), 'slitid': dict(otype=np.ndarray, atype=np.floating, descr='Slit ID (nslits)'), 'mn_wv': dict(otype=np.ndarray, atype=np.floating, descr='Mininum wavelength of the slit [Ang] (nslits)'), 'indiv_fit_slope': dict(otype=np.ndarray, atype=np.floating, descr='Fits to each slit individually (nslits)'), 'indiv_fit_b': dict(otype=np.ndarray, atype=np.floating, descr='Same as above but for b (nslits)'), 'indiv_fit_los': dict(otype=np.ndarray, atype=np.floating, descr='Same as above but for line width (nslits)'), 'fit_slope': dict(otype=np.ndarray, atype=np.floating, descr='Fitted slope (nslits)'), 'fit_b': dict(otype=np.ndarray, atype=np.floating, descr='Fitted b value(nslits)'), 'fit_los': dict(otype=np.ndarray, atype=np.floating, descr='Fitted line width(nslits)'), 'resid_sky': dict(otype=np.ndarray, atype=np.floating, descr='Residuals of flexure model on sky lines (nslits)'), 'objra': dict(otype=np.ndarray, atype=np.floating, descr='Object RA (nslits)'), 'objdec': dict(otype=np.ndarray, atype=np.floating, descr='Object DEC (nslits)'), 'maskdef_id': dict(otype=np.ndarray, atype=np.integer, descr='Mask ID (nslits)'), 'rms_arc': dict(otype=np.ndarray, atype=np.floating, descr='RMS of fit (ndet, nslits)')} internals = ['flex_par', # Parameters (FlexurePar) 'spectrograph', # spectrograph 'specobjs', # SpecObjs object 'sobj_idx', # (ndet, nslits); Index to specobjs (tuple of arrays) 'sky_table', # Sky line table # 2D models 'pmodel_m', 'pmodel_b', 'pmodel_l' ] def __init__(self, s1dfile=None, PYP_SPEC=None, nslits=None, det=None, SN=None, slitid=None, mn_wv=None, fit_slope=None, fit_b=None, fit_los=None, objra=None, objdec=None, maskdef_id=None, rms_arc=None, resid_sky=None, indiv_fit_slope=None, indiv_fit_b=None, indiv_fit_los=None): # Setup the DataContainer args, _, _, values = inspect.getargvalues(inspect.currentframe()) _d = {k: values[k] for k in args[1:]} # Init super().__init__(d=_d) # Load up specobjs # NOTE: specobjs here *is* the pypeit module self.specobjs = specobjs.SpecObjs.from_fitsfile(self.s1dfile, chk_version=False) # Sky lines -- This one is ASCII, so don't use load_sky_spectrum() sky_file = 'sky_single_mg.dat' self.sky_table = ascii.read(dataPaths.sky_spec.get_file_path(sky_file)) # NOTE: If you make changes to how this object is bundled into the output # datamodel, make sure you update the documentation in # doc/calibrations/flexure.rst!
[docs] def _bundle(self): """ Override the base class method simply to set the HDU extension name. """ return super()._bundle(ext='FLEXURE')
[docs] def init(self, spectrograph, par): """ Initialize this and that about the slits, par, spectrograph e.g. RA, DEC, S/N Args: spectrograph (:class:`pypeit.spectrographs.spectrograph.Spectrograph`): The spectrograph instance that sets the instrument used to take the observations. Used to set :attr:`spectrograph`. par (:class:`~pypeit.par.pypeitpar.FlexurePar`): The parameters used for the flexure processing """ # Internals self.spectrograph = spectrograph self.flex_par = par # Set self.PYP_SPEC = self.spectrograph.name self.sobj_idx = self.spectrograph.spec1d_match_spectra(self.specobjs) # self.nslits = len(self.sobj_idx[0]) self.ndet = len(self.sobj_idx) # Fill in 1D self['slitid'] = self.specobjs[self.sobj_idx[0]]['SLITID'].astype(float) self['objra'] = self.specobjs[self.sobj_idx[0]]['RA'] self['objdec'] = self.specobjs[self.sobj_idx[0]]['DEC'] #self['slitname'] = self.specobjs[self.sobj_idx[0]]['MASKDEF_OBJNAME'] self['maskdef_id'] = self.specobjs[self.sobj_idx[0]]['MASKDEF_ID'] # Compile the list of detector *names* once DETs = self.specobjs.DET # Find which ones are actually mosaics is_msc = np.array([Mosaic.name_prefix in d for d in DETs]).astype(np.uint16) # Use the relevant parser to get the integer identifier det_msc_num = np.array([Mosaic.parse_name(d) if m else DetectorContainer.parse_name(d) for d,m in zip(DETs, is_msc)]) # Then assign the attributes self.is_msc = np.vstack(tuple(is_msc[self.sobj_idx[det]] for det in range(self.ndet))) self.det = np.vstack(tuple(det_msc_num[self.sobj_idx[det]] for det in range(self.ndet))) # S/N and mn_wv from the spectra self['SN'] = np.zeros((self.ndet, self.nslits), dtype=float) self['mn_wv'] = np.zeros((self.ndet, self.nslits), dtype=float) for det in range(self.ndet): self['SN'][det] = [sobj.S2N for sobj in self.specobjs[self.sobj_idx[det]]] self['mn_wv'][det] = [sobj.mnx_wave[0] for sobj in self.specobjs[self.sobj_idx[det]]]
[docs] def fit_mask_surfaces(self): """ Fit 2D model to linear flexure models from each slit as a function of RA, DEC. """ # Cut on S/N good_SN = self['SN'] > self.flex_par['multi_min_SN'] good_slit = np.sum(good_SN, axis=0) == self.ndet # Basic stats mu = np.median(self['indiv_fit_slope'][good_slit]) sd = np.std(self['indiv_fit_slope'][good_slit]) mu2 = np.median(self['indiv_fit_b'][good_slit]) sd2 = np.std(self['indiv_fit_b'][good_slit]) # Cut down to +/- 2sigma mgood = (np.abs(self['indiv_fit_slope']-mu) < 2.*sd) \ & ( np.abs(self['indiv_fit_b']-mu2) < 2.*sd2) & good_slit # Fit me (without additional rejection) # TODO -- Allow for x,y position instead of RA, DEC self.pmodel_m = fitting.robust_fit(self['objra'][mgood], self['indiv_fit_slope'][mgood], (2,2), function='polynomial2d', x2=self['objdec'][mgood]) self.pmodel_b = fitting.robust_fit(self['objra'][mgood], self['indiv_fit_b'][mgood], (2,2), function='polynomial2d', x2=self['objdec'][mgood]) self.pmodel_l = fitting.robust_fit(self['objra'][mgood], self['indiv_fit_los'][mgood], (2,2), function='polynomial2d', x2=self['objdec'][mgood])
[docs] def measure_sky_lines(self): """Main method to analyze the sky lines for all the slits """ # Init for key in ['indiv_fit_slope', 'indiv_fit_b', 'indiv_fit_los']: self[key] = np.zeros(self.nslits) # Loop on slits for i in np.arange(0,self.nslits,1): if (i % 10) == 0: log.info("Working on slit {} of {}".format(i, self.nslits)) if not np.all(self['SN'][:,i] > 1.): continue # Loop on detectors sky_lines, sky_diffs, sky_ediffs, sky_loss = [], [], [], [] for det in range(self.ndet): sobj = self.specobjs[self.sobj_idx[det][i]] # Measure em # The following will break if only boxcar... # TODO -- Allow for boxcar sky_line, sky_diff, sky_ediff, los, _ = sky_em_residuals( sobj['OPT_WAVE'], sobj['OPT_COUNTS_SKY'], sobj['OPT_COUNTS_IVAR'], self.sky_table['Wave']) # Hold em sky_lines.append(sky_line) sky_diffs.append(sky_diff) sky_ediffs.append(sky_ediff) sky_loss.append(los) # Concatenate sky_lines = np.concatenate(sky_lines) sky_diffs = np.concatenate(sky_diffs) sky_ediffs = np.concatenate(sky_ediffs) sky_loss = np.concatenate(sky_loss) # FIT SINGLE SLIT SKY LINES WITH A LINE linear_fit = fitting.robust_fit(sky_lines, sky_diffs, weights=1./sky_ediffs**2, function='polynomial', order=1, maxrej=1, # Might increase lower=3., upper=3.) # Save self['indiv_fit_b'][i] = linear_fit.fitc[0] self['indiv_fit_slope'][i] = linear_fit.fitc[1] self['indiv_fit_los'][i] = np.median(sky_loss)
[docs] def update_fit(self): """Update fits for each slit based on 2D model """ # Do it self['fit_slope'] = self.pmodel_m.eval(self['objra'],x2=self['objdec']) self['fit_b'] = self.pmodel_b.eval(self['objra'],x2=self['objdec']) self['fit_los'] = self.pmodel_l.eval(self['objra'],x2=self['objdec']) # CALCULATE RESIDUALS FROM FIT # Only for QA (I think) resid_sky = [] for i in range(self.nslits): # Require sufficient S/N in reddest detector if self['SN'][-1,i] > 0: # Load up the full spectrum tmp_wave, all_flux, all_sky, all_ivar = np.ndarray(0), \ np.ndarray(0), np.ndarray(0), np.ndarray(0) # TODO -- Allow for Boxcar for det in range(self.ndet): sobj = self.specobjs[self.sobj_idx[det][i]] tmp_wave = np.concatenate((tmp_wave, sobj.OPT_WAVE)) all_flux = np.concatenate((all_flux, sobj.OPT_COUNTS)) all_sky = np.concatenate((all_sky, sobj.OPT_COUNTS_SKY)) all_ivar = np.concatenate((all_ivar, sobj.OPT_COUNTS_IVAR)) # Massage fitwave = self['fit_slope'][i]*tmp_wave + self['fit_b'][i] all_wave = tmp_wave - fitwave # TRIM ENDS all_wave=all_wave[5:-15] all_flux=all_flux[5:-15] all_ivar=all_ivar[5:-15] all_sky=all_sky[5:-15] # REMOVE CRAZY 500-SIGMA VALUES cmask = (all_sky > np.percentile(all_sky,0.1)) & (all_sky < np.percentile(all_sky,99.9)) m=np.median(all_sky[cmask]) s=np.std(all_sky[cmask]) mm = (all_sky > 500.*s + m) | (all_sky < m-50.*s) all_sky[mm] = m all_ivar[mm] = 1e6 if (np.sum(mm) > 10): log.warning('Removing more than 10 pixels of data') _,diff,diff_err,_,_ = sky_em_residuals(all_wave, all_sky, all_ivar, self.sky_table['Wave']) m = np.isfinite(diff) sky_mean = np.average(np.abs(diff[m]), weights = 1./diff_err[m]**2) resid_sky = np.append(resid_sky,sky_mean) else: resid_sky = np.append(resid_sky,-1) self['resid_sky'] = resid_sky
[docs] def qa_plots(self, plot_dir:str, root:str): """Generate QA plots Args: plot_dir (str): Top-lvel folder for QA QA/ is generated beneath this, as needed root (str): Root for output files """ # Generate QA folder as need be qa_dir = pathlib.Path(plot_dir) / 'QA' if not qa_dir.is_dir(): qa_dir.mkdir(parents=True) ''' # Slopes pdf2 = matplotlib.backends.backend_pdf.PdfPages(os.path.join(qa_dir, 'flex_slits_'+root+'.pdf')) plt.rcParams.update({'figure.max_open_warning': 0}) for i in np.arange(0,self.nslits,1): if not np.all(self['SN'][:,i] > 0.): continue # SKY LINES FIRST r_sky_line, r_sky_diff,r_sky_ediff,r_los,r_elos = sky_em_residuals(hdu[r].data['OPT_WAVE'], \ hdu[r].data['OPT_COUNTS_SKY'],\ hdu[r].data['OPT_COUNTS_IVAR']) b_sky_line, b_sky_diff,b_sky_ediff,b_los,b_elos = sky_em_residuals(hdu[b].data['OPT_WAVE'], \ hdu[b].data['OPT_COUNTS_SKY'],\ hdu[b].data['OPT_COUNTS_IVAR']) fig, (ax1,ax2) = plt.subplots(1, 2,figsize=(20,4)) ax1.plot(r_sky_line,r_sky_diff,'ro',alpha=0.8,label='Red chip: Sky Emission') ax1.plot(b_sky_line,b_sky_diff,'bo',alpha=0.8,label='Blue chip: Sky Emission') ax1.errorbar(b_sky_line,b_sky_diff,yerr=b_sky_ediff,fmt='none',ecolor='b',alpha=0.5) ax1.errorbar(r_sky_line,r_sky_diff,yerr=r_sky_ediff,fmt='none',ecolor='r',alpha=0.5) ax1.text(6320,0,'{}'.format(b),fontsize=11) ax1.text(8500,0,'{}'.format(r),fontsize=11) ax1.set_ylim(-0.45,0.45) x=np.arange(6000,9000,1) l1 = slits['fit_slope'][i]*x + slits['fit_b'][i] l2 = fslits['fit_slope'][i]*x + fslits['fit_b'][i] ax1.plot(x,l1,'-') ax1.plot(x,l2,'--') ax1.axhline(linewidth=1, color='grey',alpha=0.5) ax1.set_ylabel('Wavelength offset (AA)') ax1.set_xlabel('Wavelength (AA)') ax1.set_xlim(6300,9100) t = 'Sky Line Fits , resid = {:0.4f} AA, arc = {:0.2f}'.format(slits['resid_sky'][i],0.32*slits['rms_arc_r'][i]) ax1.set_title(t) sky_diff = np.concatenate((r_sky_diff,b_sky_diff),axis=None) sky_lines = np.concatenate((r_sky_line,b_sky_line),axis=None) sky_ediff = np.concatenate((r_sky_ediff,b_sky_ediff),axis=None) sky_los = np.concatenate((r_los,b_los),axis=None) ax2.plot(r_sky_line,r_los,'ro',alpha=0.8,label='Red chip: Sky Emission') ax2.plot(b_sky_line,b_los,'bo',alpha=0.8,label='Blue chip: Sky Emission') ax2.errorbar(r_sky_line,r_los,yerr=r_elos,fmt='none',ecolor='r',alpha=0.5) ax2.errorbar(b_sky_line,b_los,yerr=b_elos,fmt='none',ecolor='b',alpha=0.5) ax2.axhline(fslits['fit_los'][i],linewidth=1, color='grey',alpha=0.5) ax2.set_title('Line widths') ax2.set_xlabel('Wavelength (AA)') ax2.set_ylim(0.3,0.8) ax2.set_xlim(6300,9100) pdf2.savefig() pdf2.close() plt.close('all') ''' ######################################################################### # CREATE FULL MASK FITS pdf = matplotlib.backends.backend_pdf.PdfPages( plot_dir+'QA/flex_mask_'+root+'.pdf') xslit = self['objra'] yslit = self['objdec'] t=2. mu = np.median(self['indiv_fit_slope']) sd = np.std(self['indiv_fit_slope']) mu2 = np.median(self['indiv_fit_b']) sd2 = np.std(self['indiv_fit_b']) mu3 = np.median(self['indiv_fit_los']) sd3 = np.std(self['indiv_fit_los']) # PLOT FITTED VALUES fig, (ax1,ax2,ax3) = plt.subplots(1, 3,figsize=(22,5)) mm1=-0.00005 mm2=0.00005 print(mu-t*sd,mu+t*sd) ax1.scatter(xslit,yslit,c=self['indiv_fit_slope'], cmap="cool",vmin = mm1,vmax=mm2 )# mu-t*sd,vmax=mu+t*sd) ax1.set_ylabel('Dec [deg]') ax1.set_xlabel('RA [deg]') ax1.set_title('Wave MEASURE: line slope') #cax, _ = matplotlib.colorbar.make_axes(ax1) #normalize = matplotlib.colors.Normalize(vmin = mu-t*sd,vmax=mu+t*sd) #cbar = matplotlib.colorbar.ColorbarBase(cax, cmap='cool',norm=normalize) ax2.scatter(xslit,yslit,c=self['indiv_fit_b'],cmap="summer", vmin = mu2-t*sd2,vmax=mu2+t*sd2) ax2.set_ylabel('Dec [deg]') ax2.set_xlabel('RA [deg]') ax2.set_title('Wave MEASURE: line intercept') cax, _ = matplotlib.colorbar.make_axes(ax2) normalize = matplotlib.colors.Normalize(vmin = mu2-t*sd2,vmax=mu2+t*sd2) #cbar = matplotlib.colorbar.ColorbarBase(cax, cmap='summer',norm=normalize) ax3.scatter(xslit,yslit,c=self['indiv_fit_los'],cmap="cool",vmin = mu3-t*sd3,vmax=mu3+t*sd3) ax3.set_ylabel('Dec [deg]') ax3.set_xlabel('RA [deg]') ax3.set_title('Wave MEASURE: line width') cax, _ = matplotlib.colorbar.make_axes(ax3) normalize = matplotlib.colors.Normalize(vmin = mu3-t*sd3,vmax=mu3+t*sd3) #cbar = matplotlib.colorbar.ColorbarBase(cax, cmap='cool',norm=normalize) pdf.savefig() ####################### # PLOT MEASURED VALUES fig, (ax1,ax2,ax3) = plt.subplots(1, 3,figsize=(22,5)) ax1.scatter(xslit,yslit,c=self['fit_slope'], cmap="cool",vmin = mu-t*sd,vmax=mu+t*sd) ax1.set_ylabel('Dec [deg]') ax1.set_xlabel('RA [deg]') ax1.set_title('Wave fit: line slope') cax, _ = matplotlib.colorbar.make_axes(ax1) normalize = matplotlib.colors.Normalize(vmin = mu-t*sd,vmax=mu+t*sd) #cbar = matplotlib.colorbar.ColorbarBase(cax, cmap='cool',norm=normalize) ax2.scatter(xslit,yslit,c=self['fit_b'], cmap="summer",vmin = mu2-t*sd2,vmax=mu2+t*sd2) ax2.set_ylabel('Dec [deg]') ax2.set_xlabel('RA [deg]') ax2.set_title('Wave fit: line intercept') cax, _ = matplotlib.colorbar.make_axes(ax2) normalize = matplotlib.colors.Normalize(vmin = mu2-t*sd2,vmax=mu2+t*sd2) #cbar = matplotlib.colorbar.ColorbarBase(cax, cmap='summer',norm=normalize) ax3.scatter(xslit,yslit,c=self['fit_los'], cmap="cool",vmin = mu3-t*sd3,vmax=mu3+t*sd3) ax3.set_ylabel('Dec [deg]') ax3.set_xlabel('RA [deg]') ax3.set_title('Wave fit: line width') cax, _ = matplotlib.colorbar.make_axes(ax3) normalize = matplotlib.colors.Normalize(vmin = mu3-t*sd3,vmax=mu3+t*sd3) #cbar = matplotlib.colorbar.ColorbarBase(cax, cmap='cool',norm=normalize) pdf.close()