Source code for pypeit.core.wavecal.waveio

""" Module for I/O in arclines

.. include:: ../include/links.rst

import pathlib

import astropy.table
import linetools.utils
import numpy as np

from pypeit import msgs
from pypeit.core.wavecal import defs
from pypeit import data

from IPython import embed

# TODO -- Move this to the WaveCalib object
[docs]def load_wavelength_calibration(filename: pathlib.Path) -> dict: """ Load the wavelength calibration data from a file. Args: filename (:obj:`pathlib.Path`): Name of the json file. Returns: :obj:`dict`: Returns the wavelength calibration dictionary. Lists read from the json file are returnes as numpy arrays. """ if not filename.is_file(): msgs.error(f"File does not exist: {filename}") # Force any possible pathlib.Path object to string before `loadjson` wv_calib = linetools.utils.loadjson(str(filename)) # Recast a few items as arrays for key in wv_calib.keys(): if key in ['steps', 'par']: # This isn't really necessary continue # Masked slit? if wv_calib[key] is None: continue # Arrays for tkey in wv_calib[key].keys(): if isinstance(wv_calib[key][tkey], list): wv_calib[key][tkey] = np.array(wv_calib[key][tkey]) return wv_calib
[docs]def load_template(arxiv_file:str, det:int, wvrng:list=None)->tuple[np.ndarray,np.ndarray,int]: """ Load a full template file from disk Parameters ---------- arxiv_file : str File with archive spectrum det : int Detector number wvrng : list, optional min, max wavelength range for the arxiv Returns ------- wave : ndarray Wavelength vector flux : ndarray Flux vector binning : int binning of the template arc spectrum """ calibfile, fmt = data.get_reid_arxiv_filepath(arxiv_file) tbl =, format=fmt) # Parse on detector? if 'det' in tbl.keys(): idx = np.where(tbl['det'].data & 2**det)[0] else: idx = np.arange(len(tbl)).astype(int) tbl_wv = tbl['wave'].data[idx] tbl_fx = tbl['flux'].data[idx] # Cut down? if wvrng is not None: gd_wv = (tbl_wv >= wvrng[0]) & (tbl_wv <= wvrng[1]) tbl_wv = tbl_wv[gd_wv] tbl_fx = tbl_fx[gd_wv] # Return return tbl_wv, tbl_fx, tbl.meta['BINSPEC']
[docs]def load_reid_arxiv(arxiv_file): """ Load a REID arxiv file Now there are 2 possible formats. We need to consolidate Args: arxiv_file (str): Returns: dict, dict-like: """ # This function allows users to specify their own `reid_arxiv`, in # particular, the output from `pypeit_identify`. calibfile, arxiv_fmt = data.get_reid_arxiv_filepath(arxiv_file) # This is a hack as it will fail if we change the data model yet again for wavelength solutions if arxiv_fmt == 'json': wv_calib_arxiv = load_wavelength_calibration(calibfile) par = wv_calib_arxiv['par'].copy() # Pop out par and steps if they were inserted in this calibration dictionary try: wv_calib_arxiv.pop('steps') except KeyError: pass try: wv_calib_arxiv.pop('par') except KeyError: pass elif arxiv_fmt == 'fits': # The following is a bit of a hack too par = None wv_tbl =, format='fits') wv_calib_arxiv = {} nrow = wv_tbl['wave'].shape[0] for irow in np.arange(nrow): wv_calib_arxiv[str(irow)] = {} wv_calib_arxiv[str(irow)]['spec'] = wv_tbl['flux'][irow,:] wv_calib_arxiv[str(irow)]['wave_soln'] = wv_tbl['wave'][irow,:] wv_calib_arxiv[str(irow)]['order'] = wv_tbl['order'][irow] else: msgs.error(f"Not ready for this `reid_arxiv` extension: {arxiv_fmt}") return wv_calib_arxiv, par
[docs]def load_line_list(line_file, use_ion=False): """ Parameters ---------- line_file : str Full path to line_list or name of ion use_ion : bool, optional Interpret line_file as an ion, e.g. CuI Returns ------- line_list : `astropy.table.Table`_ """ line_file = data.get_linelist_filepath(f'{line_file}_lines.dat') if use_ion else \ data.get_linelist_filepath(line_file) return, format='ascii.fixed_width', comment='#')
[docs]def load_line_lists(lamps, all=False, include_unknown:bool=False, restrict_on_instr=None): """ Loads a series of line list files Parameters ---------- lamps : list List of arc lamps to be used for wavelength calibration. E.g., ['ArI','NeI','KrI','XeI'] restrict_on_instr : str, optional Restrict according to the input spectrograph all : bool, optional Load all line lists, independent of the input lamps (not recommended) include_unknown : bool, optional If True, the tot_line_list includes the unknown lines Returns ------- tot_line_list : astropy Table of line lists (including unknown lines, if requested) line_list : astropy Table of line lists unkn_lines : astropy Table of unknown lines """ # All? if all: # Search both in the package directory and the PypeIt cache line_files = list(data.Paths.linelist.glob('*_lines.dat')) line_files.append(data.search_cache('_lines.dat')) lamps = [] for line_file in line_files: i0 = line_file.rfind('/') i1 = line_file.rfind('_') lamps.append(line_file[i0+1:i1])"Arc lamps used: {', '.join(lamps)}") # Read standard files # NOTE: If one of the `lamps` does not exist, data.get_linelist_filepath() # will exit with msgs.error(). lists = [load_line_list(data.get_linelist_filepath(f'{lamp}_lines.dat')) for lamp in lamps] # Stack if len(lists) == 0: return None line_lists_all = astropy.table.vstack(lists, join_type='exact') # Restrict on the spectrograph? if restrict_on_instr is not None: instr_dict = defs.instruments() gdI = (line_lists_all['Instr'] & instr_dict[restrict_on_instr]) > 0 line_lists_all = line_lists_all[gdI] # Load Unknowns if 'ThAr' in lamps: line_lists = line_lists_all[line_lists_all['ion'] != 'UNKNWN'] unkn_lines = line_lists_all[line_lists_all['ion'] == 'UNKNWN'] else: line_lists = line_lists_all unkn_lines = load_unknown_list(lamps) #unkn_lines.remove_column('line_flag') # may wish to have this info # Stack? tot_line_list = astropy.table.vstack([line_lists, unkn_lines]) if include_unknown else line_lists_all # Return return tot_line_list, line_lists, unkn_lines
[docs]def load_tree(polygon=4, numsearch=20): """ Load a KDTree of ThAr patterns that is stored on disk Parameters ---------- polygon : int Number of sides to the polygon used in pattern matching: - polygon=3 --> trigon (two anchor lines and one floating line) - polygon=4 --> tetragon (two anchor lines and two floating lines) - polygon=5 --> pentagon (two anchor lines and three floating lines) numsearch : int Number of consecutive detected lines used to generate a pattern. For example, if numsearch is 4, then for a trigon, the following patterns will be generated (assuming line #1 is the left anchor): - 1 2 3 (in this case line #3 is the right anchor) - 1 2 4 (in this case line #4 is the right anchor) - 1 3 4 (in this case line #4 is the right anchor) Returns ------- file_load : KDTree instance The KDTree containing the patterns index : ndarray For each pattern in the KDTree, this array stores the corresponding index in the linelist """ # TODO: Can we save these as fits files instead? # TODO: Please don't use imports within functions import pickle filename = data.get_linelist_filepath(f'ThAr_patterns_poly{polygon}_search{numsearch}.kdtree') fileindx = data.get_linelist_filepath( f'ThAr_patterns_poly{polygon}_search{numsearch}.index.npy' ) try: with open(filename, "rb", encoding="utf-8") as f_obj: file_load = pickle.load(f_obj) index = np.load(fileindx) except FileNotFoundError:'The requested KDTree was not found on disk' + msgs.newline() + 'please be patient while the ThAr KDTree is built and saved to disk.') from pypeit.core.wavecal import kdtree_generator file_load, index = kdtree_generator.main(polygon, numsearch=numsearch, verbose=True, ret_treeindx=True, outname=filename) return file_load, index
[docs]def load_unknown_list(lines, unknwn_file=None, all=False): """ Parameters ---------- lines : list Restricted lines; use all=True for all unknwn_file : str, optional all : bool, optional Returns ------- unknwn_lines : `astropy.table.Table`_ """ line_dict = defs.lines() # Load if unknwn_file is None: unknwn_file = data.get_linelist_filepath('UNKNWNs.dat') line_list = load_line_list(unknwn_file) # Cut on input lamps? if all: return line_list # Otherwise msk = np.zeros(len(line_list), dtype=bool) for line in lines: # Skip if the lines is not even in the line list if line not in line_dict.keys(): continue # Else consider masking line_flag = line_dict[line] match = line_list['line_flag'] % (2*line_flag) >= line_flag msk[match] = True # Finish return line_list[msk]