Source code for pypeit.scripts.coadd_2dspec

"""
Script for performing 2d coadds of PypeIt data.

.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
from pypeit.scripts import scriptbase
from pypeit.core import parse

[docs] class CoAdd2DSpec(scriptbase.ScriptBase):
[docs] @classmethod def get_parser(cls, width=None): parser = super().get_parser(description='Coadd 2D spectra produced by PypeIt', width=width, default_log_file=True) parser.add_argument('coadd2d_file', type=str, default=None, help='File to guide 2d coadds') parser.add_argument("--show", default=False, action="store_true", help="Show the reduction steps. Equivalent to the -s option when " "running pypeit.") parser.add_argument("--debug_offsets", default=False, action="store_true", help="Show QA plots useful for debugging automatic offset " "determination") parser.add_argument("--peaks", default=False, action="store_true", help="Show the peaks found by the object finding algorithm.") parser.add_argument("--basename", type=str, default=None, help="Basename of files to save the parameters, spec1d, and spec2d") parser.add_argument("--debug", default=False, action="store_true", help="show debug plots?") return parser
[docs] @classmethod def main(cls, args): """ Executes 2d coadding """ from pathlib import Path import copy from IPython import embed import numpy as np from astropy.io import fits from pypeit import log from pypeit import coadd2d from pypeit import history from pypeit import inputfiles from pypeit import specobjs from pypeit import spec2dobj # Initialize the log cls.init_log(args) # Load the file coadd2dFile = inputfiles.Coadd2DFile.from_file(args.coadd2d_file) spectrograph, par, _ = coadd2dFile.get_pypeitpar(pypeit_fits=True) # Check some of the parameters # TODO Heliocentric for coadd2d needs to be thought through. Currently turning it off. if par['calibrations']['wavelengths']['refframe'] != 'observed': log.warning('Wavelength reference frame shift (e.g., heliocentric correction) not yet ' 'fully developed. Ignoring input and setting "refframe = observed".') par['calibrations']['wavelengths']['refframe'] = 'observed' # TODO Flexure correction for coadd2d needs to be thought through. Currently turning it off. if par['flexure']['spec_method'] != 'skip': log.warning('Spectroscopic flexure correction not yet fully developed. Skipping.') par['flexure']['spec_method'] = 'skip' # TODO This is currently the default for 2d coadds, but we need a way to toggle it on/off if not par['reduce']['findobj']['skip_skysub']: log.warning('Must skip sky subtraction when finding objects (i.e., sky should have ' 'been subtracted during primary reduction procedure). Skipping.') par['reduce']['findobj']['skip_skysub'] = True # Get the files spec2d_files = coadd2dFile.filenames # Get the paths coadd_scidir, qa_path = map(lambda x : Path(x).absolute(), coadd2d.CoAdd2D.output_paths(spec2d_files, par, coadd_dir=par['rdx']['redux_path'])) # Get the output basename head2d = fits.getheader(spec2d_files[0]) basename = coadd2d.CoAdd2D.default_basename(spec2d_files) \ if args.basename is None else args.basename # Write the par to disk par_outfile = coadd_scidir.parent / f'{basename}_coadd2d.par' print(f'Writing full parameter set to {par_outfile}.') par.to_config(par_outfile, exclude_defaults=True, include_descr=False) # Now run the coadds bkg_redux = head2d['SKYSUB'] == 'DIFF' find_negative = head2d['FINDOBJ'] == 'POS_NEG' # Print status message log_string = f'Reducing target {basename}\n' log_string += f"Coadding frame sky-subtracted with {head2d['SKYSUB']}\n" log_string += f"Searching for objects that are {head2d['FINDOBJ']}\n" log_string += 'Combining frames in 2d coadd:\n' for f, file in enumerate(spec2d_files): log_string += f'File {f}: {Path(file).name}\n' log.info(log_string) # Instantiate the sci_dict # TODO Why do we need this sci_dict at all?? JFH sci_dict = {} # Standard dictionaries are ordered for Python >= 3.7 sci_dict['meta'] = {} sci_dict['meta']['vel_corr'] = 0. sci_dict['meta']['bkg_redux'] = bkg_redux sci_dict['meta']['find_negative'] = find_negative # Find the detectors to reduce detectors = spectrograph.select_detectors(subset=par['rdx']['detnum'] if par['coadd2d']['only_slits'] is None else par['coadd2d']['only_slits']) log.info(f'Detectors to work on: {detectors}') # container for specobjs all_specobjs = specobjs.SpecObjs() # container for spec2dobj all_spec2d = spec2dobj.AllSpec2DObj() # set some meta all_spec2d['meta']['bkg_redux'] = bkg_redux all_spec2d['meta']['find_negative'] = find_negative # get only_slits and exclude_slits if they are set only_dets, only_spat_ids, exclude_dets, exclude_spat_ids = None, None, None, None if par['coadd2d']['only_slits'] is not None: only_dets, only_spat_ids = parse.parse_slitspatnum(par['coadd2d']['only_slits']) if par['coadd2d']['exclude_slits'] is not None: if par['coadd2d']['only_slits'] is not None: log.warning('Both `only_slits` and `exclude_slits` are provided. They are mutually exclusive. ' 'Using `only_slits` and ignoring `exclude_slits`') else: exclude_dets, exclude_spat_ids = parse.parse_slitspatnum(par['coadd2d']['exclude_slits']) # Loop on detectors for det in detectors: log.info("Working on detector {0}".format(det)) detname = spectrograph.get_det_name(det) this_only_slits = only_spat_ids[only_dets == detname] if np.any(only_dets == detname) else None this_exclude_slits = exclude_spat_ids[exclude_dets == detname] if np.any(exclude_dets == detname) else None # Instantiate Coadd2d coadd = coadd2d.CoAdd2D.get_instance(spec2d_files, spectrograph, par, det=det, only_slits=this_only_slits, exclude_slits=this_exclude_slits, bkg_redux=bkg_redux, find_negative=find_negative, debug_offsets=args.debug_offsets, debug=args.debug) # TODO Add this stuff to a run method in coadd2d # Coadd the slits coadd_dict_list = coadd.coadd() # Create the pseudo images pseudo_dict = coadd.create_pseudo_image(coadd_dict_list) # Reduce log.info('Running the extraction') # TODO -- This should mirror what is in pypeit.extract_one # TODO -- JFH :: This ought to return a Spec2DObj and SpecObjs which # would be slurped into AllSpec2DObj and all_specobsj, as below. # TODO -- JFH -- Check that the slits we are using are correct sci_dict[coadd.detname] = {} sci_dict[coadd.detname]['sciimg'], sci_dict[coadd.detname]['sciivar'], \ sci_dict[coadd.detname]['skymodel'], sci_dict[coadd.detname]['objmodel'], \ sci_dict[coadd.detname]['ivarmodel'], sci_dict[coadd.detname]['outmask'], \ sci_dict[coadd.detname]['specobjs'], sci_dict[coadd.detname]['detector'], \ sci_dict[coadd.detname]['slits'], sci_dict[coadd.detname]['tilts'], \ sci_dict[coadd.detname]['waveimg'] \ = coadd.reduce(pseudo_dict, show=args.show, show_peaks=args.peaks, basename=basename) # Tack on detector (similarly to pypeit.extract_one) for sobj in sci_dict[coadd.detname]['specobjs']: sobj.DETECTOR = sci_dict[coadd.detname]['detector'] # fill the specobjs container all_specobjs.add_sobj(sci_dict[coadd.detname]['specobjs']) # fill the spec2dobj container but first ... # pull out maskdef_designtab from sci_dict[det]['slits'] maskdef_designtab = sci_dict[coadd.detname]['slits'].maskdef_designtab slits = copy.deepcopy(sci_dict[coadd.detname]['slits']) slits.maskdef_designtab = None # fill up all_spec2d[coadd.detname] = spec2dobj.Spec2DObj(sciimg=sci_dict[coadd.detname]['sciimg'], ivarraw=sci_dict[coadd.detname]['sciivar'], skymodel=sci_dict[coadd.detname]['skymodel'], bkg_redux_skymodel=None, objmodel=sci_dict[coadd.detname]['objmodel'], ivarmodel=sci_dict[coadd.detname]['ivarmodel'], scaleimg=np.array([1.0], dtype=float), bpmmask=sci_dict[coadd.detname]['outmask'], detector=sci_dict[coadd.detname]['detector'], slits=slits, wavesol=None, waveimg=sci_dict[coadd.detname]['waveimg'], tilts=sci_dict[coadd.detname]['tilts'], sci_spat_flexure=None, sci_spec_flexure=None, vel_corr=None, vel_type=None, maskdef_designtab=maskdef_designtab) all_spec2d['meta']['effective_exptime'] = coadd.exptime_coadd # SAVE TO DISK # Add history entries for coadding. coadd_history = history.History() coadd_history.add_coadd2d(spec2d_files, basename) # THE FOLLOWING MIMICS THE CODE IN pypeit.save_exposure() subheader = spectrograph.subheader_for_spec(head2d, head2d) subheader['coaddobj'] = (basename, 'Coadd object base name') # Write spec1D if all_specobjs.nobj > 0: outfile1d = coadd_scidir / f'spec1d_{basename}.fits' all_specobjs.write_to_fits(subheader, outfile1d, history=coadd_history) # Info outfiletxt = coadd_scidir / f'spec1d_{basename}.txt' sobjs = specobjs.SpecObjs.from_fitsfile(outfile1d, chk_version=False) sobjs.write_info(outfiletxt, spectrograph.pypeline) # Build header for spec2d outfile2d = coadd_scidir / f'spec2d_{basename}.fits' pri_hdr = all_spec2d.build_primary_hdr(head2d, spectrograph, subheader=subheader, history=coadd_history, # TODO -- JFH :: Decide if we need any of these redux_path=None) # Write spec2d all_spec2d.write_to_fits(outfile2d, pri_hdr=pri_hdr)