"""
Main driver class for PypeIt run
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
from pathlib import Path
import time
import os
import copy
import datetime
from IPython import embed
import numpy as np
from astropy.io import fits
from astropy.table import Table
from pypeit import io
from pypeit import inputfiles
from pypeit.calibframe import CalibFrame
from pypeit.core import parse, wave, qa
from pypeit import msgs
from pypeit import calibrations
from pypeit.images import buildimage
from pypeit.display import display
from pypeit import find_objects
from pypeit import extraction
from pypeit import spec2dobj
from pypeit import specobjs
#from pypeit.spectrographs.util import load_spectrograph
from pypeit import slittrace
from pypeit import utils
from pypeit.history import History
#from pypeit.par import PypeItPar
#from pypeit.par.pypeitpar import ql_is_on
from pypeit.metadata import PypeItMetaData
from pypeit.manual_extract import ManualExtractionObj
from pypeit.core import skysub
from linetools import utils as ltu
[docs]class PypeIt:
"""
This class runs the primary calibration and extraction in PypeIt
.. todo::
Fill in list of attributes!
Args:
pypeit_file (:obj:`str`):
PypeIt filename.
verbosity (:obj:`int`, optional):
Verbosity level of system output. Can be:
- 0: No output
- 1: Minimal output (default)
- 2: All output
overwrite (:obj:`bool`, optional):
Flag to overwrite any existing files/directories.
reuse_calibs (:obj:`bool`, optional):
Reuse any pre-existing calibration files
logname (:obj:`str`, optional):
The name of an ascii log file with the details of the
reduction.
show: (:obj:`bool`, optional):
Show reduction steps via plots (which will block further
execution until clicked on) and outputs to ginga. Requires
remote control ginga session via ``ginga --modules=RC,SlitWavelength &``
redux_path (:obj:`str`, optional):
Over-ride reduction path in PypeIt file (e.g. Notebook usage)
calib_only: (:obj:`bool`, optional):
Only generate the calibration files that you can
Attributes:
pypeit_file (:obj:`str`):
Name of the pypeit file to read. PypeIt files have a
specific set of valid formats. A description can be found
:ref:`pypeit_file`.
fitstbl (:obj:`pypeit.metadata.PypeItMetaData`): holds the meta info
"""
def __init__(self, pypeit_file, verbosity=2, overwrite=True, reuse_calibs=False, logname=None,
show=False, redux_path=None, calib_only=False):
# Set up logging
self.logname = logname
self.verbosity = verbosity
self.pypeit_file = pypeit_file
self.msgs_reset()
# Load up PypeIt file
self.pypeItFile = inputfiles.PypeItFile.from_file(pypeit_file)
self.calib_only = calib_only
# Build the spectrograph and the parameters
self.spectrograph, self.par, config_specific_file = self.pypeItFile.get_pypeitpar()
msgs.info(f'Loaded spectrograph {self.spectrograph.name}')
msgs.info('Setting configuration-specific parameters using '
f'{os.path.split(config_specific_file)[1]}.')
# Check the output paths are ready
if redux_path is not None:
self.par['rdx']['redux_path'] = redux_path
# Write the full parameter set here
# --------------------------------------------------------------
par_file = pypeit_file.replace(
'.pypeit', f"_UTC_{datetime.datetime.utcnow().date()}.par")
self.par.to_config(par_file, include_descr=False)
# --------------------------------------------------------------
# Build the meta data
# - Re-initilize based on the file data
msgs.info('Compiling metadata')
self.fitstbl = PypeItMetaData(self.spectrograph, self.par,
files=self.pypeItFile.filenames,
usrdata=self.pypeItFile.data,
strict=True)
# - Interpret automated or user-provided data from the PypeIt
# file
self.fitstbl.finalize_usr_build(
self.pypeItFile.frametypes,
self.pypeItFile.setup_name)
# Other Internals
self.overwrite = overwrite
# Currently the runtime argument determines the behavior for
# reusing calibrations
self.reuse_calibs = reuse_calibs
self.show = show
# Set paths
self.calibrations_path = os.path.join(self.par['rdx']['redux_path'],
self.par['calibrations']['calib_dir'])
# Check for calibrations
if not self.calib_only:
calibrations.check_for_calibs(self.par, self.fitstbl,
raise_error=self.par['calibrations']['raise_chk_error'])
# --------------------------------------------------------------
# - Write .calib file (For QA naming amongst other things)
calib_file = pypeit_file.replace('.pypeit', '.calib')
calibrations.Calibrations.association_summary(calib_file, self.fitstbl, self.spectrograph,
self.calibrations_path, overwrite=True)
# Report paths
msgs.info('Setting reduction path to {0}'.format(self.par['rdx']['redux_path']))
msgs.info('Calibration frames saved to: {0}'.format(self.calibrations_path))
msgs.info('Science data output to: {0}'.format(self.science_path))
msgs.info('Quality assessment plots output to: {0}'.format(self.qa_path))
# Init
self.det = None
self.tstart = None
self.basename = None
self.obstime = None
@property
def science_path(self):
"""Return the path to the science directory."""
return os.path.join(self.par['rdx']['redux_path'], self.par['rdx']['scidir'])
@property
def qa_path(self):
"""Return the path to the top-level QA directory."""
return os.path.join(self.par['rdx']['redux_path'], self.par['rdx']['qadir'])
[docs] def build_qa(self):
"""
Generate QA wrappers
"""
msgs.qa_path = self.qa_path
qa.gen_qa_dir(self.qa_path)
qa.gen_mf_html(self.pypeit_file, self.qa_path)
qa.gen_exp_html()
# TODO: This should go in a more relevant place
[docs] def spec_output_file(self, frame, twod=False):
"""
Return the path to the spectral output data file.
Args:
frame (:obj:`int`):
Frame index from :attr:`fitstbl`.
twod (:obj:`bool`):
Name for the 2D output file; 1D file otherwise.
Returns:
:obj:`str`: The path for the output file
"""
return self.get_spec_file_name(self.science_path, self.fitstbl.construct_basename(frame),
twod=twod)
[docs] @staticmethod
def get_spec_file_name(science_path, basename, twod=False):
return os.path.join(science_path, f'spec{"2" if twod else "1"}d_{basename}.fits')
[docs] def outfile_exists(self, frame):
"""
Check whether the 2D outfile of a given frame already exists
Args:
frame (int): Frame index from fitstbl
Returns:
bool: True if the 2d file exists, False if it does not exist
"""
return os.path.isfile(self.spec_output_file(frame, twod=True))
[docs] def get_std_outfile(self, standard_frames):
"""
Return the spec1d file name for a reduced standard to use as a tracing
crutch.
The file is either constructed using the provided standard frame indices
or it is directly pulled from the
:class:`~pypeit.par.pypeitpar.FindObjPar` parameters in :attr:`par`.
The latter takes precedence. If more than one row is provided by
``standard_frames``, the first index is used.
Args:
standard_frames (array-like):
Set of rows in :attr:`fitstbl` with standards.
Returns:
:obj:`str`: Full path to the standard spec1d output file to use.
"""
# NOTE: I'm not sure if this is the best place to put this, but it does
# isolate where the name of the standard-star spec1d file is defined.
std_outfile = self.par['reduce']['findobj']['std_spec1d']
if std_outfile is not None:
if not Path(std_outfile).resolve().exists():
msgs.error(f'Provided standard spec1d file does not exist: {std_outfile}')
return std_outfile
# TODO: Need to decide how to associate standards with
# science frames in the case where there is more than one
# standard associated with a given science frame. Below, I
# just use the first standard
std_frame = None if len(standard_frames) == 0 else standard_frames[0]
# Prepare to load up standard?
if std_frame is not None:
std_outfile = self.spec_output_file(std_frame) \
if isinstance(std_frame, (int,np.integer)) else None
if std_outfile is not None and not os.path.isfile(std_outfile):
msgs.error('Could not find standard file: {0}'.format(std_outfile))
return std_outfile
[docs] def calib_all(self):
"""
Process all calibration frames.
Provides an avenue to reduce a dataset without (or omitting) any
science/standard frames.
"""
self.tstart = time.perf_counter()
# Frame indices
frame_indx = np.arange(len(self.fitstbl))
for calib_ID in self.fitstbl.calib_groups:
# Find all the frames in this calibration group
in_grp = self.fitstbl.find_calib_group(calib_ID)
if not any(in_grp):
continue
grp_frames = frame_indx[in_grp]
# Find the detectors to reduce
detectors = self.select_detectors(self.spectrograph, self.par['rdx']['detnum'],
slitspatnum=self.par['rdx']['slitspatnum'])
msgs.info(f'Detectors to work on: {detectors}')
# Loop on Detectors
for self.det in detectors:
msgs.info(f'Working on detector {self.det}')
# Instantiate Calibrations class
user_slits = slittrace.merge_user_slit(self.par['rdx']['slitspatnum'],
self.par['rdx']['maskIDs'])
self.caliBrate = calibrations.Calibrations.get_instance(
self.fitstbl, self.par['calibrations'], self.spectrograph,
self.calibrations_path, qadir=self.qa_path, reuse_calibs=self.reuse_calibs,
show=self.show, user_slits=user_slits,
chk_version=self.par['rdx']['chk_version'])
# Do it
# These need to be separate to accommodate COADD2D
self.caliBrate.set_config(grp_frames[0], self.det, self.par['calibrations'])
self.caliBrate.run_the_steps()
if not self.caliBrate.success:
msgs.warn(f'Calibrations for detector {self.det} were unsuccessful! The step '
f'that failed was {self.caliBrate.failed_step}. Continuing to next '
f'detector.')
# Finish
self.print_end_time()
[docs] def reduce_all(self):
"""
Main driver of the entire reduction
Calibration and extraction via a series of calls to
:func:`reduce_exposure`.
"""
# Validate the parameter set
self.par.validate_keys(required=['rdx', 'calibrations', 'scienceframe', 'reduce',
'flexure'])
self.tstart = time.perf_counter()
# Find the standard frames
is_standard = self.fitstbl.find_frames('standard')
if np.any(is_standard):
msgs.info(f'Found {np.sum(is_standard)} standard frames to reduce.')
# Find the science frames
is_science = self.fitstbl.find_frames('science')
if np.any(is_science):
msgs.info(f'Found {np.sum(is_science)} science frames to reduce.')
# This will give an error to alert the user that no reduction will be
# run if there are no science/standard frames and `run_pypeit` is run
# without -c flag
if not np.any(is_science) and not np.any(is_standard):
msgs.error('No science/standard frames provided. Add them to your PypeIt file '
'if this is a standard run! Otherwise run calib_only reduction using -c flag')
# Frame indices
frame_indx = np.arange(len(self.fitstbl))
# Standard Star(s) Loop
# Iterate over each calibration group and reduce the standards
for calib_ID in self.fitstbl.calib_groups:
# Find all the frames in this calibration group
in_grp = self.fitstbl.find_calib_group(calib_ID)
if not np.any(is_standard & in_grp):
continue
# Find the indices of the standard frames in this calibration group:
grp_standards = frame_indx[is_standard & in_grp]
msgs.info(f'Found {len(grp_standards)} standard frames in calibration group '
f'{calib_ID}.')
# Reduce all the standard frames, loop on unique comb_id
u_combid_std = np.unique(self.fitstbl['comb_id'][grp_standards])
for j, comb_id in enumerate(u_combid_std):
frames = np.where(self.fitstbl['comb_id'] == comb_id)[0]
# Find all frames whose comb_id matches the current frames
# bkg_id (same as for science frames).
bg_frames = np.where((self.fitstbl['comb_id'] == self.fitstbl['bkg_id'][frames][0])
& (self.fitstbl['comb_id'] >= 0))[0]
if not self.outfile_exists(frames[0]) or self.overwrite:
# Build history to document what contributed to the reduced
# exposure
history = History(self.fitstbl.frame_paths(frames[0]))
history.add_reduce(calib_ID, self.fitstbl, frames, bg_frames)
std_spec2d, std_sobjs = self.reduce_exposure(frames, bg_frames=bg_frames)
# TODO come up with sensible naming convention for save_exposure for combined files
self.save_exposure(frames[0], std_spec2d, std_sobjs, self.basename, history)
else:
msgs.info('Output file: {:s} already exists'.format(self.fitstbl.construct_basename(frames[0])) +
'. Set overwrite=True to recreate and overwrite.')
# Science Frame(s) Loop
# Iterate over each calibration group again and reduce the science frames
for calib_ID in self.fitstbl.calib_groups:
# Find all the frames in this calibration group
in_grp = self.fitstbl.find_calib_group(calib_ID)
if not np.any(is_science & in_grp):
continue
# Find the indices of the science frames in this calibration group:
grp_science = frame_indx[is_science & in_grp]
msgs.info(f'Found {len(grp_science)} science frames in calibration group {calib_ID}.')
# Associate standards (previously reduced above) for this setup
std_outfile = self.get_std_outfile(frame_indx[is_standard])
# Reduce all the science frames; keep the basenames of the science
# frames for use in flux calibration
science_basename = [None]*len(grp_science)
# Loop on unique comb_id
u_combid = np.unique(self.fitstbl['comb_id'][grp_science])
for j, comb_id in enumerate(u_combid):
# TODO: This was causing problems when multiple science frames
# were provided to quicklook and the user chose *not* to stack
# the frames. But this means it now won't skip processing the
# B-A pair when the background image(s) are defined. Punting
# for now...
# # Quicklook mode?
# if self.par['rdx']['quicklook'] and j > 0:
# msgs.warn('PypeIt executed in quicklook mode. Only reducing science frames '
# 'in the first combination group!')
# break
#
frames = np.where(self.fitstbl['comb_id'] == comb_id)[0]
# Find all frames whose comb_id matches the current frames bkg_id.
bg_frames = np.where((self.fitstbl['comb_id'] == self.fitstbl['bkg_id'][frames][0])
& (self.fitstbl['comb_id'] >= 0))[0]
# JFH changed the syntax below to that above, which allows
# frames to be used more than once as a background image. The
# syntax below would require that we could somehow list multiple
# numbers for the bkg_id which is impossible without a comma
# separated list
# bg_frames = np.where(self.fitstbl['bkg_id'] == comb_id)[0]
if not self.outfile_exists(frames[0]) or self.overwrite:
# Build history to document what contributd to the reduced
# exposure
history = History(self.fitstbl.frame_paths(frames[0]))
history.add_reduce(calib_ID, self.fitstbl, frames, bg_frames)
# TODO -- Should we reset/regenerate self.slits.mask for a new exposure
sci_spec2d, sci_sobjs = self.reduce_exposure(frames, bg_frames=bg_frames,
std_outfile=std_outfile)
science_basename[j] = self.basename
# TODO: come up with sensible naming convention for
# save_exposure for combined files
if len(sci_spec2d.detectors) > 0:
self.save_exposure(frames[0], sci_spec2d, sci_sobjs, self.basename, history)
else:
msgs.warn('No spec2d and spec1d saved to file because the '
'calibration/reduction was not successful for all the detectors')
else:
msgs.warn(f'Output file: {self.fitstbl.construct_basename(frames[0])} already '
'exists. Set overwrite=True to recreate and overwrite.')
msgs.info(f'Finished calibration group {calib_ID}')
# Finish
self.print_end_time()
[docs] @staticmethod
def select_detectors(spectrograph, detnum, slitspatnum=None):
"""
Get the set of detectors to be reduced.
This is mostly a wrapper for
:func:`~pypeit.spectrographs.spectrograph.Spectrograph.select_detectors`,
except that it applies any limitations set by the
:class:`~pypeit.par.pypeitpar.ReduxPar` parameters.
Args:
spectrograph (:class:`~pypeit.spectrographs.spectrograph.Spectrograph`):
Spectrograph instance that defines the allowed
detectors/mosaics.
detnum (:obj:`int`, :obj:`tuple`):
The detectors/mosaics to parse
slitspatnum (:obj:`str`, optional):
Used to restrict the reduction to a specified slit. See
:class:`~pypeit.par.pypeitpar.ReduxPar`.
Returns:
:obj:`list`: List of unique detectors or detector mosaics to be
reduced.
"""
return spectrograph.select_detectors(subset=detnum if slitspatnum is None else slitspatnum)
[docs] def reduce_exposure(self, frames, bg_frames=None, std_outfile=None):
"""
Reduce a single exposure
Args:
frames (:obj:`list`):
List of 0-indexed rows in :attr:`fitstbl` with the frames to
reduce.
bg_frames (:obj:`list`, optional):
List of frame indices for the background.
std_outfile (:obj:`str`, optional):
File with a previously reduced standard spectrum from
PypeIt.
Returns:
dict: The dictionary containing the primary outputs of
extraction.
"""
# if show is set, clear the ginga channels at the start of each new sci_ID
if self.show:
# TODO: Put this in a try/except block?
display.clear_all(allow_new=True)
has_bg = True if bg_frames is not None and len(bg_frames) > 0 else False
# Is this an b/g subtraction reduction?
if has_bg:
self.bkg_redux = True
# The default is to find_negative objects if the bg_frames are
# classified as "science", and to not find_negative objects if the
# bg_frames are classified as "sky". This can be explicitly
# overridden if par['reduce']['findobj']['find_negative'] is set to
# something other than the default of None.
self.find_negative = (('science' in self.fitstbl['frametype'][bg_frames[0]]) |
('standard' in self.fitstbl['frametype'][bg_frames[0]])) \
if self.par['reduce']['findobj']['find_negative'] is None else \
self.par['reduce']['findobj']['find_negative']
else:
self.bkg_redux = False
self.find_negative= False
# Container for all the Spec2DObj
all_spec2d = spec2dobj.AllSpec2DObj()
all_spec2d['meta']['bkg_redux'] = self.bkg_redux
all_spec2d['meta']['find_negative'] = self.find_negative
# TODO -- Should we reset/regenerate self.slits.mask for a new exposure
# container for specobjs during first loop (objfind)
all_specobjs_objfind = specobjs.SpecObjs()
# container for specobjs during second loop (extraction)
all_specobjs_extract = specobjs.SpecObjs()
# list of global_sky obtained during objfind and used in extraction
initial_sky_list = []
# list of sciImg
sciImg_list = []
# List of detectors with successful calibration
calibrated_det = []
# list of successful slits calibrations to be used in the extraction loop
calib_slits = []
# List of objFind objects
objFind_list = []
# Print status message
msgs_string = 'Reducing target {:s}'.format(self.fitstbl['target'][frames[0]]) + msgs.newline()
# TODO: Print these when the frames are actually combined,
# backgrounds are used, etc?
msgs_string += 'Combining frames:' + msgs.newline()
for iframe in frames:
msgs_string += '{0:s}'.format(self.fitstbl['filename'][iframe]) + msgs.newline()
msgs.info(msgs_string)
if has_bg:
bg_msgs_string = ''
for iframe in bg_frames:
bg_msgs_string += '{0:s}'.format(self.fitstbl['filename'][iframe]) + msgs.newline()
bg_msgs_string = msgs.newline() + 'Using background from frames:' + msgs.newline() + bg_msgs_string
msgs.info(bg_msgs_string)
# Find the detectors to reduce
detectors = self.select_detectors(self.spectrograph, self.par['rdx']['detnum'],
slitspatnum=self.par['rdx']['slitspatnum'])
msgs.info(f'Detectors to work on: {detectors}')
# Loop on Detectors -- Calibrate, process image, find objects
# TODO: Attempt to put in a multiprocessing call here?
for self.det in detectors:
msgs.info(f'Reducing detector {self.det}')
# run calibration
self.caliBrate = self.calib_one(frames, self.det)
if not self.caliBrate.success:
msgs.warn(f'Calibrations for detector {self.det} were unsuccessful! The step '
f'that failed was {self.caliBrate.failed_step}. Continuing by '
f'skipping this detector.')
continue
# we save only the detectors that had a successful calibration,
# and we use only those in the extract loop below
calibrated_det.append(self.det)
# we also save the successful slit calibrations because they are used and modified
# in the slitmask stuff in between the two loops
calib_slits.append(self.caliBrate.slits)
# global_sky, skymask and sciImg are needed in the extract loop
initial_sky, sobjs_obj, sciImg, objFind = self.objfind_one(
frames, self.det, bg_frames=bg_frames, std_outfile=std_outfile)
if len(sobjs_obj)>0:
all_specobjs_objfind.add_sobj(sobjs_obj)
initial_sky_list.append(initial_sky)
sciImg_list.append(sciImg)
objFind_list.append(objFind)
# slitmask stuff
if len(calibrated_det) > 0 and self.par['reduce']['slitmask']['assign_obj']:
# get object positions from slitmask design and slitmask offsets for all the detectors
spat_flexure = np.array([ss.spat_flexure for ss in sciImg_list])
# Grab platescale with binning
bin_spec, bin_spat = parse.parse_binning(self.binning)
platescale = np.array([ss.detector.platescale*bin_spat for ss in sciImg_list])
# get the dither offset if available and if desired
dither_off = None
if self.par['reduce']['slitmask']['use_dither_offset']:
if 'dithoff' in self.fitstbl.keys():
dither_off = self.fitstbl['dithoff'][frames[0]]
calib_slits = slittrace.get_maskdef_objpos_offset_alldets(
all_specobjs_objfind, calib_slits, spat_flexure, platescale,
self.par['calibrations']['slitedges']['det_buffer'],
self.par['reduce']['slitmask'], dither_off=dither_off)
# determine if slitmask offsets exist and compute an average offsets over all the detectors
calib_slits = slittrace.average_maskdef_offset(
calib_slits, platescale[0], self.spectrograph.list_detectors(mosaic='MSC' in calib_slits[0].detname))
# slitmask design matching and add undetected objects
all_specobjs_objfind = slittrace.assign_addobjs_alldets(
all_specobjs_objfind, calib_slits, spat_flexure, platescale,
self.par['reduce']['slitmask'], self.par['reduce']['findobj']['find_fwhm'])
# Extract
for i, self.det in enumerate(calibrated_det):
# re-run (i.e., load) calibrations
self.caliBrate = self.calib_one(frames, self.det)
self.caliBrate.slits = calib_slits[i]
detname = sciImg_list[i].detector.name
# TODO: pass back the background frame, pass in background
# files as an argument. extract one takes a file list as an
# argument and instantiates science within
if all_specobjs_objfind.nobj > 0:
all_specobjs_on_det = all_specobjs_objfind[all_specobjs_objfind.DET == detname]
else:
all_specobjs_on_det = all_specobjs_objfind
# Extract
all_spec2d[detname], tmp_sobjs \
= self.extract_one(frames, self.det, sciImg_list[i], objFind_list[i],
initial_sky_list[i], all_specobjs_on_det)
# Hold em
if tmp_sobjs.nobj > 0:
all_specobjs_extract.add_sobj(tmp_sobjs)
# Add calibration associations to the SpecObjs object
all_specobjs_extract.calibs = calibrations.Calibrations.get_association(
self.fitstbl, self.spectrograph, self.calibrations_path,
self.fitstbl[frames[0]]['setup'],
self.fitstbl.find_frame_calib_groups(frames[0])[0], self.det,
must_exist=True, proc_only=True)
# JFH TODO write out the background frame?
# TODO -- Save here? Seems like we should. Would probably need to use update_det=True
# Return
return all_spec2d, all_specobjs_extract
[docs] def calib_one(self, frames, det):
"""
Run Calibration for a single exposure/detector pair
Args:
frames (:obj:`list`):
List of frames to extract; stacked if more than one
is provided
det (:obj:`int`):
Detector number (1-indexed)
Returns:
caliBrate (:class:`pypeit.calibrations.Calibrations`)
"""
msgs.info(f'Building/loading calibrations for detector {det}')
# Instantiate Calibrations class
user_slits=slittrace.merge_user_slit(self.par['rdx']['slitspatnum'],
self.par['rdx']['maskIDs'])
caliBrate = calibrations.Calibrations.get_instance(
self.fitstbl, self.par['calibrations'], self.spectrograph,
self.calibrations_path, qadir=self.qa_path,
reuse_calibs=self.reuse_calibs, show=self.show, user_slits=user_slits,
chk_version=self.par['rdx']['chk_version'])
# These need to be separate to accomodate COADD2D
caliBrate.set_config(frames[0], det, self.par['calibrations'])
caliBrate.run_the_steps()
return caliBrate
[docs] def objfind_one(self, frames, det, bg_frames=None, std_outfile=None):
"""
Reduce + Find Objects in a single exposure/detector pair
sci_ID and det need to have been set internally prior to calling this method
Parameters
----------
frames : :obj:`list`
List of frames to extract; stacked if more than one is provided
det : :obj:`int`
Detector number (1-indexed)
bg_frames : :obj:`list`, optional
List of frames to use as the background. Can be empty.
std_outfile : :obj:`str`, optional
Filename for the standard star spec1d file. Passed directly to
:func:`~pypeit.specobjs.get_std_trace`.
Returns
-------
global_sky : `numpy.ndarray`_
Initial global sky model
sobjs_obj : :class:`~pypeit.specobjs.SpecObjs`
List of objects found
sciImg : :class:`~pypeit.images.pypeitimage.PypeItImage`
Science image
objFind : :class:`~pypeit.find_objects.FindObjects`
Object finding speobject
"""
# Grab some meta-data needed for the reduction from the fitstbl
self.objtype, self.setup, self.obstime, self.basename, self.binning \
= self.get_sci_metadata(frames[0], det)
msgs.info("Object finding begins for {} on det={}".format(self.basename, det))
# Is this a standard star?
self.std_redux = self.objtype == 'standard'
frame_par = self.par['calibrations']['standardframe'] if self.std_redux else self.par['scienceframe']
# Get the standard trace if need be
if self.std_redux is False and std_outfile is not None:
std_trace = specobjs.get_std_trace(self.spectrograph.get_det_name(det), std_outfile)
else:
std_trace = None
# Build Science image
sci_files = self.fitstbl.frame_paths(frames)
sciImg = buildimage.buildimage_fromlist(
self.spectrograph, det, frame_par,
sci_files, bias=self.caliBrate.msbias, bpm=self.caliBrate.msbpm,
dark=self.caliBrate.msdark,
scattlight=self.caliBrate.msscattlight,
flatimages=self.caliBrate.flatimages,
slits=self.caliBrate.slits, # For flexure correction
ignore_saturation=False)
# Background Image?
if bg_frames is not None and len(bg_frames) > 0:
bg_file_list = self.fitstbl.frame_paths(bg_frames)
bgimg = buildimage.buildimage_fromlist(self.spectrograph, det, frame_par, bg_file_list,
bpm=self.caliBrate.msbpm,
bias=self.caliBrate.msbias,
dark=self.caliBrate.msdark,
scattlight=self.caliBrate.msscattlight,
flatimages=self.caliBrate.flatimages,
slits=self.caliBrate.slits,
ignore_saturation=False)
# NOTE: If the spatial flexure exists for sciImg, the subtraction
# function propagates that to the subtracted image, ignoring any
# spatial flexure determined for the background image.
sciImg = sciImg.sub(bgimg)
# Flexure
spat_flexure = None
if (self.objtype == 'science' and self.par['scienceframe']['process']['spat_flexure_correct']) or \
(self.objtype == 'standard' and self.par['calibrations']['standardframe']['process']['spat_flexure_correct']):
spat_flexure = sciImg.spat_flexure
# Build the initial sky mask
initial_skymask = self.load_skyregions(initial_slits=self.spectrograph.pypeline != 'SlicerIFU',
scifile=sciImg.files[0], frame=frames[0], spat_flexure=spat_flexure)
# Deal with manual extraction
row = self.fitstbl[frames[0]]
manual_obj = ManualExtractionObj.by_fitstbl_input(
row['filename'], row['manual'], self.spectrograph) if len(row['manual'].strip()) > 0 else None
# Instantiate Reduce object
# Required for pypeline specific object
# At instantiaton, the fullmask in self.sciImg is modified
objFind = find_objects.FindObjects.get_instance(sciImg, self.caliBrate.slits,
self.spectrograph, self.par, self.objtype,
wv_calib=self.caliBrate.wv_calib,
waveTilts=self.caliBrate.wavetilts,
initial_skymask=initial_skymask,
bkg_redux=self.bkg_redux,
manual=manual_obj,
find_negative=self.find_negative,
std_redux=self.std_redux,
show=self.show,
basename=self.basename)
# Do it
initial_sky, sobjs_obj = objFind.run(std_trace=std_trace, show_peaks=self.show)
# Return
return initial_sky, sobjs_obj, sciImg, objFind
[docs] def load_skyregions(self, initial_slits=False, scifile=None, frame=None, spat_flexure=None):
"""
Generate or load sky regions, if defined by the user.
Sky regions are defined by the internal provided parameters; see
``user_regions`` in :class:`~pypeit.par.pypeitpar.SkySubPar`. If
included in the pypeit file like so,
.. code-block:: ini
[reduce]
[[skysub]]
user_regions = :25,75:
The first and last 25% of all slits are used as sky. If the user has
used the ``pypeit_skysub_regions`` GUI to generate a sky mask for a
given frame, this can be searched for and loaded by setting the
parameter to ``user``:
.. code-block:: ini
[reduce]
[[skysub]]
user_regions = user
Parameters
----------
initial_slits : :obj:`bool`, optional
Flag to use the initial slits before any tweaking based on the
slit-illumination profile; see
:func:`~pypeit.slittrace.SlitTraceSet.select_edges`.
scifile : :obj:`str`, optional
The file name used to define the user-based sky regions. Only used
if ``user_regions = user``.
frame : :obj:`int`, optional
The index of the frame used to construct the calibration key. Only
used if ``user_regions = user``.
spat_flexure : :obj:`float`, None, optional
The spatial flexure (measured in pixels) of the science frame relative to the trace frame.
Returns
-------
skymask : `numpy.ndarray`_
A boolean array used to select sky pixels; i.e., True is a pixel
that corresponds to a sky region. If the ``user_regions`` parameter
is not set (or an empty string), the returned value is None.
"""
if self.par['reduce']['skysub']['user_regions'] in [None, '']:
return None
# First priority given to user_regions first
if self.par['reduce']['skysub']['user_regions'] == 'user':
# Build the file name
calib_key = CalibFrame.construct_calib_key(
self.fitstbl['setup'][frame],
CalibFrame.ingest_calib_id(self.fitstbl['calib'][frame]),
self.spectrograph.get_det_name(self.det))
regfile = buildimage.SkyRegions.construct_file_name(calib_key,
calib_dir=self.calibrations_path,
basename=io.remove_suffix(scifile))
regfile = Path(regfile).resolve()
if not regfile.exists():
msgs.error(f'Unable to find SkyRegions file: {regfile} . Create a SkyRegions '
'frame using pypeit_skysub_regions, or change the user_regions to '
'the percentage format. See documentation.')
msgs.info(f'Loading SkyRegions file: {regfile}')
return buildimage.SkyRegions.from_file(regfile).image.astype(bool)
skyregtxt = self.par['reduce']['skysub']['user_regions']
if isinstance(skyregtxt, list):
skyregtxt = ",".join(skyregtxt)
msgs.info(f'Generating skysub mask based on the user defined regions: {skyregtxt}')
# NOTE : Do not include spatial flexure here!
# It is included when generating the mask in the return statement below
slits_left, slits_right, _ \
= self.caliBrate.slits.select_edges(initial=initial_slits, flexure=None)
maxslitlength = np.max(slits_right-slits_left)
# Get the regions
status, regions = skysub.read_userregions(skyregtxt, self.caliBrate.slits.nslits, maxslitlength)
if status == 1:
msgs.error("Unknown error in sky regions definition. Please check the value:" + msgs.newline() + skyregtxt)
elif status == 2:
msgs.error("Sky regions definition must contain a percentage range, and therefore must contain a ':'")
# Generate and return image
return skysub.generate_mask(self.spectrograph.pypeline, regions, self.caliBrate.slits,
slits_left, slits_right, spat_flexure=spat_flexure)
# TODO :: Should this be moved outside of this class?
[docs] def refframe_correct(self, slits, ra, dec, obstime, slitgpm=None, waveimg=None, sobjs=None):
""" Correct the calibrated wavelength to the user-supplied reference frame
Args:
slits (:class:`~pypeit.slittrace.SlitTraceSet`):
Slit trace set object
ra (float, str):
Right Ascension
dec (float, str):
Declination
obstime (`astropy.time.Time`_):
Observation time
slitgpm (`numpy.ndarray`_, None, optional):
1D boolean array indicating the good slits (True). If None, the gpm will be taken from slits
waveimg (`numpy.ndarray`_, optional)
Two-dimensional image specifying the wavelength of each pixel
sobjs (:class:`~pypeit.specobjs.SpecObjs`, None, optional):
Spectrally extracted objects
"""
if slitgpm is None:
slitgpm = (slits.mask == 0)
# Correct Telescope's motion
refframe = self.par['calibrations']['wavelengths']['refframe']
vel_corr = 0.0
if refframe in ['heliocentric', 'barycentric'] \
and self.par['calibrations']['wavelengths']['reference'] != 'pixel':
msgs.info("Performing a {0} correction".format(self.par['calibrations']['wavelengths']['refframe']))
# Calculate correction
radec = ltu.radec_to_coord((ra, dec))
vel, vel_corr = wave.geomotion_correct(radec, obstime,
self.spectrograph.telescope['longitude'],
self.spectrograph.telescope['latitude'],
self.spectrograph.telescope['elevation'],
refframe)
# Apply correction to objects
msgs.info('Applying {0} correction = {1:0.5f} km/s'.format(refframe, vel))
if (sobjs is not None) and (sobjs.nobj != 0):
# Loop on slits to apply
gd_slitord = slits.slitord_id[slitgpm]
for slitord in gd_slitord:
indx = sobjs.slitorder_indices(slitord)
this_specobjs = sobjs[indx]
# Loop on objects
for specobj in this_specobjs:
if specobj is None:
continue
specobj.apply_helio(vel_corr, refframe)
# Apply correction to wavelength image
if waveimg is not None:
waveimg *= vel_corr
else:
msgs.info('A wavelength reference frame correction will not be performed.')
# Return the value of the correction and the corrected wavelength image
return vel_corr, waveimg
[docs] def save_exposure(self, frame, all_spec2d, all_specobjs, basename, history=None):
"""
Save the outputs from extraction for a given exposure
Args:
frame (:obj:`int`):
0-indexed row in the metadata table with the frame
that has been reduced.
all_spec2d(:class:`pypeit.spec2dobj.AllSpec2DObj`):
sci_dict (:obj:`dict`):
Dictionary containing the primary outputs of
extraction
basename (:obj:`str`):
The root name for the output file.
history (:obj:`pypeit.history.History`):
History entries to be added to fits header
Returns:
None or SpecObjs: All of the objects saved to disk
"""
# TODO: Need some checks here that the exposure has been reduced?
# Determine the headers
row_fitstbl = self.fitstbl[frame]
# Need raw file header information
rawfile = self.fitstbl.frame_paths(frame)
head2d = fits.getheader(rawfile, ext=self.spectrograph.primary_hdrext)
# Check for the directory
if not os.path.isdir(self.science_path):
os.makedirs(self.science_path)
# NOTE: There are some gymnastics here to keep from altering
# self.par['rdx']['detnum']. I.e., I can't just set update_det =
# self.par['rdx']['detnum'] because that can alter the latter if I don't
# deepcopy it...
if self.par['rdx']['detnum'] is None:
update_det = None
elif isinstance(self.par['rdx']['detnum'], list):
update_det = [self.spectrograph.allowed_mosaics.index(d)+1
if isinstance(d, tuple) else d for d in self.par['rdx']['detnum']]
else:
update_det = self.par['rdx']['detnum']
subheader = self.spectrograph.subheader_for_spec(row_fitstbl, head2d)
# 1D spectra
if all_specobjs.nobj > 0 and not self.par['reduce']['extraction']['skip_extraction']:
# Spectra
outfile1d = os.path.join(self.science_path, 'spec1d_{:s}.fits'.format(basename))
# TODO
#embed(header='deal with the following for maskIDs; 713 of pypeit')
all_specobjs.write_to_fits(subheader, outfile1d,
update_det=update_det,
slitspatnum=self.par['rdx']['slitspatnum'],
history=history)
# Info
outfiletxt = os.path.join(self.science_path, 'spec1d_{:s}.txt'.format(basename))
# TODO: Note we re-read in the specobjs from disk to deal with situations where
# only a single detector is run in a second pass but in the same reduction directory.
# Thiw was to address Issue #1116 in PR #1154. Slightly inefficient, but only other
# option is to re-work write_info to also "append"
sobjs = specobjs.SpecObjs.from_fitsfile(outfile1d, chk_version=False)
sobjs.write_info(outfiletxt, self.spectrograph.pypeline)
#all_specobjs.write_info(outfiletxt, self.spectrograph.pypeline)
# 2D spectra
outfile2d = os.path.join(self.science_path, 'spec2d_{:s}.fits'.format(basename))
# Build header
pri_hdr = all_spec2d.build_primary_hdr(head2d, self.spectrograph,
redux_path=self.par['rdx']['redux_path'],
calib_dir=self.caliBrate.calib_dir,
subheader=subheader,
history=history)
# Write
all_spec2d.write_to_fits(outfile2d, pri_hdr=pri_hdr,
update_det=update_det,
slitspatnum=self.par['rdx']['slitspatnum'])
[docs] def msgs_reset(self):
"""
Reset the msgs object
"""
# Reset the global logger
msgs.reset(log=self.logname, verbosity=self.verbosity)
msgs.pypeit_file = self.pypeit_file
[docs] def print_end_time(self):
"""
Print the elapsed time
"""
# Capture the end time and print it to user
msgs.info(utils.get_time_string(time.perf_counter()-self.tstart))
# TODO: Move this to fitstbl?
[docs] def show_science(self):
"""
Simple print of science frames
"""
indx = self.fitstbl.find_frames('science')
print(self.fitstbl[['target','ra','dec','exptime','dispname']][indx])
def __repr__(self):
# Generate sets string
return '<{:s}: pypeit_file={}>'.format(self.__class__.__name__, self.pypeit_file)