Source code for pypeit.scripts.show_2dspec

"""
This script enables the viewing of a processed FITS file
with extras.  Run above the Science/ folder.

.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import os

import numpy as np

from IPython import embed

from astropy.io import fits
from astropy.stats import sigma_clipped_stats

from pypeit import msgs
from pypeit import slittrace
from pypeit import specobjs
from pypeit import io
from pypeit import utils
from pypeit import __version__
from pypeit.pypmsgs import PypeItDataModelError, PypeItBitMaskError

from pypeit.display import display
from pypeit.images.imagebitmask import ImageBitMask
from pypeit.images.detector_container import DetectorContainer
from pypeit import spec2dobj
from pypeit.scripts import scriptbase


[docs]def show_trace(sobjs, det, viewer, ch): """ Overplot the extracted object traces for this detector in the provided ginga channel. Args: sobjs (:class:`~pypeit.specobjs.SpecObjs`): Object holding the 1D spectral extractions. If None, the function doesn't do anything. det (:obj:`str`): The string identifier for the detector or mosaic used to select the extractions to show. viewer (?): ch (?): """ if sobjs is None: return in_det = np.where(sobjs.DET == det)[0] trace_list = [] trc_name_list = [] maskdef_extr_list = [] manual_extr_list = [] for kk in in_det: trace = sobjs[kk]['TRACE_SPAT'] obj_id = sobjs[kk].NAME maskdef_objname = sobjs[kk].MASKDEF_OBJNAME maskdef_extr_flag = sobjs[kk].MASKDEF_EXTRACT manual_extr_flag = sobjs[kk].hand_extract_flag trc_name = '{} OBJNAME:{}'.format(obj_id, maskdef_objname) if maskdef_objname is not None else obj_id trace_list.append(trace) trc_name_list.append(trc_name) maskdef_extr_list.append(maskdef_extr_flag is True) manual_extr_list.append(manual_extr_flag is True) display.show_trace(viewer, ch, np.swapaxes(trace_list, 1,0), np.array(trc_name_list), maskdef_extr=np.array(maskdef_extr_list), manual_extr=np.array(manual_extr_list))
[docs]class Show2DSpec(scriptbase.ScriptBase):
[docs] @classmethod def get_parser(cls, width=None): parser = super().get_parser(description='Display sky subtracted, spec2d image in a ' 'ginga viewer.', width=width) parser.add_argument('file', type=str, default=None, help='Path to a PypeIt spec2d file') parser.add_argument('--list', default=False, action='store_true', help='List the extensions only?') parser.add_argument('--det', default='1', type=str, help='Detector name or number. If a number, the name is constructed ' 'assuming the reduction is for a single detector. If a string, ' 'it must match the name of the detector object (e.g., DET01 for ' 'a detector, MSC01 for a mosaic).') parser.add_argument('--spat_id', type=int, default=None, help='Restrict plotting to this slit (PypeIt ID notation)') parser.add_argument('--maskID', type=int, default=None, help='Restrict plotting to this maskID') parser.add_argument('--showmask', nargs='*', default=None, help='Include a channel showing the mask. If no arguments are ' 'provided, the mask bit values are provided directly. You can ' 'also specify one or more mask flags used to construct an ' 'image identifying which pixels are flagged by any of these ' 'issues. E.g., to show pixels flagged by the instrument ' 'specific bad-pixel mask or cosmic arrays, use --showmask BPM CR ' '. See https://pypeit.readthedocs.io/en/release/out_masks.html ' 'for the list of flags.') parser.add_argument('--removetrace', default=False, action='store_true', help='Do not overplot traces in the skysub, sky_resid, and resid ' 'channels') parser.add_argument('--embed', default=False, action='store_true', help='Upon completion embed in ipython shell') parser.add_argument('--ignore_extract_mask', default=False, action='store_true', help='Ignore the extraction mask') # parser.add_argument('--sensfunc', type=str, default=None, # help='Pass in a sensfunc to display the sky-subtracted image with a ' # 'flux calibration') parser.add_argument('--channels', type=str, help='Only show a subset of the channels (0-indexed), e.g. 1,3') parser.add_argument('--prefix', type=str, default='', help='Channel name prefix [lets you display more than one set]') parser.add_argument('--no_clear', dest='clear', default=True, action='store_false', help='Do *not* clear all existing tabs') parser.add_argument('-v', '--verbosity', type=int, default=1, help='Verbosity level between 0 [none] and 2 [all]') parser.add_argument('--try_old', default=False, action='store_true', help='Attempt to load old datamodel versions. A crash may ensue..') return parser
[docs] @staticmethod def main(args): chk_version = not args.try_old # List only? if args.list: io.fits_open(args.file).info() return # Set the verbosity, and create a logfile if verbosity == 2 msgs.set_logfile_and_verbosity('show_2dspec', args.verbosity) # Parse the detector name try: det = int(args.det) except: detname = args.det else: detname = DetectorContainer.get_name(det) # Find the set of channels to show show_channels = [0,1,2,3] if args.channels is None \ else [int(item) for item in args.channels.split(',')] # Need to update clear throughout in case only some channels are being displayed _clear = args.clear # Try to read the Spec2DObj using the current datamodel, but allowing # for the datamodel version to be different try: spec2DObj = spec2dobj.Spec2DObj.from_file(args.file, detname, chk_version=chk_version) except (PypeItDataModelError, PypeItBitMaskError): try: # Try to get the pypeit version used to write this file file_pypeit_version = fits.getval(args.file, 'VERSPYP', 0) except KeyError: file_pypeit_version = '*unknown*' if chk_version: msgs_func = msgs.error addendum = 'To allow the script to attempt to read the data anyway, use the ' \ '--try_old command-line option. This will first try to simply ' \ 'ignore the version number. If the datamodels are incompatible ' \ '(e.g., the new datamodel contains components not in a previous ' \ 'version), this may not be enough and the script will continue by ' \ 'trying to parse only the components necessary for use by this ' \ 'script. In either case, BEWARE that the displayed data may be in ' \ 'error!' else: msgs_func = msgs.warn addendum = 'The datamodels are sufficiently different that the script will now ' \ 'try to parse only the components necessary for use by this ' \ 'script. BEWARE that the displayed data may be in error!' msgs_func(f'Your installed version of PypeIt ({__version__}) cannot be used to parse ' f'{args.file}, which was reduced using version {file_pypeit_version}. You ' 'are strongly encouraged to re-reduce your data using this (or, better yet, ' 'the most recent) version of PypeIt. ' + addendum) spec2DObj = None if spec2DObj is None: # Try to get the relevant elements directly from the fits file with io.fits_open(args.file) as hdu: names = [h.name for h in hdu] has_det = any([detname in n for n in names]) if not has_det: msgs.error(f'Provided file has no extensions including {detname}.') for ext in ['SCIIMG', 'SKYMODEL', 'OBJMODEL', 'IVARMODEL']: _ext = f'{detname}-{ext}' if _ext not in names: msgs.error(f'{args.file} missing extension {_ext}.') sciimg = hdu[f'{detname}-SCIIMG'].data skymodel = hdu[f'{detname}-SKYMODEL'].data objmodel = hdu[f'{detname}-OBJMODEL'].data ivarmodel = hdu[f'{detname}-IVARMODEL'].data _ext = f'{detname}-BPMMASK' bpmmask = hdu[_ext].data if _ext in names else None _ext = f'{detname}-WAVEIMG' waveimg = hdu[_ext].data if _ext in names else None img_gpm = np.ones(sciimg.shape, dtype=bool) if bpmmask is None else bpmmask == 0 model_gpm = img_gpm.copy() _ext = f'{detname}-SLITS' if _ext not in names: msgs.warn(f'{args.file} missing extension {_ext}; cannot show slit edges.') else: slit_columns = hdu[_ext].columns.names slit_spat_id = hdu[_ext].data['spat_id'] if 'spat_id' in slit_columns else None slit_left = hdu[_ext].data['left_tweak'].T if 'left_tweak' in slit_columns \ else (hdu[_ext].data['left_init'].T if 'left_init' in slit_columns else None) slit_right = hdu[_ext].data['right_tweak'].T if 'right_tweak' in slit_columns \ else (hdu[_ext].data['right_init'].T if 'right_init' in slit_columns else None) slit_mask = hdu[_ext].data['mask'] if 'mask' in slit_columns else None slit_mask_id = hdu[_ext].data['maskdef_id'] \ if 'maskdef_id' in slit_columns else None if 'SCI_SPAT_FLEXURE' in hdu[f'{detname}-SCIIMG'].header \ and slit_left is not None and slit_right is not None: sci_spat_flexure \ = float(hdu[f'{detname}-SCIIMG'].header['SCI_SPAT_FLEXURE']) slit_left += sci_spat_flexure slit_right += sci_spat_flexure msgs.info(f'Offseting slits by {sci_spat_flexure} pixels.') pypeline = hdu[f'{detname}-SCIIMG'].header['PYPELINE'] \ if 'PYPELINE' in hdu[f'{detname}-SCIIMG'].header else None if pypeline in ['MultiSlit', 'SlicerIFU']: slit_slid_IDs = slit_spat_id elif pypeline == 'Echelle': slit_slid_IDs = hdu[_ext].data['ech_order'] \ if 'ech_order' in slit_columns else None else: slit_slid_IDs = None # Ignore any spec1d file sobjs = None else: # Use the parsed SpecObjs object sciimg = spec2DObj.sciimg skymodel = spec2DObj.skymodel objmodel = spec2DObj.objmodel ivarmodel = spec2DObj.ivarmodel bpmmask = spec2DObj.bpmmask waveimg = spec2DObj.waveimg img_gpm = spec2DObj.select_flag(invert=True) model_gpm = img_gpm.copy() if args.ignore_extract_mask: model_gpm |= spec2DObj.select_flag(flag='EXTRACT') if spec2DObj.sci_spat_flexure is not None: msgs.info(f'Offseting slits by {spec2DObj.sci_spat_flexure}') slit_left, slit_right, slit_mask \ = spec2DObj.slits.select_edges(flexure=spec2DObj.sci_spat_flexure) slit_spat_id = spec2DObj.slits.spat_id slit_mask_id = spec2DObj.slits.maskdef_id slit_slid_IDs = spec2DObj.slits.slitord_id # Object traces from spec1d file spec1d_file = args.file.replace('spec2d', 'spec1d') if args.file[-2:] == 'gz': spec1d_file = spec1d_file[:-3] if os.path.isfile(spec1d_file): sobjs = specobjs.SpecObjs.from_fitsfile(spec1d_file, chk_version=False) else: sobjs = None msgs.warn('Could not find spec1d file: {:s}'.format(spec1d_file) + msgs.newline() + ' No objects were extracted.') # TODO: This may be too restrictive, i.e. ignore BADFLTCALIB?? slit_gpm = slit_mask == 0 # Restrict on spat_id or maskid? if slit_gpm is not None: if slit_spat_id is not None and args.spat_id is not None: slit_gpm &= slit_spat_id == args.spat_id # TODO: Should this be 'elif'? Why not just 'if'? I.e., are # spat_id and maskdef_id mutually exclusive? elif slit_mask_id is not None and args.maskID is not None: slit_gpm &= slit_mask_id == args.maskID left = None if slit_left is None else slit_left[:, slit_gpm] right = None if slit_right is None else slit_right[:, slit_gpm] slid_IDs = None if slit_slid_IDs is None else slit_slid_IDs[slit_gpm] maskdef_id = None if slit_mask_id is None else slit_mask_id[slit_gpm] # Connect to an open ginga window, or open a new one display.connect_to_ginga(raise_err=True, allow_new=True) # Show the bitmask? if args.showmask is not None and bpmmask is not None: _mask = bpmmask.mask if len(args.showmask) == 0 \ else bpmmask.flagged(flag=args.showmask) viewer, ch_mask = display.show_image(_mask, chname='MASK', waveimg=waveimg, clear=_clear) _clear = False channel_names = [] # SCIIMG if 0 in show_channels: mean, med, sigma = sigma_clipped_stats(sciimg[img_gpm], sigma_lower=5.0, sigma_upper=5.0) cut_min = mean - 1.0 * sigma cut_max = mean + 4.0 * sigma chname_sci = args.prefix+f'sciimg-{detname}' # Clear all channels at the beginning viewer, ch_sci = display.show_image(sciimg, chname=chname_sci, waveimg=waveimg, clear=_clear, cuts=(cut_min, cut_max)) _clear=False if sobjs is not None: show_trace(sobjs, detname, viewer, ch_sci) display.show_slits(viewer, ch_sci, left, right, slit_ids=slid_IDs, maskdef_ids=maskdef_id) channel_names.append(chname_sci) # SKYSUB if 1 in show_channels: image = (sciimg - skymodel) * model_gpm.astype(float) mean, med, sigma = sigma_clipped_stats(image[model_gpm], sigma_lower=5.0, sigma_upper=5.0) cut_min = mean - 1.0 * sigma cut_max = mean + 4.0 * sigma chname_skysub = args.prefix+f'skysub-{detname}' viewer, ch_skysub = display.show_image(image, chname=chname_skysub, waveimg=waveimg, clear=_clear, cuts=(cut_min, cut_max), wcs_match=True) _clear = False if not args.removetrace and sobjs is not None: show_trace(sobjs, detname, viewer, ch_skysub) display.show_slits(viewer, ch_skysub, left, right, slit_ids=slid_IDs, maskdef_ids=maskdef_id) channel_names.append(chname_skysub) # TODO Place holder for putting in sensfunc #if args.sensfunc: # # Load the sensitivity function # wave_sens, sfunc, _, _, _ = sensfunc.SensFunc.load(sensfunc_name) # # Interpolate the sensitivity function onto the wavelength grid of the data. Since the image is rectified # # this is trivial and we don't need to do a 2d interpolation # sens_factor = flux_calib.get_sensfunc_factor( # pseudo_dict['wave_mid'][:,islit], wave_sens, sfunc, fits.getheader(files[0])['TRUITIME'], # extrap_sens=parset['fluxcalib']['extrap_sens']) # # Compute the median sensitivity and set the sensitivity to zero at locations 100 times the median. This # # prevents the 2d image from blowing up where the sens_factor explodes because there is no throughput # sens_gpm = sens_factor < 100.0*np.median(sens_factor) # sens_factor_masked = sens_factor*sens_gpm # sens_factor_img = np.repeat(sens_factor_masked[:, np.newaxis], pseudo_dict['nspat'], axis=1) # imgminsky = sens_factor_img*pseudo_dict['imgminsky'] # imgminsky_gpm = sens_gpm[:, np.newaxis] & pseudo_dict['inmask'] #else: # imgminsky= pseudo_dict['imgminsky'] # SKRESIDS if 2 in show_channels: # the block below is repeated because if showing this channel but # not channel 1 it will crash chname_skyresids = args.prefix+f'sky_resid-{detname}' image = (sciimg - skymodel) * np.sqrt(ivarmodel) * model_gpm.astype(float) viewer, ch_sky_resids = display.show_image(image, chname_skyresids, waveimg=waveimg, clear=_clear, cuts=(-5.0, 5.0)) _clear = False if not args.removetrace and sobjs is not None: show_trace(sobjs, detname, viewer, ch_sky_resids) display.show_slits(viewer, ch_sky_resids, left, right, slit_ids=slid_IDs, maskdef_ids=maskdef_id) channel_names.append(chname_skyresids) # RESIDS if 3 in show_channels: chname_resids = args.prefix+f'resid-{detname}' # full model residual map image = (sciimg - skymodel - objmodel) * np.sqrt(ivarmodel) * img_gpm.astype(float) viewer, ch_resids = display.show_image(image, chname=chname_resids, waveimg=waveimg, clear=_clear, cuts=(-5.0, 5.0), wcs_match=True) _clear = False if not args.removetrace and sobjs is not None: show_trace(sobjs, detname, viewer, ch_resids) display.show_slits(viewer, ch_resids, left, right, slit_ids=slid_IDs, maskdef_ids=maskdef_id) channel_names.append(chname_resids) # After displaying all the images sync up the images with WCS_MATCH shell = viewer.shell() shell.start_global_plugin('WCSMatch') shell.call_global_plugin_method('WCSMatch', 'set_reference_channel', [channel_names[-1]], {}) if args.embed: embed(header=utils.embed_header())
# Playing with some mask stuff #out = shell.start_operation('TVMask') #maskfile = '/Users/joe/python/PypeIt-development-suite/REDUX_OUT/Shane_Kast_blue/600_4310_d55/shane_kast_blue_setup_A/crmask.fits' #out = shell.call_local_plugin_method(chname_resids, 'TVMask', 'load_file', [maskfile], {})