Source code for pypeit.scripts.show_2dspec

"""
This script enables the viewing of a processed FITS file
with extras.  Run above the Science/ folder.

.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import os

import numpy as np

from IPython import embed

from astropy.io import fits
from astropy.stats import sigma_clipped_stats

from pypeit import log
from pypeit import PypeItError
from pypeit import slittrace
from pypeit import specobjs
from pypeit import io
from pypeit import utils
from pypeit import __version__
from pypeit import PypeItDataModelError, PypeItBitMaskError

from pypeit.display import display
from pypeit.images.imagebitmask import ImageBitMask
from pypeit.images.detector_container import DetectorContainer
from pypeit import spec2dobj
from pypeit.scripts import scriptbase


[docs] def show_trace(sobjs, det, viewer, ch): """ Overplot the extracted object traces for this detector in the provided ginga channel. Args: sobjs (:class:`~pypeit.specobjs.SpecObjs`): Object holding the 1D spectral extractions. If None, the function doesn't do anything. det (:obj:`str`): The string identifier for the detector or mosaic used to select the extractions to show. viewer (:class:`~ginga.misc.Bunch.Bunch`): Ginga viewer object. ch (:obj:`str`): The name of the channel in the Ginga viewer to plot the traces. """ if sobjs is None: return in_det = np.where(sobjs.DET == det)[0] if len(in_det) == 0: return trace_list = [] trc_name_list = [] maskdef_extr_list = [] manual_extr_list = [] for kk in in_det: trace = sobjs[kk]['TRACE_SPAT'] obj_id = sobjs[kk].NAME maskdef_objname = sobjs[kk].MASKDEF_OBJNAME maskdef_extr_flag = sobjs[kk].MASKDEF_EXTRACT manual_extr_flag = sobjs[kk].hand_extract_flag trc_name = '{} OBJNAME:{}'.format(obj_id, maskdef_objname) if maskdef_objname is not None else obj_id trace_list.append(trace) trc_name_list.append(trc_name) maskdef_extr_list.append(maskdef_extr_flag is True) manual_extr_list.append(manual_extr_flag is True) if len(trace_list) > 0: display.show_trace(viewer, ch, np.swapaxes(trace_list, 1,0), np.array(trc_name_list), maskdef_extr=np.array(maskdef_extr_list), manual_extr=np.array(manual_extr_list)) else: log.warning('spec1d file found, but no objects were extracted for this detector.')
[docs] class Show2DSpec(scriptbase.ScriptBase):
[docs] @classmethod def get_parser(cls, width=None): parser = super().get_parser(description='Display sky subtracted, spec2d image in a ' 'ginga viewer.', width=width, default_log_file=True) parser.add_argument('file', type=str, default=None, help='Path to a PypeIt spec2d file') parser.add_argument('--list', default=False, action='store_true', help='List the extensions only?') parser.add_argument('--det', default=None, type=str, help='Detector name or number. If a number, the name is constructed ' 'assuming the reduction is for a single detector. If a string, ' 'it must match the name of the detector object (e.g., DET01 for ' 'a detector, MSC01 for a mosaic). If not set, the first available detector' 'in the spec2d file will be shown') parser.add_argument('--spat_id', type=int, default=None, help='Restrict plotting to this slit (PypeIt ID notation)') parser.add_argument('--maskID', type=int, default=None, help='Restrict plotting to this maskID') parser.add_argument('--showmask', nargs='*', default=None, help='Include a channel showing the mask. If no arguments are ' 'provided, the mask bit values are provided directly. You can ' 'also specify one or more mask flags used to construct an ' 'image identifying which pixels are flagged by any of these ' 'issues. E.g., to show pixels flagged by the instrument ' 'specific bad-pixel mask or cosmic arrays, use --showmask BPM CR ' '. See https://pypeit.readthedocs.io/en/release/out_masks.html ' 'for the list of flags.') parser.add_argument('--removetrace', default=False, action='store_true', help='Do not overplot traces in the skysub, sky_resid, and resid ' 'channels') parser.add_argument('--embed', default=False, action='store_true', help='Upon completion embed in ipython shell') parser.add_argument('--ignore_extract_mask', default=False, action='store_true', help='Ignore the extraction mask') # parser.add_argument('--sensfunc', type=str, default=None, # help='Pass in a sensfunc to display the sky-subtracted image with a ' # 'flux calibration') parser.add_argument('--channels', type=str, help='Only show a subset of the channels (0-indexed), e.g. 1,3') parser.add_argument('--prefix', type=str, default='', help='Channel name prefix [lets you display more than one set]') parser.add_argument('--no_clear', dest='clear', default=True, action='store_false', help='Do *not* clear all existing tabs') parser.add_argument('--try_old', default=False, action='store_true', help='Attempt to load old datamodel versions. A crash may ensue..') return parser
[docs] @classmethod def main(cls, args): # List only? if args.list: io.fits_open(args.file).info() return # Initialize the log cls.init_log(args) # Set whether or not to check datamodel versions chk_version = not args.try_old # Parse the detector name if args.det is None: allspec2d_det = fits.getval(args.file,'HIERARCH ALLSPEC2D_DETS') detname = allspec2d_det.split(',')[0] else: try: det = int(args.det) except: detname = args.det else: detname = DetectorContainer.get_name(det) # Find the set of channels to show show_channels = [0,1,2,3] if args.channels is None \ else [int(item) for item in args.channels.split(',')] # Need to update clear throughout in case only some channels are being displayed _clear = args.clear # Try to read the Spec2DObj using the current datamodel, but allowing # for the datamodel version to be different try: spec2DObj = spec2dobj.Spec2DObj.from_file(args.file, detname, chk_version=chk_version) except (PypeItDataModelError, PypeItBitMaskError): try: # Try to get the pypeit version used to write this file file_pypeit_version = fits.getval(args.file, 'VERSPYP', 0) except KeyError: file_pypeit_version = '*unknown*' if chk_version: addendum = 'To allow the script to attempt to read the data anyway, use the ' \ '--try_old command-line option. This will first try to simply ' \ 'ignore the version number. If the datamodels are incompatible ' \ '(e.g., the new datamodel contains components not in a previous ' \ 'version), this may not be enough and the script will continue by ' \ 'trying to parse only the components necessary for use by this ' \ 'script. In either case, BEWARE that the displayed data may be in ' \ 'error!' else: addendum = 'The datamodels are sufficiently different that the script will now ' \ 'try to parse only the components necessary for use by this ' \ 'script. BEWARE that the displayed data may be in error!' message = ( f'Your installed version of PypeIt ({__version__}) cannot be used to parse ' f'{args.file}, which was reduced using version {file_pypeit_version}. You ' 'are strongly encouraged to re-reduce your data using this (or, better yet, ' 'the most recent) version of PypeIt. ' + addendum ) if check_version: raise PypeItError(message) else: log.warning(message) spec2DObj = None if spec2DObj is None: # Try to get the relevant elements directly from the fits file with io.fits_open(args.file) as hdu: names = [h.name for h in hdu] has_det = any([detname in n for n in names]) if not has_det: raise PypeItError(f'Provided file has no extensions including {detname}.') for ext in ['SCIIMG', 'SKYMODEL', 'OBJMODEL', 'IVARMODEL']: _ext = f'{detname}-{ext}' if _ext not in names: raise PypeItError(f'{args.file} missing extension {_ext}.') sciimg = hdu[f'{detname}-SCIIMG'].data skymodel = hdu[f'{detname}-SKYMODEL'].data objmodel = hdu[f'{detname}-OBJMODEL'].data ivarmodel = hdu[f'{detname}-IVARMODEL'].data _ext = f'{detname}-BPMMASK' bpmmask = hdu[_ext].data if _ext in names else None _ext = f'{detname}-WAVEIMG' waveimg = hdu[_ext].data if _ext in names else None img_gpm = np.ones(sciimg.shape, dtype=bool) if bpmmask is None else bpmmask == 0 model_gpm = img_gpm.copy() _ext = f'{detname}-SLITS' if _ext not in names: log.warning(f'{args.file} missing extension {_ext}; cannot show slit edges.') else: slit_columns = hdu[_ext].columns.names slit_spat_id = hdu[_ext].data['spat_id'] if 'spat_id' in slit_columns else None slit_left = hdu[_ext].data['left_tweak'].T if 'left_tweak' in slit_columns \ else (hdu[_ext].data['left_init'].T if 'left_init' in slit_columns else None) slit_right = hdu[_ext].data['right_tweak'].T if 'right_tweak' in slit_columns \ else (hdu[_ext].data['right_init'].T if 'right_init' in slit_columns else None) slit_mask = hdu[_ext].data['mask'] if 'mask' in slit_columns else None slit_mask_id = hdu[_ext].data['maskdef_id'] \ if 'maskdef_id' in slit_columns else None if 'SCI_SPAT_FLEXURE' in hdu[f'{detname}-SCIIMG'].header \ and slit_left is not None and slit_right is not None: sci_spat_flexure \ = float(hdu[f'{detname}-SCIIMG'].header['SCI_SPAT_FLEXURE']) slit_left += sci_spat_flexure slit_right += sci_spat_flexure log.info(f'Offseting slits by {sci_spat_flexure} pixels.') pypeline = hdu[f'{detname}-SCIIMG'].header['PYPELINE'] \ if 'PYPELINE' in hdu[f'{detname}-SCIIMG'].header else None if pypeline in ['MultiSlit', 'SlicerIFU']: slit_slid_IDs = slit_spat_id elif pypeline == 'Echelle': slit_slid_IDs = hdu[_ext].data['ech_order'] \ if 'ech_order' in slit_columns else None else: slit_slid_IDs = None # Ignore any spec1d file sobjs = None else: # Use the parsed SpecObjs object sciimg = spec2DObj.sciimg skymodel = spec2DObj.skymodel objmodel = spec2DObj.objmodel ivarmodel = spec2DObj.ivarmodel bpmmask = spec2DObj.bpmmask waveimg = spec2DObj.waveimg img_gpm = spec2DObj.select_flag(invert=True) if not np.any(img_gpm): log.warning('The full science image is masked!') model_gpm = img_gpm.copy() if args.ignore_extract_mask: model_gpm |= spec2DObj.select_flag(flag='EXTRACT') if spec2DObj.sci_spat_flexure is not None: log.info(f'Offseting slits by {spec2DObj.sci_spat_flexure}') slit_left, slit_right, slit_mask \ = spec2DObj.slits.select_edges(flexure=spec2DObj.sci_spat_flexure) slit_spat_id = spec2DObj.slits.spat_id slit_mask_id = spec2DObj.slits.maskdef_id slit_slid_IDs = spec2DObj.slits.slitord_id # Object traces from spec1d file spec1d_file = args.file.replace('spec2d', 'spec1d') if args.file[-2:] == 'gz': spec1d_file = spec1d_file[:-3] if os.path.isfile(spec1d_file): sobjs = specobjs.SpecObjs.from_fitsfile(spec1d_file, chk_version=False) else: sobjs = None log.warning(f'Could not find spec1d file: {spec1d_file}\n' 'No objects were extracted.') # TODO: This may be too restrictive, i.e. ignore BADFLTCALIB?? slit_gpm = slit_mask == 0 # Restrict on spat_id or maskid? if slit_gpm is not None: if slit_spat_id is not None and args.spat_id is not None: slit_gpm &= slit_spat_id == args.spat_id # TODO: Should this be 'elif'? Why not just 'if'? I.e., are # spat_id and maskdef_id mutually exclusive? elif slit_mask_id is not None and args.maskID is not None: slit_gpm &= slit_mask_id == args.maskID left = None if slit_left is None else slit_left[:, slit_gpm] right = None if slit_right is None else slit_right[:, slit_gpm] slid_IDs = None if slit_slid_IDs is None else slit_slid_IDs[slit_gpm] maskdef_id = None if slit_mask_id is None else slit_mask_id[slit_gpm] # Connect to an open ginga window, or open a new one display.connect_to_ginga(raise_err=True, allow_new=True) # Show the bitmask? if args.showmask is not None and bpmmask is not None: _mask = bpmmask.mask if len(args.showmask) == 0 \ else bpmmask.flagged(flag=args.showmask) viewer, ch_mask = display.show_image(_mask, chname='MASK', waveimg=waveimg, clear=_clear) _clear = False channel_names = [] # SCIIMG if 0 in show_channels: mean, med, sigma = sigma_clipped_stats(sciimg[img_gpm], sigma_lower=5.0, sigma_upper=5.0) cut_min = mean - 1.0 * sigma cut_max = mean + 4.0 * sigma chname_sci = args.prefix+f'sciimg-{detname}' # Clear all channels at the beginning viewer, ch_sci = display.show_image(sciimg, chname=chname_sci, waveimg=waveimg, clear=_clear, cuts=(cut_min, cut_max)) _clear=False if sobjs is not None: show_trace(sobjs, detname, viewer, ch_sci) display.show_slits(viewer, ch_sci, left, right, slit_ids=slid_IDs, maskdef_ids=maskdef_id) channel_names.append(chname_sci) # SKYSUB if 1 in show_channels and skymodel is not None: image = (sciimg - skymodel) * model_gpm.astype(float) mean, med, sigma = sigma_clipped_stats(image[model_gpm], sigma_lower=5.0, sigma_upper=5.0) cut_min = mean - 1.0 * sigma cut_max = mean + 4.0 * sigma chname_skysub = args.prefix+f'skysub-{detname}' viewer, ch_skysub = display.show_image(image, chname=chname_skysub, waveimg=waveimg, clear=_clear, cuts=(cut_min, cut_max), wcs_match=True) _clear = False if not args.removetrace and sobjs is not None: show_trace(sobjs, detname, viewer, ch_skysub) display.show_slits(viewer, ch_skysub, left, right, slit_ids=slid_IDs, maskdef_ids=maskdef_id) channel_names.append(chname_skysub) # TODO Place holder for putting in sensfunc #if args.sensfunc: # # Load the sensitivity function # wave_sens, sfunc, _, _, _ = sensfunc.SensFunc.load(sensfunc_name) # # Interpolate the sensitivity function onto the wavelength grid of the data. Since the image is rectified # # this is trivial and we don't need to do a 2d interpolation # sens_factor = flux_calib.get_sensfunc_factor( # pseudo_dict['wave_mid'][:,islit], wave_sens, sfunc, fits.getheader(files[0])['TRUITIME'], # extrap_sens=parset['fluxcalib']['extrap_sens']) # # Compute the median sensitivity and set the sensitivity to zero at locations 100 times the median. This # # prevents the 2d image from blowing up where the sens_factor explodes because there is no throughput # sens_gpm = sens_factor < 100.0*np.median(sens_factor) # sens_factor_masked = sens_factor*sens_gpm # sens_factor_img = np.repeat(sens_factor_masked[:, np.newaxis], pseudo_dict['nspat'], axis=1) # imgminsky = sens_factor_img*pseudo_dict['imgminsky'] # imgminsky_gpm = sens_gpm[:, np.newaxis] & pseudo_dict['inmask'] #else: # imgminsky= pseudo_dict['imgminsky'] # SKRESIDS if 2 in show_channels and skymodel is not None: # the block below is repeated because if showing this channel but # not channel 1 it will crash chname_skyresids = args.prefix+f'sky_resid-{detname}' image = (sciimg - skymodel) * np.sqrt(ivarmodel) * model_gpm.astype(float) viewer, ch_sky_resids = display.show_image(image, chname_skyresids, waveimg=waveimg, clear=_clear, cuts=(-5.0, 5.0)) _clear = False if not args.removetrace and sobjs is not None: show_trace(sobjs, detname, viewer, ch_sky_resids) display.show_slits(viewer, ch_sky_resids, left, right, slit_ids=slid_IDs, maskdef_ids=maskdef_id) channel_names.append(chname_skyresids) # RESIDS if 3 in show_channels and objmodel is not None: chname_resids = args.prefix+f'resid-{detname}' # full model residual map image = (sciimg - skymodel - objmodel) * np.sqrt(ivarmodel) * img_gpm.astype(float) viewer, ch_resids = display.show_image(image, chname=chname_resids, waveimg=waveimg, clear=_clear, cuts=(-5.0, 5.0), wcs_match=True) _clear = False if not args.removetrace and sobjs is not None: show_trace(sobjs, detname, viewer, ch_resids) display.show_slits(viewer, ch_resids, left, right, slit_ids=slid_IDs, maskdef_ids=maskdef_id) channel_names.append(chname_resids) # After displaying all the images sync up the images with WCS_MATCH shell = viewer.shell() shell.start_global_plugin('WCSMatch') shell.call_global_plugin_method('WCSMatch', 'set_reference_channel', [channel_names[-1]], {}) if args.embed: embed(header=utils.embed_header())
# Playing with some mask stuff #out = shell.start_operation('TVMask') #maskfile = '/Users/joe/python/PypeIt-development-suite/REDUX_OUT/Shane_Kast_blue/600_4310_d55/shane_kast_blue_setup_A/crmask.fits' #out = shell.call_local_plugin_method(chname_resids, 'TVMask', 'load_file', [maskfile], {})