"""
Spec1dView is a plugin for the Ginga viewer that provides functionality for
visualizing and analyzing 1D spectra from FITS files. The plugin allows users
to plot spectra, identify spectral lines from various line lists, and
customize the display according to different parameters.
Plugin Type: Local
==================
Spec1dView is a local plugin, which means it is associated with a specific
channel in the Ginga viewer. An instance of the plugin can be opened for
each channel, allowing for multiple spectra to be analyzed simultaneously.
Usage
-----
- Load and visualize 1D spectra from FITS files.
- Customize the display by selecting different line lists, extraction types,
and flux/mask settings.
- Update the redshift to shift the spectral lines accordingly.
Editing
-------
Users can modify the visualization by:
- Choosing from a variety of line lists to identify spectral features.
- Selecting different types of extraction methods (OPT, BOX).
- Applying or removing flux calibration and masking options.
- Updating the redshift value to reflect the observed wavelengths.
UI
--
The user interface provides controls for:
- Selecting the line list from a combobox.
- Entering a redshift value to shift the spectrum.
- Choosing the extraction type, flux calibration, and masking options via
comboboxes.
- Buttons to load a FITS file and clear the current selection.
Buttons
-------
- Update z: Updates the redshift value and refreshes the spectrum plot.
- Enter: Loads the specified FITS file for analysis.
- Clear: Clears the current inputs and resets the UI settings.
Tips
----
- Use the comboboxes to switch between different line lists and adjust the
spectrum display settings.
- Ensure that the correct FITS file path is entered before attempting to load
the data.
"""
import time
import numpy as np
# for gaussian line fitting
from astropy.modeling import models, fitting
from ginga import GingaPlugin
from ginga.misc import Bunch
from ginga.gw import Widgets
from ginga.table.AstroTable import AstroTable
from ginga.plot.Plotable import Plotable
from ginga.canvas.CanvasObject import get_canvas_types
from ginga.util.syncops import Shelf
from pypeit import specobjs
from pypeit import utils
__all__ = ['Spec1dView']
[docs]
class Spec1dView(GingaPlugin.LocalPlugin):
def __init__(self, fv, fitsimage):
"""Constructor for the plugin."""
# superclass defines some variables for us, like logger
super().__init__(fv, fitsimage)
# get Spec1dView preferences
prefs = self.fv.get_preferences()
self.settings = prefs.create_category('plugin_Spec1dView')
self.settings.add_defaults(lines="error", start_ext=0,
extraction='OPT', fluxed=False, masked=False,
plot_error=True, autozoom=True)
self.settings.load(onError='silent')
# will be set if we are invoked
self.data = Bunch.Bunch()
self.sobjs = []
self.num_exten = 0
self.exten = 0
self.w = None
# selected line list
self.line_list = 'None'
# allowed line lists
self.line_lists = ['None'] + utils.get_line_list_names()
self.llist = None # the actual line list object
self.ext_name = ''
# redshift
self.z = 0.0
self.start_ext = self.settings.get('start_ext', 0)
self.extraction_types = ('OPT', 'BOX')
self.extraction = self.settings.get('extraction', 'OPT')
self.fluxed_options = (True, False)
self.fluxed = self.settings.get('fluxed', False)
self.masked_options = (True, False)
self.masked = self.settings.get('masked', False)
# smoothing factor
self.nsmooth = 0.0
# dictionary of plotable types
self.dc = get_canvas_types()
self.plot = None
self.plot_shelf = Shelf()
self.plot_stocker = self.plot_shelf.get_stocker()
# holds regions of interest in the X axis
self.region_dct = dict(mark=[None, None])
# holds markers in the X axis
self.marker_dct = dict()
viewer = self.channel.get_viewer('Ginga Plot')
viewer.add_callback('range-set', self.range_changed_cb)
self.gui_up = False
[docs]
def build_gui(self, container):
"""Construct the UI in the plugin container.
This get's called just prior to the ``start`` method when the plugin is
activated.
"""
top = Widgets.VBox()
top.set_border_width(4)
vbox = Widgets.VBox()
vbox.set_border_width(4)
vbox.set_spacing(2)
fr = Widgets.Frame("File")
captions = (("1D file:", 'label', 'filepath', 'entryset'),
("Extensions:", 'label', 'extensions', 'llabel'),
("Extension:", 'label', 'exten', 'combobox'),
("Extraction:", 'label', 'extraction', 'combobox'),
("Fluxed:", 'label', 'fluxed', 'combobox'),
("Masked:", 'label', 'masked', 'combobox'),
("Plot Error", 'checkbox'),
)
w, b = Widgets.build_info(captions, orientation='vertical')
self.w = b
b.filepath.add_callback('activated', self.set_filepath_cb)
b.exten.set_tooltip("Choose extension of file to view")
b.exten.add_callback('activated', self.set_exten_cb)
b.exten.set_enabled(False)
combobox = b.extraction
for name in self.extraction_types:
combobox.append_text(name)
index = self.extraction_types.index(self.extraction)
combobox.set_index(index)
combobox.add_callback('activated', self.set_extraction_cb)
combobox = b.fluxed
for name in self.fluxed_options:
combobox.append_text(str(name))
index = self.fluxed_options.index(self.fluxed)
combobox.set_index(index)
combobox.add_callback('activated', self.set_fluxed_cb)
combobox = b.masked
for name in self.masked_options:
combobox.append_text(str(name))
index = self.masked_options.index(self.masked)
combobox.set_index(index)
combobox.add_callback('activated', self.set_masked_cb)
b.plot_error.set_state(self.settings.get('plot_error', True))
b.plot_error.set_tooltip("Include the error plot")
b.plot_error.add_callback('activated', self.plot_error_cb)
fr.set_widget(w)
vbox.add_widget(fr, stretch=0)
fr = Widgets.Frame("Lines")
captions = (("Z:", 'label', 'redshift', 'entryset'),
("Line lists:", 'label', 'lists', 'combobox'),
)
w, b = Widgets.build_info(captions, orientation="vertical")
self.w.update(b)
b.redshift.set_text(str(self.z))
b.redshift.add_callback('activated', self.set_z_cb)
b.redshift.set_tooltip("Set the redshift (Z) value")
combobox = b.lists
for name in self.line_lists:
combobox.append_text(name)
index = self.line_lists.index(self.line_list)
combobox.set_index(index)
combobox.set_tooltip("Line list to plot on this spectrum")
combobox.add_callback('activated', self.set_line_list_cb)
fr.set_widget(w)
vbox.add_widget(fr, stretch=0)
tbar = Widgets.Toolbar(orientation='horizontal')
vbox.add_widget(tbar, stretch=0)
btn = tbar.add_action("No smooth")
btn.add_callback('activated', lambda w: self.no_smooth())
btn.set_enabled(self.nsmooth > 0.0)
self.w.no_smooth = btn
btn.set_tooltip("Turn off smoothing")
btn = tbar.add_action("Smooth-")
btn.add_callback('activated', lambda w: self.smooth_less())
btn.set_enabled(self.nsmooth > 0.0)
btn.set_tooltip("Smooth a bit less")
self.w.smooth_less = btn
btn = tbar.add_action("Smooth+")
btn.add_callback('activated', lambda w: self.smooth_more())
btn.set_tooltip("Smooth a bit more")
self.w.smooth_more = btn
fr = Widgets.Frame("Result")
self.w.text = Widgets.TextArea(wrap=True)
fr.set_widget(self.w.text)
vbox.add_widget(fr, stretch=1)
top.add_widget(vbox, stretch=0)
spacer = Widgets.Label('')
top.add_widget(spacer, stretch=1)
btns = Widgets.HBox()
btns.set_spacing(3)
btn = Widgets.Button("Close")
btn.add_callback('activated', lambda w: self.close())
btns.add_widget(btn, stretch=0)
btn = Widgets.Button("Help")
btn.add_callback('activated', lambda w: self.help())
btns.add_widget(btn, stretch=0)
btns.add_widget(Widgets.Label(''), stretch=1)
top.add_widget(btns, stretch=0)
container.add_widget(top, stretch=1)
self.gui_up = True
[docs]
def set_z_cb(self, w):
"""Callback for setting the zed value in the plugin.
Replot the lines as a result.
"""
z = float(w.get_text())
self.logger.info(f"z = {z}")
self.z = z
self.fv.gui_do(self.plot_lines)
[docs]
def set_line_list_cb(self, w, idx):
"""Callback for setting the line list in the plugin.
Construct a new line list (or None) and replot the lines as a result.
"""
self.line_list = self.line_lists[idx]
if self.line_list == 'None':
self.llist = None
else:
self.llist = utils.get_line_list(self.line_list)
self.logger.info(f"Loaded line list: '{self.line_list}'")
self.fv.gui_do(self.plot_lines)
[docs]
def fit_y(self):
"""Zoom to fit Y axis"""
viewer = self.channel.get_viewer('Ginga Plot')
viewer.zoom_fit(axis='y')
[docs]
def set_fluxed_cb(self, w, idx):
"""Callback for changing the `fluxed` option in the plugin.
Redo the data extraction and replot everything as a result.
"""
self.fluxed = self.fluxed_options[idx]
self.logger.debug(f"Selected fluxed option: {self.fluxed}")
self.recalc()
self.fit_y()
[docs]
def set_masked_cb(self, w, idx):
"""Callback for changing the `masked` option in the plugin.
Redo the data extraction and replot everything as a result.
"""
self.masked = self.masked_options[idx]
self.logger.debug(f"Selected masked option: {self.masked}")
self.recalc()
[docs]
def no_smooth(self):
"""Completely remove all smoothing from the plot."""
self.nsmooth = 0.0
self.w.no_smooth.set_enabled(False)
self.w.smooth_less.set_enabled(False)
self.recalc()
[docs]
def smooth_more(self):
"""Increase the level of smoothing in the plot."""
self.nsmooth += (0.5 if self.nsmooth > 0.0 else 1.0)
self.w.no_smooth.set_enabled(True)
self.w.smooth_less.set_enabled(True)
self.recalc()
[docs]
def smooth_less(self):
"""Decrease the level of smoothing in the plot."""
self.nsmooth -= (0.5 if self.nsmooth > 1.0 else 1.0)
self.nsmooth = max(self.nsmooth, 0.0)
self.w.no_smooth.set_enabled(self.nsmooth > 0.0)
self.w.smooth_less.set_enabled(self.nsmooth > 0.0)
self.recalc()
[docs]
def set_region_side(self, name, side, value):
"""Define one side (left, right, top, bottom) of a named region.
Parameters
----------
name : str
The name of a region (e.g. 'mark')
side : str ('left', 'right', 'top', 'bottom')
The side of the region to set
value : float
The value to set for this side of the region
"""
region = self.region_dct.setdefault(name, [None, None])
if side in ('left', 'bottom'):
region[0] = value
if region[1] is not None and region[1] <= value:
region[1] = None
elif side in ('right', 'top'):
region[1] = value
if region[0] is not None and region[0] >= value:
region[0] = None
self.plot_lines()
[docs]
def clear_region(self, name):
"""Clear the region named `name`."""
self.region_dct[name] = [None, None]
self.plot_lines()
def clear_markers(self):
"""Clear all the markers."""
self.marker_dct = dict()
self.plot_lines()
[docs]
def cut_x_range(self, name):
"""Zoom the X range of the plot to the region named `name`."""
x_lo, x_hi = self.region_dct[name]
if None in (x_lo, x_hi):
self.fv.show_error("Please set left and right markers to delineate region to be cut!")
return
viewer = self.channel.get_viewer('Ginga Plot')
viewer.set_ranges(x_range=(x_lo, x_hi))
self.clear_region(name)
[docs]
def fit_range(self, name):
"""Perform a fitting on the flux data within the region named `name`."""
x_lo, x_hi = self.region_dct[name]
if None in (x_lo, x_hi):
self.fv.show_error("Please set left and right markers to delineate region to be fitted!")
return
try:
# find indices of low and high X values
idx_lo = np.searchsorted(self.data.wave, x_lo, side="left")
idx_hi = np.searchsorted(self.data.wave, x_hi, side="right")
# fit a gaussian
model, fitter = fit_gaussian_line(self.data.wave, self.data.flux,
idx_lo, idx_hi)
continuum = model[0]
line = model[1]
except Exception as e:
errtxt = f"error doing fitting: {e}; traceback in viewer log"
self.w.text.set_text(errtxt)
self.logger.error(f"error doing fitting: {e}", exc_info=True)
return
# collect some results and display
line_center = line.mean.value
line_sigma = line.stddev.value
line_fwhm = 2.35482 * line_sigma
line_flux = line.amplitude.value * np.sqrt(2 * np.pi) * line_sigma
# Plot the fitted gaussian
# Dense grid for smooth model curve
x_fitted = np.linspace(self.data.wave[idx_lo],
self.data.wave[idx_hi], 100) # 1000?
y_fitted = model(x_fitted)
result = dict(x=line_center, sigma=line_sigma, fwhm=line_fwhm,
flux=line_flux, x_fitted=x_fitted, y_fitted=y_fitted)
# TODO: probably want to plot multiple unique lines and label them
# name = str(line_center) # ?? what is a better unique name
# for now, just plot the last result
# self.add_marker(name, result)
self.marker_dct = dict(fitting=result)
text = """
Center: {x:.4f}
Sigma: {sigma:.6f}
FWHM: {fwhm:.6f}
Flux: {flux:.4f}
""".format(**result)
self.w.text.set_text(text)
self.plot_lines()
[docs]
def add_marker(self, name, dct):
self.marker_dct[name] = dct
self.plot_lines()
[docs]
def clear_marker(self, name):
if name in self.marker_dct:
del self.marker_dct[name]
self.plot_lines()
[docs]
def clear_markers(self):
self.marker_dct = dict()
self.plot_lines()
[docs]
def plot_markers(self):
"""Plot the regions and markers.
"""
if self.plot is None:
return
canvas = self.plot.get_canvas()
objs = canvas.get_objects_by_tag_pfx('mark_')
canvas.delete_objects(objs)
x_lo, x_hi = self.region_dct['mark']
if None not in (x_lo, x_hi):
canvas.add(self.dc.XRange(x_lo, x_hi, fillcolor='aquamarine',
fillalpha=0.25),
tag='mark_%region', redraw=False)
viewer = self.channel.get_viewer('Ginga Plot')
(x_lo, x_hi), (y_lo, y_hi) = viewer.get_ranges()
for name, dct in self.marker_dct.items():
x = dct['x']
line = self.dc.Line(x, y_lo, x, y_hi, linewidth=2,
linestyle='dotted', arrow='start',
color='green')
canvas.add(line, tag=f'mark_{name}', redraw=False)
if 'x_fitted' in dct:
# <-- there is a fit to be plotted
points = np.array((dct['x_fitted'], dct['y_fitted'])).T
path = self.dc.Path(points, linewidth=2, linestyle='solid',
alpha=1.0, color='green')
canvas.add(path, tag='mark_{name}_fit', redraw=False)
[docs]
def plot_lines(self):
"""Plot the line list + regions + markers.
Lines are made into a single compound object so that it is easier
to remove them as a group if the line list is changed.
"""
if self.plot is None:
return
canvas = self.plot.get_canvas()
canvas.delete_object_by_tag('lines', redraw=False)
lines = self.dc.Canvas()
if self.llist is not None:
# NOTE: adapted straight from linetools
x_min, x_max = self.data.x_min, self.data.x_max
y_min, y_max = self.data.y_min, self.data.y_max
z = self.z
wvobs = np.array((1 + z) * self.llist['wrest'])
ylbl_pos = y_max - 0.2 * (y_max - y_min)
gdwv = np.where((wvobs > x_min) & (wvobs < x_max))[0]
viewer = self.channel.get_viewer('Ginga Plot')
(x_lo, x_hi), (y_lo, y_hi) = viewer.get_ranges()
# make label always sit about 3/4 up the Y range
y_lbl = y_lo + (y_hi - y_lo) * 0.75
# line should reach to the label at least
y_max = max(y_lbl, y_max)
for kk in range(len(gdwv)):
jj = gdwv[kk]
wrest = self.llist['wrest'][jj]
x_lbl = wrest * (z + 1)
if not (x_lo < x_lbl < x_hi):
# skip plotting lines that are not visible
continue
# Plot
x_data = wrest * np.array([z + 1, z + 1])
y_data = (y_min, y_max)
lines.add(self.dc.Line(x_data[0], y_data[0],
x_data[1], y_data[1],
linewidth=1,
#linestyle='dotted',
linestyle='solid',
color='blue'), redraw=False)
# Label
lbl = self.llist['name'][jj]
lines.add(self.dc.Text(x_lbl, y_lbl, text=lbl, rot_deg=90,
bgcolor='white', bgalpha=1.0,
color='blue', fontsize=10),
redraw=False)
# will be empty if there were no lines
canvas.add(lines, tag='lines', redraw=False)
self.plot_markers()
# this causes the plot viewer to redraw itself
with self.plot_stocker:
self.plot.make_callback('modified')
[docs]
def replot(self):
"""Replot the plot and line list.
"""
if self.plot is None:
return
# plot flux vs. wavelength
self.plot.clear()
canvas = self.plot.get_canvas()
# plot flux vs. wavelen
points = np.array((self.data.wave, self.data.flux)).T
p1 = self.dc.Path(points, linewidth=1, linestyle='solid',
alpha=0.7, color='black')
canvas.add(p1, tag='spectrum', redraw=False)
# optionally, plot error vs. wavelength
if self.settings.get('plot_error', False):
points = np.array((self.data.wave, self.data.sig)).T
p2 = self.dc.Path(points, linewidth=2, linestyle='solid',
alpha=1.0, color='red')
canvas.add(p2, tag='error', redraw=False)
self.plot.set_titles(title=f"Spec1dView: {self.ext_name}",
x_axis="Wavelength (Ang)", y_axis="Flux")
self.plot.set_grid(True)
self.plot_lines()
[docs]
def range_changed_cb(self, viewer, ranges):
"""Called when the plot viewers displayed range has changed.
We use this to replot lines of interest.
"""
if not self.plot_shelf.is_blocked():
self.plot_lines()
[docs]
def process_file(self, filepath):
"""Process `filepath` creating `SpecObjs` (a series of extensions),
which can then have data extracted and plotted.
"""
self.w.filepath.set_text(filepath)
self.sobjs = specobjs.SpecObjs.from_fitsfile(filepath,
chk_version=False)
self.num_exten = len(self.sobjs)
self.w.extensions.set_text(str(self.num_exten))
self.w.exten.clear()
for name in self.sobjs.NAME:
self.w.exten.append_text(name)
self.w.exten.set_enabled(True)
self.exten = min(self.start_ext, self.num_exten - 1)
self.w.exten.set_index(self.exten)
# create a new plotable to contain the plot
self.plot = Plotable(logger=self.logger)
self.plot.set(name="plot-{}".format(str(time.time())),
path=None, nothumb=True)
self.recalc()
# add the plot to this channel
self.channel.add_image(self.plot)
self.autozoom_plot()
[docs]
def recalc(self):
"""Reprocess the chosen extension, based on current choices for
extraction method, fluxing and masking.
Replot everything as a result.
"""
specobj = self.sobjs[self.exten]
# check if we have BOX or OPT extractions and adjust UI
# accordingly
if specobj['OPT_WAVE'] is None:
self.w.extraction.set_text('BOX')
self.extraction = 'BOX'
self.w.extraction.set_enabled(False)
elif specobj['BOX_WAVE'] is None:
self.w.extraction.set_text('OPT')
self.extraction = 'OPT'
self.w.extraction.set_enabled(False)
else:
self.w.extraction.set_enabled(True)
# look for OPT_FLAM_IVAR or BOX_FLAM_IVAR
# if don't have, then fluxed==True cannot be used
if specobj[f'{self.extraction}_FLAM_IVAR'] is None:
self.w.fluxed.set_text('False')
self.fluxed = False
self.w.fluxed.set_enabled(False)
else:
self.w.fluxed.set_enabled(True)
wave, flux, ivar, gpm = specobj.to_arrays(extraction=self.extraction,
fluxed=self.fluxed)
if self.fluxed and ivar is None:
# <-- fluxed=True cannot be used
self.fv.show_error("fluxed=True cannot be used with this data")
return
sig = np.sqrt(utils.inverse(ivar))
wave_gpm = wave > 1.0
wave, flux, sig, gpm = (wave[wave_gpm], flux[wave_gpm],
sig[wave_gpm], gpm[wave_gpm])
if self.masked:
flux = flux*gpm
sig = sig*gpm
if self.nsmooth > 0.0:
# do smoothing if requested
flux = utils.convolve_psf(flux, self.nsmooth)
# TODO: I don't think this is right. I think it should be:
# sig = np.sqrt(utils.convolve_psf(sig**2, self.nsmooth))
sig = utils.convolve_psf(sig, self.nsmooth)
self.data.x_min, self.data.x_max = np.nanmin(wave), np.nanmax(wave)
self.data.y_min, self.data.y_max = np.nanmin(flux), np.nanmax(flux)
self.data.wave = wave
self.data.flux = flux
self.data.sig = sig
self.ext_name = self.sobjs.NAME[self.exten]
self.replot()
[docs]
def close(self):
"""Method called to close the plugin when the Close button is pressed."""
self.fv.stop_local_plugin(self.chname, str(self))
[docs]
def start(self):
"""Method called right after `build_gui` when the plugin is activated.
Simply attempt to process the latest FITS file loaded in the channel.
"""
viewer = self.channel.get_viewer('Ginga Plot')
bd = viewer.get_bindings()
bd.set_mode(viewer, 'spec1d', mode_type='locked')
self.redo()
[docs]
def stop(self):
"""Method called when the plugin is deactivated.
Clean up instance variables so we don't hang on to any large data
structures.
"""
viewer = self.channel.get_viewer('Ginga Plot')
viewer.clear()
self.sobjs = 0
self.exten = 0
self.ext_name = ''
self.num_exten = 0
self.data = Bunch.Bunch()
self.plot = None
self.gui_up = False
[docs]
def redo(self):
"""Method called when a new FITS image or extension is loaded into the
channel.
We do some minimal checks to make sure that it is a table, then call the
routine to process the file that is behind this table.
"""
if not self.gui_up:
return
dataobj = self.channel.get_current_image()
if not isinstance(dataobj, AstroTable):
# NOTE: do we need a better test here for a 1D data item?
return
path = dataobj.get('path', None)
if path is None:
self.fv.show_error(
"Cannot open dataobj: no value for metadata key 'path'")
return
self.process_file(path)
[docs]
def set_filepath_cb(self, w):
"""Callback for changing the path in the UI.
Try to process the new file.
"""
filepath = w.get_text().strip()
self.process_file(filepath)
[docs]
def set_exten_cb(self, w, val):
"""Callback for changing the extension in the UI.
Try to process the new extension.
"""
self.exten = val
self.recalc()
self.autozoom_plot()
[docs]
def plot_error_cb(self, w, val):
"""Callback for toggling the "Plot Error" checkbox in the UI.
"""
self.settings.set(plot_error=val)
self.recalc()
[docs]
def autozoom_plot(self):
if self.settings.get('autozoom', False):
viewer = self.channel.get_viewer('Ginga Plot')
viewer.zoom_fit()
[docs]
def set_params(self, ext=None, extraction=None, masked=None, fluxed=None):
"""Used to set up defaults from command line args to pypeit_show_1dspec script."""
self.logger.info(f"ext={ext} extraction={extraction} masked={masked} fluxed={fluxed}")
if ext is not None:
self.start_ext = ext
if extraction is not None:
self.extraction = extraction
if masked is not None:
self.masked = masked
if fluxed is not None:
self.fluxed = fluxed
if self.gui_up:
index = self.extraction_types.index(self.extraction)
self.w.extraction.set_index(index)
index = self.fluxed_options.index(self.fluxed)
self.w.fluxed.set_index(index)
index = self.masked_options.index(self.masked)
self.w.masked.set_index(index)
def __str__(self):
# necessary to identify the plugin and provide correct operation in Ginga
return 'spec1dview'
# TODO: This should be moved somewhere more general.
def fit_gaussian_line(x, flux, i_low, i_high):
"""
Fit a Gaussian spectral line with a local linear continuum.
Parameters
----------
x : array_like
Monotonically increasing spectral coordinate (wavelength, frequency, etc.).
flux : array_like
Flux values.
i_low : int
Lower index (inclusive) of fitting window.
i_high : int
Upper index (exclusive) of fitting window.
Returns
-------
model : astropy.modeling.CompoundModel
Best-fit (Gaussian1D + Linear1D) model.
fitter : astropy.modeling.fitting.LevMarLSQFitter
Fitter instance containing fit diagnostics.
"""
x = np.asarray(x)
flux = np.asarray(flux)
if x.ndim != 1 or flux.ndim != 1:
raise ValueError("x and flux must be 1D arrays")
if len(x) != len(flux):
raise ValueError("x and flux must have the same length")
if not (0 <= i_low < i_high <= len(x)):
raise ValueError("Invalid index range")
# Extract window
x_fit = x[i_low:i_high]
f_fit = flux[i_low:i_high]
# --- Continuum initialization ---
# Estimate continuum from window edges
n_edge = max(1, int(0.1 * len(x_fit)))
cont_level = np.median(
np.concatenate([f_fit[:n_edge], f_fit[-n_edge:]])
)
cont_init = models.Linear1D(
slope=0.0,
intercept=cont_level
)
# --- Gaussian initialization ---
peak_index = np.argmax(np.abs(f_fit - cont_level))
mean_init = x_fit[peak_index]
amplitude_init = f_fit[peak_index] - cont_level
stddev_init = 0.25 * (x_fit[-1] - x_fit[0])
gauss_init = models.Gaussian1D(
amplitude=amplitude_init,
mean=mean_init,
stddev=stddev_init
)
# Compound model: continuum + line
model_init = cont_init + gauss_init
fitter = fitting.LevMarLSQFitter()
model_fit = fitter(model_init, x_fit, f_fit)
return model_fit, fitter