Source code for pypeit.scripts.reduce_by_step

"""
Script to run a single reduction step for an input frame

.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""

from pypeit.scripts import scriptbase

class ReducebyStep(scriptbase.ScriptBase):
    """
    Script that runs one reduction step on a single frame and detector.

    The step is one of ``process``, ``findobj`` or ``extract``.  The
    ``findobj`` and ``extract`` steps read intermediate files from disk (the
    same filenames that the earlier steps are asked to write), so the steps
    are expected to be run in that order.
    """

    @classmethod
    def get_parser(cls, width=None):
        """
        Build the command-line parser for this script.

        Args:
            width:
                Forwarded unchanged to the base-class ``get_parser``.

        Returns:
            The parser created by the base class, with the ``pypeit_file``,
            ``frame``, ``step``, ``--det`` and ``--show`` arguments added.
        """
        parser = super().get_parser(
            description='Run one of the PypeIt reduction steps on a single frame (and detector)',
            width=width,
            formatter=scriptbase.SmartFormatter
        )
        parser.add_argument(
            'pypeit_file', type=str,
            help='PypeIt reduction file (must have .pypeit extension)'
        )
        parser.add_argument(
            'frame', type=str,
            help='Raw science/standard frame to reduce as listed in your PypeIt file, e.g. '
                 'b28.fits.gz.'
        )
        parser.add_argument(
            'step', type=str, choices=['process', 'findobj', 'extract'],
            help='Reduction step to perform. Must be "process" to perform basic image processing '
                 '(bias subtraction, field flattening, etc), "findobj" to perform object '
                 'detection and sky subtraction, or "extract" to extract 1D spectra.'
        )
        parser.add_argument(
            '--det', default=None, type=str,
            help='Single detector number or Mosaic tuple. The Mosaic tuple must include the parentheses '
                 'and be provided as a string, e.g. "(1,2)". Required, but the list of options is provided '
                 'if nothing is provided.'
        )
        parser.add_argument(
            '--show', default=False, action='store_true',
            help='Show reduction steps via plots (which will block further execution until '
                 'clicked on) and outputs to ginga. Requires remote control ginga session via '
                 '"ginga --modules=RC,SlitWavelength &"'
        )
        return parser

    @classmethod
    def main(cls, args):
        """
        Run the requested reduction step for one frame on one detector.

        If ``--det`` was not given, the available detectors (and mosaics, if
        any) are printed and the method returns without reducing anything.

        Args:
            args:
                Parsed command-line arguments.  The attributes read here are
                ``pypeit_file``, ``frame``, ``step``, ``det``, ``show`` and
                ``log_file``.  ``log_file`` is not defined by
                :func:`get_parser` in this class; presumably it is added by
                the base-class parser -- confirm.

        Raises:
            PypeItError:
                Raised if the default log file is requested and the PypeIt
                file does not have a ``.pypeit`` suffix, if ``--det`` selects
                more than one detector/mosaic, if the frame is not found
                exactly once in the file table, if the frame is not
                associated with exactly one calibration group, or if an
                intermediate file needed by the requested step is missing.
        """
        import numpy as np
        from pathlib import Path

        from pypeit.core import parse
        from pypeit import pypeit
        from pypeit import pypeit_steps
        from pypeit import log
        from pypeit import PypeItError
        from pypeit import outputfiles
        from pypeit.images import pypeitimage
        from pypeit import specobjs
        from pypeit import spec2dobj
        from pypeit import exposure
        from pypeit import slittrace

        # Set a default log file based on the name of the pypeit file, not the
        # name of the script
        # NOTE: This is copied directly from run_pypeit.py
        if args.log_file == 'default':
            _pypeit_file = Path(args.pypeit_file)
            if _pypeit_file.suffix != '.pypeit':
                raise PypeItError('Input file must have a .pypeit extension!')
            args.log_file = _pypeit_file.with_suffix('.log')
        cls.init_log(args)

        # Instantiate the main pipeline reduction object
        pypeIt = pypeit.PypeIt(args.pypeit_file, show=args.show)
        pypeIt.reuse_calibs = True

        # Detector
        detectors = pypeIt.spectrograph.select_detectors()
        mosaics = pypeIt.spectrograph.allowed_mosaics
        if args.det is None:
            # Nothing to reduce: list the valid choices and stop.
            separator = "---------------------------------------------------------------------"
            print(separator)
            print(separator)
            print(separator)
            print(f"No detector provided (--det). Choose from one of these: {detectors}.")
            if len(mosaics) > 0:
                print("")
                print(f"This instrument also supports the following mosaics: {mosaics}")
                print('To reduce a mosaic, provide the mosaic as the detectors tuple, e.g. --det "(1,2)"')
            print("")
            print(separator)
            return

        det = pypeIt.spectrograph.select_detectors(subset=parse.eval_detectors(args.det))
        if len(det) > 1:
            raise PypeItError("The input --det must be a single detector or mosaic.")
        det = det[0]

        # detector name
        det_name = pypeIt.spectrograph.get_det_name(det)

        # Find the frame: it must match exactly one row of the file table
        mt_row = pypeIt.fitstbl['filename'] == args.frame
        if np.sum(mt_row) != 1:
            raise PypeItError(f"Frame {args.frame} not found or not unique")
        frame = int(np.where(mt_row)[0][0])
        calib_IDs = pypeIt.fitstbl.find_frame_calib_groups(frame)
        if len(calib_IDs) != 1:
            raise PypeItError(
                f"Frame {args.frame} is a calibration frame.  This script is for science/standard "
                "frames only"
            )
        calib_ID = calib_IDs[0]

        # Sci metadata
        objtype_out, calib_key, obstime, basename, binning = pypeit_steps.get_sci_metadata(
            pypeIt.spectrograph, pypeIt.fitstbl, frame, det)

        # Find all the frames
        comb_id = pypeIt.fitstbl['comb_id'][frame]
        frames, bg_frames = pypeIt.fitstbl.get_frames_from_combid(comb_id)

        # Intermediate filenames
        sci_filename = outputfiles.intermediate_filename('sciImg', basename, det_name)
        bkg_filename = outputfiles.intermediate_filename('bkgImg', basename, det_name)
        sky_filename = outputfiles.intermediate_filename('Sky', basename, det_name)
        bkgredux_sky_filename = outputfiles.intermediate_filename('BkgReduxSky', basename, det_name)
        spec1d_filename = outputfiles.intermediate_filename('spec1d', basename, 'all')
        slits_filename = outputfiles.intermediate_filename('slits', basename, det_name)

        # Prep for background subtraction and finding negative traces
        has_bg, bkg_redux, find_negative = pypeit_steps.set_bkg_negative(
            pypeIt.fitstbl, pypeIt.par, bg_frames)

        # Process?
        if args.step == 'process':
            pypeit_steps.process_one_det(
                pypeIt.spectrograph, pypeIt.fitstbl, pypeIt.par, frames, det, calib_ID,
                pypeIt.calibrations_path, bg_frames=bg_frames, sci_outfile=sci_filename,
                bkg_outfile=bkg_filename)
            # All done
            return

        # Find Objects
        if args.step == 'findobj':
            # Load intermediate frames needed for finding objects
            log.info(f'Loading images for detector {det}')
            if not sci_filename.is_file():
                raise PypeItError(
                    f'Science image {sci_filename} not found!  Run the "process" step first.'
                )
            sciImg = pypeitimage.PypeItImage.from_file(sci_filename)
            if bg_frames is not None and len(bg_frames) > 0:
                if not bkg_filename.is_file():
                    raise PypeItError(
                        f'Background image {bkg_filename} not found!  Run the "process" step '
                        'first.'
                    )
                bkg_redux_sciimg = pypeitimage.PypeItImage.from_file(bkg_filename)
            else:
                bkg_redux_sciimg = None

            # Load up the standard star spec1d file if it exists
            std_outfile = None
            if objtype_out == 'science':
                is_standard = pypeIt.fitstbl.find_frames('standard')
                frame_indx = np.arange(len(pypeIt.fitstbl))
                try:
                    std_outfile = outputfiles.get_std_outfile(pypeIt.fitstbl, pypeIt.par,
                                                              frame_indx[is_standard])
                except PypeItError:
                    # Best effort: carry on without the standard
                    log.warning(
                        'No reduced standard star spec1d file found for this science frame, but '
                        'one was expected because it is in your PypeIt file.\n Continuing '
                        'without standard star information.'
                    )
                    std_outfile = None

            # #####################################
            # find objects + initial sky subtraction
            initial_sky, sobjs_obj_find, objFind = pypeit_steps.findobj_on_det(
                sciImg, pypeIt.spectrograph, pypeIt.fitstbl, pypeIt.par, frames, calib_ID, det,
                pypeIt.calibrations_path, bkg_redux=bkg_redux, find_negative=find_negative,
                std_outfile=std_outfile, show=args.show)

            # #####################################
            # slitmask stuff
            if pypeIt.par['reduce']['slitmask']['assign_obj']:
                detname = sciImg.detector.name
                sciImg_dict = {detname: sciImg}
                slits, sobjs_obj_find = exposure.adjust_for_slitmask(
                    sciImg_dict, pypeIt.spectrograph, pypeIt.fitstbl, pypeIt.par, frames[0],
                    sobjs_obj_find, [objFind.slits])
                objFind.slits = slits[0]

            # #####################################
            # final sky subtraction
            final_global_sky, bkg_redux_global_sky, objFind = pypeit_steps.finalize_sky_det(
                pypeIt.spectrograph, pypeIt.fitstbl, pypeIt.par, frames[0], det, objFind,
                initial_sky, sobjs_obj_find, bkg_redux_sciimg=bkg_redux_sciimg,
                bkg_redux=bkg_redux, show=args.show)

            # update Slits: flag the slits marked in objFind.reduce_bpm as BADSKYSUB
            _slits = objFind.slits
            flagged_slits = np.where(objFind.reduce_bpm)[0]
            if len(flagged_slits) > 0:
                _slits.mask[flagged_slits] = \
                    _slits.bitmask.turn_on(_slits.mask[flagged_slits], 'BADSKYSUB')

            # Update the sciImg with the scaleImg information
            sciImg.rel_scaleImg = objFind.scaleimg
            # and the global spectral flexure shift
            sciImg.flex_shift = objFind.slitshift

            # Write
            # sobjs object found
            sobjs_obj_find.write_to_fits({}, spec1d_filename)
            log.info(f'Wrote intermediate spec1d file with objects found to {spec1d_filename}')

            # final sky image
            skyimg = pypeitimage.PypeItImage(final_global_sky)
            # parents/exist_ok: succeed whether the directory already exists
            # or several directory levels are missing
            sky_filename.parent.mkdir(parents=True, exist_ok=True)
            skyimg.to_file(sky_filename, overwrite=True)
            log.info(f'Wrote final sky image to {sky_filename}')

            # bkg_redux sky image
            if bkg_redux_global_sky is not None:
                bkgredux_skyimg = pypeitimage.PypeItImage(bkg_redux_global_sky)
                bkgredux_skyimg.to_file(bkgredux_sky_filename, overwrite=True)
                log.info(f'Wrote bkg_redux final sky image to {bkgredux_sky_filename}')

            # slits
            _slits.to_file(slits_filename, overwrite=True)
            log.info(f'Wrote intermediate slits to {slits_filename}')

            # updated sciImg
            sciImg.to_file(sci_filename, overwrite=True)
            log.info(f'Wrote updated science image to {sci_filename}')
            return

        # Extract?
        if args.step == 'extract':
            # Load intermediate frames needed for the extraction
            log.info(f'Loading images for detector {det}')
            if not sci_filename.is_file():
                raise PypeItError(
                    f'Science image {sci_filename} not found!  Run the "process" step first.'
                )
            sciImg = pypeitimage.PypeItImage.from_file(sci_filename)

            # sky images
            log.info(f'Loading sky image for detector {det}')
            if not sky_filename.is_file():
                raise PypeItError(f'Sky image {sky_filename} not found!')
            skyimg = pypeitimage.PypeItImage.from_file(sky_filename).image

            # NOTE(review): the presence of this file on disk, not the value
            # of ``bkg_redux``, decides whether the image is used, so a
            # leftover file from an earlier run would be picked up -- confirm
            # this is intended.
            if bkgredux_sky_filename.is_file():
                log.info(f'Loading bkg_redux sky image for detector {det}')
                bkg_redux_skyimg = pypeitimage.PypeItImage.from_file(bkgredux_sky_filename).image
            else:
                bkg_redux_skyimg = None

            # specobjs from findobj
            log.info(f'Loading spec1d file for detector {det}')
            if not spec1d_filename.is_file():
                raise PypeItError(f'spec1d file {spec1d_filename} not found!')
            specobjs_objfind = specobjs.SpecObjs.from_fitsfile(spec1d_filename)

            # slits
            log.info(f'Loading slits for detector {det}')
            if not slits_filename.is_file():
                raise PypeItError(f'Slits file {slits_filename} not found!')
            calib_slits = slittrace.SlitTraceSet.from_file(slits_filename)

            # Container for Spec2DObj
            all_spec2d = spec2dobj.AllSpec2DObj()
            all_spec2d['meta']['bkg_redux'] = bkg_redux
            all_spec2d['meta']['find_negative'] = find_negative

            # Grab the objects on this detector; with no objects there is
            # nothing to select from, so the container is passed through as is
            detname = sciImg.detector.name
            if specobjs_objfind.nobj > 0:
                specobjs_on_det = specobjs_objfind[specobjs_objfind.DET == detname]
            else:
                specobjs_on_det = specobjs_objfind

            # Extract
            all_spec2d[detname], sobjs_extract = pypeit_steps.extract_det(
                pypeIt.spectrograph, pypeIt.fitstbl, pypeIt.par, frames, det, calib_ID,
                pypeIt.calibrations_path, sciImg, skyimg, specobjs_on_det, calib_slits,
                bkg_redux_final_sky=bkg_redux_skyimg, bkg_redux=bkg_redux,
                find_negative=find_negative)

            # TODO: Should we add the calibration associations to the SpecObjs object as done in the main run?

            # Save it
            exposure.save_exposure(pypeIt.spectrograph, pypeIt.fitstbl, pypeIt.par, frame,
                                   all_spec2d, sobjs_extract, pypeIt.calibrations_path,
                                   in_update_det=det)