# Source code for pypeit.core.trace

```
"""
Module with generalized tracing routines.
Routines are primarily used for tracing slit edges.
TODO: Add object and wavelength tracing routines here?
TODO: Is there a way that we could define this link so that it's
accessible by the docstring of all modules?
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
from collections import Counter
from IPython import embed
import numpy as np
from scipy import ndimage, signal, optimize
from matplotlib import pyplot as plt
from astropy.stats import sigma_clipped_stats, sigma_clip
from pypeit import msgs
from pypeit import utils
from pypeit import sampling
from pypeit.core import moment, pydl, arc
# TODO: Some of these functions could probably just live in pypeit.edges
[docs]def detect_slit_edges(flux, bpm=None, median_iterations=0, min_sqm=30., sobel_mode='nearest',
sobel_enhance=0, sigdetect=30., grow_bpm=5):
r"""
Find slit edges using the input image.
The primary algorithm is to run a Sobel filter on the image and
then trigger on all significant gradients. Positive gradients are
left edges, negative gradients are right edges.
Args:
flux (`numpy.ndarray`_):
Calibration frame used to identify slit edges. Likely a
flat-field image that has been lightly smoothed in the
spectral direction. The image should also have its bad
pixels replaced (see
:func:`pypeit.core.procimg.replace_columns`). The image
*must* follow the pypeit convention, with shape
:math:`(N_{\rm spec}, N_{\rm spat})`.
bpm (`numpy.ndarray`_, optional):
A boolean or integer bad-pixel mask. If None, all pixels
are assumed valid. This is used to ignore features in the
image that may be due to bad pixels.
median_iterations (:obj:`int`, optional):
Number of median smoothing iteration to perform on the trace
image. The size of the smoothing is always (7,3). For
long-slit data, we recommend `median_iterations=0`.
min_sqm (:obj:`float`, optional):
Minimum error used when detecting a slit edge. TODO: This
needs a better description.
sobel_mode (:obj:`str`, optional):
Mode to use with the Sobel filter. See
`scipy.ndimage.sobel`_.
sobel_enhance (:obj:`int`, optional):
If slit edges are not well-defined (e.g. blurred) set this
parameter to a number greater than zero to enhance the edge detection.
sigdetect (:obj:`float`, optional):
Threshold for edge detection.
grow_bpm (int, optional):
The sobel_sig and edg_img are masked using the bpm. This is done by convolving the bpm with
a spatial boxcar of width grow_bpm pixels, ensuring that pixels which touched bad pixels are also masked.
Returns:
tuple: Returns two `numpy.ndarray`_ objects -- (1) The image of the
significance of the edge detection in sigma and (2) the array
isolating the slit edges. In the latter, left edges have a
value of -1 and right edges have a value of 1.
"""
# Checks
if flux.ndim != 2:
msgs.error('Trace image must be 2D.')
if bpm is not None and bpm.shape != flux.shape:
msgs.error('Mismatch in mask and trace image shapes.')
# Specify how many times to repeat the median filter. Even better
# would be to fit the filt/sqrt(abs(binarr)) array with a Gaussian
# near the maximum in each column
msgs.info("Detecting slit edges in the trace image")
# Generate sqrt image
sqmstrace = np.sqrt(np.abs(flux))
# Median filter
# TODO: Add size to parameter list
for ii in range(median_iterations):
sqmstrace = ndimage.median_filter(sqmstrace, size=(7, 3))
# Replace pixel values near 0
sqmstrace[(sqmstrace < 1.0) & (sqmstrace >= 0.0)] = 1.0
sqmstrace[(sqmstrace > -1.0) & (sqmstrace <= 0.0)] = -1.0
# Filter with a Sobel
filt = ndimage.sobel(sqmstrace, axis=1, mode=sobel_mode)
# Enhance blurred edges
if sobel_enhance > 0:
filt = sobel_enhance * ndimage.uniform_filter1d(filt, size=sobel_enhance, axis=1)
# Apply the bad-pixel mask
if bpm is not None:
# NOTE: Casts to float because filt is float
filt *= (1.0 - bpm)
# Significance of the edge detection
sobel_sig = np.sign(filt)*np.power(filt,2)/np.maximum(sqmstrace, min_sqm)
# First edges assigned according to S/N
# TODO: why not match the sign of the Sobel image to the edge it
# traces? I.e., why is the sign flipped?
# Answer: I defined left as -1 (i.e. counting from left to right (-1, 0, +1) = (left, middle, right)
tedges = np.zeros(flux.shape, dtype=float)
tedges[np.where(sobel_sig > sigdetect)] = -1.0 # A positive gradient is a left edge
tedges[np.where(sobel_sig < -sigdetect)] = 1.0 # A negative gradient is a right edge
# Clean the edges
wcl = np.where((ndimage.maximum_filter1d(sobel_sig, 10, axis=1) == sobel_sig) & (tedges == -1))
wcr = np.where((ndimage.minimum_filter1d(sobel_sig, 10, axis=1) == sobel_sig) & (tedges == 1))
edge_img = np.zeros(sobel_sig.shape, dtype=int)
edge_img[wcl] = -1
edge_img[wcr] = 1
if bpm is not None:
msgs.info("Applying bad pixel mask")
# JFH grow the bad pixel mask in the spatial direction
_nave = np.fmin(grow_bpm, flux.shape[0])
# Construct the kernel for mean calculation
kernel = np.ones((1, _nave)) / float(_nave)
bpm_grow = ndimage.convolve(bpm.astype(float), kernel, mode='nearest') > 0.0
edge_img *= (1 - bpm_grow)
sobel_sig *= (1 - bpm_grow)
return sobel_sig, edge_img
[docs]def identify_traces(edge_img, max_spatial_separation=4, follow_span=10, minimum_spec_length=50):
r"""
Follow slit edges to identify unique slit traces.
Args:
edge_img (`numpy.ndarray`_):
An array marked with -1 for left slit edges and +1 for
right slit edges and 0 everywhere else. The image *must*
follow the pypeit convention, with shape :math:`(N_{\rm
spec},N_{\rm spat})`. See :func:`detect_slit_edges`.
max_spatial_separation (:obj:`int`, optional):
The maximum spatial separation between two edges in proximal
spectral rows before they become separated into different
slit traces.
follow_span (:obj:`int`, optional):
The number of previous spectral rows to consider when
following slits forward.
minimum_spec_length (:obj:`int`, optional):
The minimum number of spectral rows in an edge trace.
Traces that do not meet this criterion are ignored.
Returns:
`numpy.ndarray`_: An integer array with trace ID numbers at
pixels locating the edge of that trace. Negative traces are
for left edges, positive for right edges. The number of left
and right traces can be determined using
:func:`count_edge_traces`. Pixels not associated to any edge
have a value of 0.
"""
msgs.info('Finding unique traces among detected edges.')
# Check the input
if edge_img.ndim > 2:
msgs.error('Provided edge image must be 2D.')
if not np.all(np.isin(np.unique(edge_img), [-1,0,1])):
msgs.error('Edge image must only have -1, 0, or 1 values.')
# No edges were detected.
if np.all(edge_img == 0):
msgs.warn('No edges were found!')
return np.zeros_like(edge_img, dtype=int)
# Find the left and right coordinates
lx, ly = np.where(edge_img == -1)
rx, ry = np.where(edge_img == 1)
x = np.concatenate((lx, rx))
# Put left traces at negative y
y = np.concatenate((-ly, ry))
# The trace ID to associate with each coordinate
traceid = np.full_like(x, -1)
# Loop over spectral channels
last = 0
for row in range(np.amin(x), np.amax(x)+1):
# Find the slit edges in this row
indx = x == row
in_row = np.sum(indx)
if in_row == 0:
# No slits found in this row
continue
# Find the unique edge y positions in the selected set of
# previous rows and their trace IDs
prev_indx = np.logical_and(x < row, x > row - follow_span)
if not np.any(prev_indx):
# This is likely the first row or the first row with any
# slit edges
traceid[indx] = np.arange(in_row)+last
last += in_row
continue
uniq_y, uniq_i = np.unique(y[prev_indx], return_index=True)
uniq_t = traceid[prev_indx][uniq_i]
# Assign trace IDs to this row
# - First match to any previous IDs
row_trace = np.full(in_row, -1)
for i, _y in enumerate(y[indx]):
dist = np.absolute(uniq_y-_y)
mindist = np.argmin(dist)
if dist[mindist] < max_spatial_separation:
row_trace[i] = uniq_t[mindist]
# - Assign new trace IDs to unmatched edges
unassigned = row_trace == -1
n_unassigned = np.sum(unassigned)
row_trace[unassigned] = np.arange(n_unassigned)+last
last += n_unassigned
# - Assign all edges and continue
traceid[indx] = row_trace
# Reorder the traces and remove any that do not meet the specified
# length.
# TODO: This duplicates functionality in EdgeTraceSet. Rethink
# this.
# - Left edges. Given negative IDs starting with -1
indx = y < 0
left, reconstruct, counts = np.unique(traceid[indx], return_inverse=True, return_counts=True)
# if np.any(counts > edge_img.shape[0]):
# warnings.warn('Some traces have more pixels than allowed by the image. The maximum '
# 'spatial separation for the edges in a given trace may be too large.')
good_trace = counts > minimum_spec_length
left[:] = 0
left[good_trace] = -1-np.arange(np.sum(good_trace))
traceid[indx] = left[reconstruct]
# - Right edges. Given positive IDs starting with 1
indx = np.invert(indx)
right, reconstruct, counts = np.unique(traceid[indx], return_inverse=True, return_counts=True)
# if np.any(counts > edge_img.shape[0]):
# warnings.warn('Some traces have more pixels than allowed by the image. The maximum '
# 'spatial separation for the edges in a given trace may be too large.')
good_trace = counts > minimum_spec_length
right[:] = 0
right[good_trace] = 1+np.arange(np.sum(good_trace))
traceid[indx] = right[reconstruct]
# Construct the image with the trace IDs and return
traceid_img = np.zeros_like(edge_img, dtype=int)
traceid_img[x,np.absolute(y)] = traceid
return traceid_img
[docs]def count_edge_traces(edge_img):
"""
Count the number of left and right edges traced.
Args:
edge_img (`numpy.ndarray`_):
Image with edge trace pixels numbered by their associated
trace. Pixels with positive numbers follow right slit edges
and negative numbers follow left slit edges.
Returns:
int or tuple:
Two integers with the number of left and right edges,
respectively. Or 0 if the minimum input value is 0
"""
# Avoid returning -0
nleft = np.amin(edge_img)
return 0 if nleft == 0 else -nleft, np.amax(edge_img)
[docs]def atleast_one_edge(edge_img, bpm=None, flux_valid=True, buffer=0, copy=False):
"""
Ensure that there is at least one left and one right slit edge
identified.
This is especially useful for long slits that fill the full
detector, e.g. Shane Kast.
Args:
edge_img (`numpy.ndarray`_):
Image with edge trace pixels numbered by their associated
trace. Pixels with positive numbers follow right slit edges
and negative numbers follow left slit edges.
bpm (`numpy.ndarray`_, optional):
Integer (0 unmasked; 1 masked) or boolean array indicating
bad pixels in the image. If None, all pixels are considered
good.
flux_valid (:obj:`bool`, optional):
The flux in the image used to construct the edge traces is
valid meaning that any problems should not be an issue with
the trace image itself.
buffer (:obj:`int`, optional):
If adding an edge, this is the minimum number of pixels
near the detector edge at which to place the edge.
copy (:obj:`bool`, optional):
Copy `edge_img` to a new array before making any
modifications. Otherwise, `edge_img` is modified in-place.
Returns:
`numpy.ndarray`_: The modified trace image, which is either a
new array or points to the in-place modification of `edge_img`
according to the value of `copy`. If no slit edges were found
and the flux in the trace image is invalid (`flux_valid=False`),
function returns `None`.
"""
# Get the number of traces
nleft, nright = count_edge_traces(edge_img)
# Determine whether or not to edit the image in place
_edge_img = edge_img.copy() if copy else edge_img
if nleft != 0 and nright != 0:
# Don't need to add anything
return _edge_img
if nleft == 0 and nright == 0 and not flux_valid:
# No traces and fluxes are invalid.
# TODO: This used to just be a warning, but I'm having it stop
# the code if no traces are found and the flux is low.
msgs.error('Unable to trace any edges! Image flux is low; check trace image is correct.')
# Use the mask to determine the first and last valid pixel column
sum_bpm = np.zeros(edge_img.shape[1]) if bpm is None else np.sum(bpm, axis=0)
if nleft == 0:
# Add a left edge trace at the first valid column
msgs.warn('No left edge found. Adding one at the detector edge.')
gdi0 = np.min(np.where(sum_bpm[buffer:] == 0)[0]) + buffer
_edge_img[:,gdi0] = -1
if nright == 0:
# Add a right edge trace at the last valid column
msgs.warn('No right edge found. Adding one at the detector edge.')
gdi1 = np.max(np.where(sum_bpm[:-buffer] == 0)[0])
_edge_img[:,gdi1] = 1
return _edge_img
# TODO: This needs to be better tested
[docs]def handle_orphan_edges(edge_img, sobel_sig, bpm=None, flux_valid=True, buffer=0, copy=False):
"""
In the case of single left/right traces and multiple matching
traces, pick the most significant matching trace and remove the
others.
If *no* left and/or right edge is present, this will add one using
:func:`atleast_one_edge`.
Args:
edge_img (`numpy.ndarray`_):
Image with edge trace pixels numbered by their associated
trace. Pixels with positive numbers follow right slit edges
and negative numbers follow left slit edges.
sobel_sig (`numpy.ndarray`_):
Image with the significance of the edge detection. See
:func:`detect_slit_edges`.
bpm (`numpy.ndarray`_, optional):
Integer (0 unmasked; 1 masked) or boolean array indicating
bad pixels in the image. If None, all pixels are considered
good.
flux_valid (:obj:`bool`, optional):
The flux in the image used to construct the edge traces is
valid meaning that any problems should not be an issue with
the trace image itself.
buffer (:obj:`int`, optional):
If adding an edge, this is the minimum number of pixels
near the detector edge at which to place the edge; see
:func:`atleast_one_edge`.
copy (:obj:`bool`, optional):
Copy `edge_img` to a new array before making any
modifications. Otherwise, `edge_img` is modified
in-place.
Returns:
`numpy.ndarray`_: The modified trace image, which is either a
new array or points to the in-place modification of
`edge_img` according to the value of `copy`.
"""
# Get the number of traces
nleft, nright = count_edge_traces(edge_img)
# if nleft == 0 or nright == 0:
# # Deal with no left or right edges
# # TODO: I think we should skip this and handle it in sync()
# _edge_img = atleast_one_edge(edge_img, bpm=bpm, flux_valid=flux_valid, buffer=buffer,
# copy=copy)
# # Update the number of edges
# nleft, nright = count_edge_traces(_edge_img)
# else:
# # Just do basic setup
# _edge_img = edge_img.copy() if copy else edge_img
_edge_img = edge_img.copy() if copy else edge_img
# if nleft != 1 and nright != 1 or nleft == 1 and nright == 1:
if nleft == 0 or nright == 0 or nleft != 1 and nright != 1 or nleft == 1 and nright == 1:
# Nothing to do
return _edge_img
if nright > 1:
# To get here, nleft must be 1. This is mainly in here for
# LRISb, which is a real pain..
msgs.warn('Only one left edge, and multiple right edges.')
msgs.info('Restricting right edge detection to the most significantly detected edge.')
# Find the most significant right trace
best_trace = np.argmin([-np.median(sobel_sig[_edge_img==t]) for t in range(nright)])+1
# Remove the other right traces
indx = _edge_img == best_trace
_edge_img[(_edge_img > 0) & np.invert(indx)] = 0
# Reset the number to a single right trace
_edge_img[indx] = 1
return _edge_img
# To get here, nright must be 1.
msgs.warn('Only one right edge, and multiple left edges.')
msgs.info('Restricting left edge detection to the most significantly detected edge.')
# Find the most significant left trace
best_trace = np.argmax([np.median(sobel_sig[_edge_img == -t]) for t in range(nleft)])+1
# Remove the other left traces
indx = _edge_img == best_trace
_edge_img[(_edge_img > 0) & np.invert(indx)] = 0
# Reset the number to a single left trace
_edge_img[indx] = 1
return _edge_img
[docs]def most_common_trace_row(trace_bpm, valid_frac=1/3.):
## JFH DO not use row and column in the docs!!!! Change everywhere to spectral spatial. Traces always
## run spectral
"""
Find the spectral position (row) that crosses the most traces.
If provided the mask for a single trace, this just returns the
median of the unmasked rows.
Args:
trace_bpm (`numpy.ndarray`_):
Bad-pixel mask for the trace data (True is bad; False is
good). Can be a 1D array for a single trace or a 2D array
with shape (nspec, ntrace) for multiple traces.
valid_frac (:obj:`float`, optional):
The valid fraction of the detector from which to choose
the row. For example, if 1/3, only choose the row from
the central third of the rows.
Returns:
:obj:`int`: The row that crosses the most valid trace data.
"""
if trace_bpm.ndim == 1 or trace_bpm.shape[1] == 1:
# Only a single vector provided. Use the central valid pixel
rows = np.where(np.invert(np.squeeze(trace_bpm)))[0]
return rows[rows.size//2]
s,e = ((0.5 + np.array([-1,1])*valid_frac/2)*trace_bpm.shape[0]).astype(int)
gpm = np.invert(trace_bpm[s:e,:])
n_good = np.sum(gpm, axis=0)
if np.all(n_good == e-s):
# Trace positions are all valid over this section of the
# detector, so just so use the central row
return trace_bpm.shape[0]//2
# Return the row with the most unmasked trace positions
return Counter(np.where(gpm)[0]).most_common(1)[0][0] + s
[docs]def prepare_sobel_for_trace(sobel_sig, bpm=None, boxcar=5, side='left'):
"""
Prepare the Sobel filtered image for tracing.
This method:
- Flips and/or truncates the pixel values based on the edge
side to be traced (see ``side``).
- Smooths along rows (spatially)
Args:
sobel_sig (`numpy.ndarray`_):
Image with the significance of the edge detection; see
:func:`detect_slit_edges`.
bpm (`numpy.ndarray`_, optional):
Boolean bad-pixel mask (bad pixels are True). If None, all pixels
are considered good. Must have the same shape as ``sobel_sig``.
boxcar (:obj:`int`, optional):
Boxcar smooth the detection image along rows before recentering the
edge centers; see :func:`~pypeit.utils.boxcar_smooth_rows`. If less
than 1, no smoothing is performed.
side (:obj:`str`, optional):
The side that the image will be used to trace. In the Sobel image,
positive values are for left traces, negative for right traces. If
``'left'``, the image is clipped at a minimum value of -0.1. If
``'right'``, the image sign is flipped and then clipped at a minimum
of -0.1. If None, the image is not flipped or clipped, only
smoothed.
Returns:
`numpy.ndarray`_: The smoothed image.
"""
# NOTE: This performs the operations of what was previously
# performed on the object passed to trace_crude_init, as well as
# the smoothing done at the beginning of that function
if side not in ['left', 'right', None]:
raise ValueError('Side must be left, right, or None.')
# Keep both sides if the side is undefined.
# TODO: This 0.1 is drawn out of the ether and different from what is done in peak_trace
img = sobel_sig if side is None else np.maximum((1 if side == 'left' else -1)*sobel_sig, 0.1)
# Returned the smoothed image
wgt = None if bpm is None else np.logical_not(bpm).astype(float)
return utils.boxcar_smooth_rows(img, boxcar, wgt=wgt, replace='zero')
[docs]def follow_centroid(flux, start_row, start_cen, ivar=None, bpm=None, fwgt=None, width=6.0,
maxshift_start=0.5, maxshift_follow=0.15, maxerror=0.2, continuous=True,
bitmask=None):
"""
Follow the centroid of features in an image along the first axis.
In the normal pypeit usage, this follows the spatial position
(column) of a feature in the image as a function of spectral
position (row).
Starting from a specified row and input centers along each
column, attempt to follow a set of features to both lower and
higher rows in the provided image.
Importantly, this function does not treat each row independently
(as would be the case for a direct call to
:func:`masked_centroid` for centroid positions along all rows),
but treats the calculation of the centroids sequentially where
the result for each row is dependent on and starts from the
result from the previous row. The only independent measurement is
the one performed at the input `start_row`. This function is much
slower than :func:`masked_centroid` because of this introduced
dependency.
.. note::
- This is an adaptation of ``trace_crude`` from ``idlspec2d``.
- You should consider smoothing the input ``flux`` array
before passing it to this function. See
:func:`pypeit.utils.boxcar_smooth_rows`. For example::
smimg = utils.boxcar_smooth_rows(img, nave, wgt=inmask)
cen, cene, cenm = trace.follow_centroid(smimg, ...)
Args:
flux (`numpy.ndarray`_):
Image used to weight the column coordinates when
recentering. For example, when tracing slit edges, this
should typically be the Sobel-filtered trace image after
adjusting for the correct side and performing any
smoothing; see :func:`prepare_sobel_for_trace`. In any
case, consider that this image may need to be smoothed
for robust output from this function. See
:func:`pypeit.utils.boxcar_smooth_rows`.
start_row (:obj:`int`):
Row at which to start the calculation. The function
begins with this row and then continues first to higher
indices and then to lower indices.
start_cen (:obj:`int`, `numpy.ndarray`_, optional):
One or more coordinates to recenter. If an array, must be
1D.
ivar (`numpy.ndarray`_, optional):
Inverse variance in the image. If not provided, unity
variance is assumed. Used for the calculation of the
errors in the moment analysis. If this is not provided,
be careful with the value set for `maxerror` (see below).
bpm (`numpy.ndarray`_, optional):
A boolean bad-pixel mask used to ignore pixels in the
image (bad pixels are True).
fwgt (`numpy.ndarray`_, optional):
An additional weight to apply to each pixel in `flux`. If
None, weights are uniform.
width (:obj:`float`, `numpy.ndarray`_, optional):
The size of the window about the provided starting center
for the moment integration window. See
:func:`pypeit.core.moment.moment1d`.
maxshift_start (:obj:`float`, optional):
Maximum shift in pixels allowed for the adjustment of the
first row analyzed.
maxshift_follow (:obj:`float`, optional):
Maximum shift in pixels between centroids in adjacent
rows as the routine follows the feature away from the
first row analyzed.
maxerror (:obj:`float`, optional):
Maximum allowed error in the centroid returned by
:func:`pypeit.core.moment.moment1d`.
continuous (:obj:`bool`, optional):
Keep only the continuous part of the feature trace from
the starting row.
bitmask (:class:`pypeit.bitmask.BitMask`, optional):
Object used to flag the feature traces. If None,
assessments use a boolean array to flag traces. If not
None, errors will be raised if the object cannot
interpret the correct flag names defined. This function
uses the DISCONTINUOUS flag.
Returns:
tuple: Three numpy arrays are returned. the optimized center, an
estimate of the error, and a bad-pixel mask (masked values
are True).
"""
if flux.ndim != 2:
raise ValueError('Input image must be 2D.')
# Shape of the image with pixel weights
nr, nc = flux.shape
# Instantiate theses supplementary arrays here to speed things up
# in iterative calling of moment1d. moment1d will check the array
# sizes. TODO: moment1d no longer instantiates these if they're not
# provided, so this likely isn't necessary; testing required.
_ivar = np.ones_like(flux, dtype=float) if ivar is None else ivar
_bpm = np.zeros_like(flux, dtype=bool) if bpm is None else bpm
_fwgt = np.ones_like(flux, dtype=float) if fwgt is None else fwgt
# Number of starting coordinates
_cen = np.atleast_1d(start_cen)
nt = _cen.size
# Check coordinates are within the image
if np.any((_cen > nc-1) | (_cen < 0)) or start_row < 0 or start_row > nr-1:
raise ValueError('Starting coordinates incompatible with input image!')
# Check the dimensionality
if _cen.ndim != 1:
raise ValueError('Input coordinates to be at most 1D.')
# Instantiate output; just repeat input for all image rows.
xc = np.tile(_cen, (nr,1)).astype(float)
xe = np.zeros_like(xc, dtype=float)
xm = np.zeros_like(xc, dtype=bool) if bitmask is None \
else np.zeros_like(xc, dtype=bitmask.minimum_dtype())
# NOTE: This is effectively the old trace_crude_init
# Recenter the starting row
i = start_row
xc[i,:], xe[i,:], xm[i,:] = masked_centroid(flux, xc[i,:], width, ivar=_ivar, bpm=_bpm,
fwgt=_fwgt, row=i, maxshift=maxshift_start,
maxerror=maxerror, bitmask=bitmask, fill='bound')
# Go to higher indices using the result from the previous row
for i in range(start_row+1,nr):
xc[i,:], xe[i,:], xm[i,:] = masked_centroid(flux, xc[i-1,:], width, ivar=_ivar, bpm=_bpm,
fwgt=_fwgt, row=i, maxshift=maxshift_follow,
maxerror=maxerror, bitmask=bitmask,
fill='bound')
# Go to lower indices using the result from the previous row
for i in range(start_row-1,-1,-1):
xc[i,:], xe[i,:], xm[i,:] = masked_centroid(flux, xc[i+1,:], width, ivar=_ivar, bpm=_bpm,
fwgt=_fwgt, row=i, maxshift=maxshift_follow,
maxerror=maxerror, bitmask=bitmask,
fill='bound')
# NOTE: In edgearr_tcrude, skip_bad (roughly opposite of continuous
# here) was True by default, meaning continuous would be False by
# default.
if not continuous:
# Not removing discontinuous traces
return xc, xe, xm
# Keep only the continuous part of the trace starting from the
# initial row
# TODO: Instead keep the longest continuous segment?
# TODO: Convert continuous from T/F to a tolerance that, e.g.,
# allows for few-pixel discontinuities, but allows the trace to
# pick back up again
bad = xm > 0
p = np.arange(nr)
for i in range(nt):
indx = bad[:,i] & (p > start_row)
if np.any(indx):
s = np.amin(p[indx])
xm[s:,i] = True if bitmask is None else bitmask.turn_on(xm[s:,i], 'DISCONTINUOUS')
indx = bad[:,i] & (p < start_row)
if np.any(indx):
e = np.amax(p[indx])-1
xm[:e,i] = True if bitmask is None else bitmask.turn_on(xm[:e,i], 'DISCONTINUOUS')
# Return centers, errors, and mask
return xc, xe, xm
[docs]def masked_centroid(flux, cen, width, ivar=None, bpm=None, fwgt=None, row=None,
weighting='uniform', maxshift=None, maxerror=None, bitmask=None, fill='input',
fill_error=-1):
"""
Measure the centroid within 1D apertures and flag and fill bad
results.
This is primarily a wrapper for
:func:`pypeit.core.moment.moment1d` that flags the output.
Data are flagged for the following reasons:
- The centroids must always be within the aperture used by
:func:`pypeit.core.moment.moment1d` (`OUTSIDEAPERTURE`).
- The centroid cannot be within half an aperture of the image
edge (`EDGEBUFFER`).
- If `maxerror` is not None, the centroid error must be below
the provided maximum (`MOMENTERROR`).
- If `maxshift` is not None, the centroid must be different
from the input value by less than the maximum
(`LARGESHIFT`).
The flags are provided as either a boolean array or an array of
mask bits, depending on the value of `bitmask` (see below). The
bitmask flags used for each criteria are given in the list above.
If `fill` is 'input', the output centroids are replaced by the
input value for any measurements captured by these flags. For
centroids above `maxshift`, however, the replacement value can be
at the maximum shift if `fill='bound'`.
Args:
flux (`numpy.ndarray`_):
Array used for the centroid calculations.
cen (`numpy.ndarray`_):
Current estimate of the trace center.
width (:obj:`float`):
Passed directly to :func:`pypeit.core.moment.moment1d`;
see the documentation there.
ivar (`numpy.ndarray`_, optional):
Inverse variance in `flux`; passed directly to
:func:`pypeit.core.moment.moment1d`.
bpm (`numpy.ndarray`_, optional):
Boolean bad-pixel mask for `flux`; passed directly to
:func:`pypeit.core.moment.moment1d`.
fwgt (`numpy.ndarray`_, optional):
A weight to apply to each pixel in `flux`; passed
directly to :func:`pypeit.core.moment.moment1d`.
row (:obj:`int`, optional):
Row (index along the first axis; spectral position) in
`flux` at which to recenter the trace position. See
`row` in :func:`pypeit.core.moment.moment1d`.
weighting (:obj:`str`, optional):
Passed directly to :func:`pypeit.core.moment.moment1d`;
see the documentation there.
maxshift (:obj:`float`, optional):
Maximum shift allowed between the input and recalculated
centroid. If None, no limit is applied.
maxerror (:obj:`float`, optional):
Maximum error allowed in the calculated centroid.
Measurements with errors larger than this value are
returned as the input center value. If None, no limit is
applied.
bitmask (:class:`pypeit.bitmask.BitMask`, optional):
Object used to toggle the returned bit masks. If
provided, must be able to interpret MATHERROR,
OUTSIDEAPERTURE, EDGEBUFFER, MOMENTERROR, and LARGESHIFT
flags. If None, the function returns boolean flags set to
True if there was an error in
:func:`pypeit.core.moment.moment1d` or if the error is
larger than `maxerror` (and `maxerror` is not None);
centroids that have been altered by the maximum shift are
*not* flagged.
fill (:obj:`str`, optional):
A string keyword specifying how flagged centroids should
be replaced. Options are:
- 'input': Replace the flagged centroids with their
input value.
- 'bound': *Only* for the case where the
centroid is outside the provided `maxshift`,
replace the centroid by the value at either the
positive or negative boundary edge. In all other
cases, the centroid is still replaced with the
input value. See replacement cases above.
fill_error (:obj:`float`, optional):
For flagged centroids, this error is replaced with this
dummy value.
Returns:
tuple: Returns three `numpy.ndarray`_ objects. the new centers, the
center errors, and the measurement flags with a data type
depending on `bitmask`.
"""
# Calculate the moments
radius = width/2
xfit, xerr, matherr = moment.moment1d(flux, cen, width, ivar=ivar, bpm=bpm, fwgt=fwgt,
row=row, weighting=weighting, order=1,
fill_error=fill_error)
# Flag centroids outide the aperture and too close to the image edge
outside_ap = (np.absolute(xfit - cen) > radius + 0.5)
edge_buffer = (xfit < radius - 0.5) | (xfit > flux.shape[1] - 0.5 - radius)
indx = matherr | outside_ap | edge_buffer
xfit[indx] = cen[indx]
xerr[indx] = fill_error
if maxshift is None and maxerror is None and bitmask is None:
# Nothing else to do
## TODO: JFH It seems like this shold be returning indx here and not matherr if bitmask is None.
return xfit, xerr, indx
# Flag large shifts
if maxshift is not None:
large_shift = np.absolute(xfit-cen) > maxshift
if fill == 'bound':
xfit = np.clip(xfit - cen, -maxshift, maxshift)+cen
else:
indx |= large_shift
# Flag large errors
if maxerror is not None:
large_error = xerr > maxerror
indx |= large_error
# Replace flagged data with the input and dummy error
xfit[indx] = cen[indx]
xerr[indx] = fill_error
# Toggle the mask bits
if bitmask is not None:
xmsk = np.zeros_like(xfit, dtype=bitmask.minimum_dtype())
xmsk[matherr] = bitmask.turn_on(xmsk[matherr], 'MATHERROR')
xmsk[outside_ap] = bitmask.turn_on(xmsk[outside_ap], 'OUTSIDEAPERTURE')
xmsk[edge_buffer] = bitmask.turn_on(xmsk[edge_buffer], 'EDGEBUFFER')
if maxerror is not None:
xmsk[large_error] = bitmask.turn_on(xmsk[large_error], 'MOMENTERROR')
if maxshift is not None:
xmsk[large_shift] = bitmask.turn_on(xmsk[large_shift], 'LARGESHIFT')
# Return the new centers, errors, and flags
return xfit, xerr, indx if bitmask is None else xmsk
# NOTE: keck_run_july changes: maxdev changed from 5.0 to 2.0
[docs]def fit_trace(flux, trace_cen, order, ivar=None, bpm=None, trace_bpm=None, weighting='uniform',
fwhm=3.0, maxshift=None, maxerror=None, function='legendre', maxdev=2.0, maxiter=25,
niter=9, bitmask=None, debug=False, idx=None, xmin=None, xmax=None,
flavor='trace'):
"""
Iteratively fit the trace of a feature in the provided image.
Each iteration performs two steps:
- Remeasure the trace centroid using :func:`masked_centroid`.
The size of the integration window (see the definition of
the `width` parameter for
:func:`pypeit.core.moment.moment1d`) depends on the type of
weighting: For *uniform weighting*, the code does a third
of the iterations with `width = 2*1.3*fwhm`, a third with
`width = 2*1.1*fhwm`, and a third with `width = 2*fwhm`.
For *Gaussian weighting*, all iterations use `width =
fwhm/2.3548`.
- Fit the centroid measurements with a 1D function of the
provided order. See :func:`pypeit.core.pydl.TraceSet`.
The number of iterations performed is set by the keyword argument
`niter`. There is no convergence test, meaning that this number
of iterations is *always* performed.
Notes:
Revision History:
- 23-June-2018 Written by J. Hennawi
Args:
flux (`numpy.ndarray`_):
Image to use for tracing. Must be 2D with shape (nspec,
nspat).
trace_cen (`numpy.ndarray`_):
Initial guesses for trace centroids in the spatial
dimension. This can either be an 2-d array with shape
(nspec, nTrace) array, or a 1-d array with shape (nspec)
for the case of a single trace.
order (:obj:`int`):
Order of function to fit to each trace. See `function`.
ivar (`numpy.ndarray`_, optional):
Inverse variance of the image intensity. If not provided,
unity variance is used. If provided, must have the same
shape as `flux`.
bpm (`numpy.ndarray`_, optional):
Boolean array with the input bad-pixel mask for the image
(pixels to ignore are True). If not provided, all values
in `flux` are considered valid. If provided, must have
the same shape as `flux`.
trace_bpm (`numpy.ndarray`_, optional):
Boolean array with the trace bad-pixel mask; i.e., places
where you know the trace is going to be bad that you
always want to mask in the fits. Shape must match
`trace_cen`.
weighting (:obj:`str`, optional):
The weighting to apply to the position within each
integration window (see options in
:func:`pypeit.core.moment.moment1d`).
fwhm (:obj:`float`, optional):
The expected width of the feature to trace, which is used
to define the size of the integration window during the
centroid calculation; see description above.
maxshift (:obj:`float`, optional):
Maximum shift allowed between the input and recalculated
centroid (see :func:`masked_centroid`). If None, no limit
is applied.
maxerror (:obj:`float`, optional):
Maximum error allowed in the calculated centroid (see
:func:`masked_centroid`). Measurements with errors larger
than this value are returned as the input center value.
If None, no limit is applied.
function (:obj:`str`, optional):
The name of the function to fit. Must be a valid
selection. See :class`pypeit.core.pydl.TraceSet`.
maxdev (:obj:`float`, optional):
If provided, reject points with `abs(data-model) >
maxdev` during the fitting. If None, no points are
rejected. See :func:`~pypeit.core.fitting.robust_fit`.
maxiter (:obj:`int`, optional):
Maximum number of rejection iterations allowed during the
fitting. See :func:`~pypeit.core.fitting.robust_fit`.
niter (:obj:`int`, optional):
The number of iterations for this method; i.e., the
number of times the two-step fitting algorithm described
above is performed.
bitmask (:class:`pypeit.bitmask.BitMask`, optional):
Object used to toggle the returned bit masks in edge
centroid measurements; see :func:`masked_centroid`.
debug (:obj:`bool`, optional):
Plot the data and the fits.
idx (`numpy.ndarray`_, optional):
Array of strings with the IDs for each object. Used only
if `debug` is true for the plotting. Default is just a
running number.
xmin (:obj:`float`, optional):
Lower reference for robust_fit polynomial fitting.
Default is to use zero
xmax (:obj:`float`, optional):
Upper reference for robust_fit polynomial fitting.
Default is to use the image size in nspec direction
flavor (:obj:`str`, optional):
Defines the type of fit performed. Only used by QA
Returns:
:obj:`tuple`: Returns four `numpy.ndarray`_ objects all with
the same shape as the input positions (`trace_cen`) and
provide:
- The best-fitting positions of each trace determined by the
polynomial fit.
- The centroids of the trace determined by either flux- or
Gaussian-weighting, to which the polynomial is fit.
- The errors in the centroids.
- Boolean flags for each centroid measurement (see
:func:`~pypeit.core.moment.moment1d`).
"""
# Ensure setup is correct
if flux.ndim != 2:
raise ValueError('Input image must be 2D.')
if ivar is None:
ivar = np.ones_like(flux, dtype=float)
if ivar.shape != flux.shape:
raise ValueError('Inverse variance array shape is incorrect.')
if bpm is None:
bpm = np.zeros_like(flux, dtype=bool)
if bpm.shape != flux.shape:
raise ValueError('Mask array shape is incorrect.')
fwgt = np.ones_like(flux, dtype=float)
if trace_bpm is None:
trace_bpm = np.zeros_like(trace_cen, dtype=bool)
if trace_bpm.shape != trace_cen.shape:
raise ValueError('Trace mask array shape is incorrect.')
# Allow for single vectors as input
_trace_cen = trace_cen.reshape(-1,1) if trace_cen.ndim == 1 else trace_cen
_trace_bpm = trace_bpm.reshape(-1, 1) if trace_cen.ndim == 1 else trace_bpm
nspec, ntrace = _trace_cen.shape
if _trace_cen.shape != _trace_bpm.shape:
raise ValueError('Trace data and its bad-pixel mask do not have the same shape.')
# Define the fitting limits
if xmin is None:
xmin = 0.0
if xmax is None:
xmax = float(nspec-1)
# Setup the width to use for each iteration depending on the weighting used
width = np.full(niter, 2*fwhm if weighting == 'uniform' else fwhm/2.3548, dtype=float)
if weighting == 'uniform':
width[:niter//3] *= 1.3
width[niter//3:2*niter//3] *= 1.1
# Abscissa for fitting; needs to be float type when passed to
# TraceSet
trace_coo = np.tile(np.arange(nspec), (ntrace,1)).astype(float)
# Values to fit
trace_fit = np.copy(_trace_cen)
# Uniform weighting during the fit
trace_fit_ivar = np.ones_like(trace_fit)
# NOTE: keck_run_july changes: Added down-weighting of masked parts
# of the trace.
# TODO: This feels arbitrary
# trace_fit_ivar[_trace_bpm] = 0.1**2
trace_fit_ivar[_trace_bpm] = 0.01**2
# JFH Changed this parameter from 0.1 to 0.01 to fix problem with NIRES slit extrapolation. In the future
# we may need to make this parameter, but don't touch this number unless you know what you are doing.
# JXP -- Squared to deal with proper usage of numpy legendre fitting routine
for i in range(niter):
# First recenter the trace using the previous trace fit/data.
# See the replacement rules of masked_centroid for how
# measurements that hit boundaries are treated. This replaces
# any measurement that is outside the integration window,
# within a buffer of the detector edge, above a maximum shift,
# or is above the maximum error. In these cases, the output is
# the same as the input and bad_trace has the relevant bit or
# boolean.
cen, err, msk = masked_centroid(flux, trace_fit, width[i], ivar=ivar, bpm=bpm, fwgt=fwgt,
weighting=weighting, maxshift=maxshift, maxerror=maxerror,
bitmask=bitmask)
################################################################
# NOTE: keck_run_july changes: Now always replace the masked
# data with the input trace data; downweight the masked points;
# and include the masked points in the fit.
# Always set the masked values to the initial input trace data.
# The input trace data initially comes from either the standard
# or the slit boundaries, and if we continually replace it for
# all iterations we will naturally always extraplate the trace
# to match the shape of a high S/N ratio fit (i.e. either the
# standard or the flat which was used to determine the slit
# edges.
cen[_trace_bpm] = _trace_cen[_trace_bpm]
# Do not do any kind of masking based on the trace recentering
# errors. Trace fitting is much more robust when masked pixels
# are simply replaced by the input trace values. Therefore,
# masked pixels are not excluded from the fit, but we do give
# them lower weight via that inverse variance. See the
# instantation of trace_fit_ivar above.
################################################################
# Fit the data
traceset = pydl.TraceSet(trace_coo, cen.T,
# Removed by keck_run_july: inmask=np.invert(_trace_bpm.T),
function=function, ncoeff=order, maxdev=maxdev, maxiter=maxiter,
invvar=trace_fit_ivar.T, xmin=xmin, xmax=xmax)
# TODO: Keep this around for now. I wanted to see how each
# iteration affected the centroids and fit.
# if debug:
# bad = msk.astype(bool)
# good = np.invert(bad)
# for i in range(trace_fit.shape[1]):
# plt.scatter(trace_coo[i,:], trace_fit[:,i], color='0.7', marker='.', s=50, lw=0,
# label='input')
# plt.scatter(trace_coo[i,good[:,i]], trace_cen[good[:,i],i],
# color='k', marker='.', s=50, lw=0, label='output')
# plt.scatter(trace_coo[i,bad[:,i]], trace_cen[bad[:,i],i],
# color='C1', marker='x', s=20, lw=0.5, label='bad output')
# plt.scatter(trace_coo[i,:], traceset.yfit[i,:], color='r', marker='.', s=50, lw=0,
# label='fit')
# plt.show()
# TODO: Report iteration number and mean/stddev in difference
# of coefficients with respect to previous iteration
# TODO: Do this (as was done before)? This means in the next
# iteration, the values being fit are based on the results of
# this fit even for the bad traces instead of the original
# input data.
trace_fit = traceset.yfit.T
# Plot the final fit if requested
if debug:
# Set the title based on the type of weighting used
title_text = 'Flux Weighted' if weighting == 'uniform' else 'Gaussian Weighted'
if idx is None:
idx = np.arange(1,ntrace+1).astype(str)
# Construct boolean flags
inpgpm = np.invert(_trace_bpm)
cengpm = np.invert(msk.astype(bool))
fitgpm = traceset.outmask.T
bpm_fit = _trace_bpm & fitgpm
bpm_rej = _trace_bpm & np.invert(fitgpm)
gpm_bdcen_fit = inpgpm & np.invert(cengpm) & fitgpm
gpm_bdcen_rej = inpgpm & np.invert(cengpm) & np.invert(fitgpm)
gpm_gdcen_fit = inpgpm & cengpm & fitgpm
gpm_gdcen_rej = inpgpm & cengpm & np.invert(fitgpm)
for i in range(ntrace):
# Plot data masked on input and included in fit using input
# locations and lower weight
if np.any(bpm_fit[:,i]):
plt.scatter(trace_coo[i,bpm_fit[:,i]], cen[bpm_fit[:,i],i], marker='o',
color='cornflowerblue', s=30, label='Input masked, fit')
# Plot data masked on input and included in fit using input
# locations and lower weight, but rejected by the fit
if np.any(bpm_rej[:,i]):
plt.scatter(trace_coo[i,bpm_rej[:,i]], cen[bpm_rej[:,i],i], marker='x',
color='C6', s=30, label='Input masked, fit, rejected')
# Plot data with bad recentroid measurements, included in
# fit using input locations and lower weight
if np.any(gpm_bdcen_fit[:,i]):
plt.scatter(trace_coo[i,gpm_bdcen_fit[:,i]], cen[gpm_bdcen_fit[:,i],i], marker='o',
color='0.7', s=30, label='Centroid masked, fit')
# Plot data with bad recentroid measurements, included in
# fit using input locations and lower weight, but rejected
# by the fit
if np.any(gpm_bdcen_rej[:,i]):
plt.scatter(trace_coo[i,gpm_bdcen_rej[:,i]], cen[gpm_bdcen_rej[:,i],i], marker='x',
color='C1', s=30, label='Centroid masked, fit, rejected')
# Plot data with good recentroid measurements and included
# in fit
if np.any(gpm_gdcen_fit[:,i]):
plt.scatter(trace_coo[i,gpm_gdcen_fit[:,i]], cen[gpm_gdcen_fit[:,i],i], marker='o',
color='k', s=30, label='Remeasured and fit')
# Plot data with good recentroid measurements and included
# in fit but rejected
if np.any(gpm_gdcen_rej[:,i]):
plt.scatter(trace_coo[i,gpm_gdcen_rej[:,i]], cen[gpm_gdcen_rej[:,i],i], marker='x',
color='C3', s=30, label='Remeasured, fit, and rejected')
# Plot all input trace locations as a line
plt.plot(trace_coo[i,:], _trace_cen[:,i], color='C2', linewidth=1.5,
linestyle='--', label='Input Trace Data')
# Plot all input trace locations as a line
plt.plot(trace_coo[i,:], trace_fit[:,i], color='r', linewidth=2.0,
linestyle='--', label='Fit')
plt.title(title_text + ' Centroid fit for trace {0}.'.format(idx[i]))
plt.ylim((0.995*np.amin(trace_fit[:,i]), 1.005*np.amax(trace_fit[:,i])))
if flavor in ['tilts']:
plt.xlabel('Spatial Pixel')
plt.ylabel('Spectral Pixel')
else:
plt.xlabel('Spectral Pixel')
plt.ylabel('Spatial Pixel')
plt.legend()
plt.show()
# Returns the fit, the actual weighted traces and errors, and
# measurement flags for the last iteration
return trace_fit, cen, err, msk, traceset
[docs]def build_trace_bpm(flux, trace_cen, bpm=None, boxcar=None, thresh=None, median_kernel=None):
"""
Construct a bad-pixel mask for edge trace data.
If no keyword arguments are provided, the traces are only masked
when they land outside the bounds of the image.
If both `boxcar` and `thresh` are provided, traces are also
masked by extracting the provided image along the trace (see
:func:`pypeit.core.moment.moment1d`) and flagging extracted
values below the provided threshold.
Args:
flux (`numpy.ndarray`_):
Image to use for tracing. Shape is expected to be (nspec,
nspat).
trace_cen (`numpy.ndarray`_):
Trace locations. Can be a 1D array for a single trace or a
2D array with shape (nspec, ntrace) for multiple traces.
bpm (`numpy.ndarray`_, optional):
Boolean array with the input bad-pixel mask for the
image. If not provided, all values in `flux` are
considered valid. If provided, must have the same shape
as `flux`.
boxcar (:obj:`float`, optional):
The width of the extraction window used for all traces
and spectral rows. If None, the trace mask will not
consider the extracted flux.
thresh (:obj:`float`, optional):
The minimum valid value of the extraced flux used to mask
the traces. If None, the trace mask will not consider the
extracted flux.
median_kernel (:obj:`int`, optional):
The spectral width of the kernel to use with
`scipy.signal.medfilt` to filter the *extracted* data
before setting the trace mask based on the provided
threshold. If None, the extracted data are not filtered
before flagging data below the threshold.
Returns:
`numpy.ndarray`_: The boolean mask for the traces.
"""
# Setup and ensure input is correct
if flux.ndim != 2:
raise ValueError('Input image must be 2D.')
nspec, nspat = flux.shape
if bpm is None:
bpm = np.zeros_like(flux, dtype=bool)
if bpm.shape != flux.shape:
raise ValueError('Mask array shape is incorrect.')
_trace_cen = trace_cen.reshape(-1,1) if trace_cen.ndim == 1 else trace_cen
if _trace_cen.shape[0] != nspec:
raise ValueError('Must provide trace position for each spectral pixel.')
# Flag based on the trace positions
trace_bpm = (_trace_cen < 0) | (_trace_cen > nspat - 1)
if boxcar is None or thresh is None:
# Only flagging based on the trace positions
return trace_bpm
# Get the extracted flux
extract_flux = moment.moment1d(flux, _trace_cen, boxcar, bpm=bpm)[0]
if median_kernel is not None:
# Median filter the extracted data
extract_flux = signal.medfilt(extract_flux, kernel_size=(median_kernel,1))
return trace_bpm | (extract_flux < thresh)
# TODO: Add an option where the user specifies the number of slits, and
# so it takes only the highest peaks from detect_lines
[docs]def peak_trace(flux, ivar=None, bpm=None, trace_map=None, extract_width=None, smash_range=None,
peak_thresh=100.0, peak_clip=None, trough=False, trace_median_frac=0.01,
trace_thresh=10.0, fwhm_uniform=3.0, fwhm_gaussian=3.0, maxshift=None,
maxerror=None, function='legendre', order=5, maxdev=5.0, maxiter=25,
niter_uniform=9, niter_gaussian=6, bitmask=None, debug=False):
"""
Find and trace features in an image by identifying peaks/troughs
after collapsing along the spectral axis.
The image is either compressed directly or after rectification
using the supplied ``trace_map``. The provided trace data *must*
have the same shape as the input ``flux`` image and map each
spatial position as a function of spectral position. This can be
the output of :func:`~pypeit.core.pca.pca_predict` where the
provided coordinates are ``np.arange(flux.shape[1])``; see also
:func:`~pypeit.tracepca.TracePCA.predict`. The rectification of
the input ``flux`` data is done using a boxcar extraction along
the provided traces with a width of ``extract_width``; see
:func:`~pypeit.core.moment.moment1d`.
The (rectified) image is collapsed spectrally (see ``smash_range``)
giving the sigma-clipped mean flux as a function of spatial
position. Peaks are then isolated in this vector (see
:func:`~pypeit.core.arc.detect_lines`).
Traces that pass through these peak positions are then passed to
two iterations of :func:`fit_trace`, which both remeasures the
centroids of the trace and fits a polynomial to those trace data.
The first iteration determines the centroids with uniform
weighting, passing ``fwhm=fwhm_uniform`` to :func:`fit_trace`,
and the second uses Gaussian weighting for the centroid
measurements (passing ``fwhm=fwhm_gaussian`` to
:func:`fit_trace`). The results of this second iteration of
:func:`fit_trace` are the data returned.
Troughs in the image can also be traced, which is done by
flipping the sign of the image about its median and then
repeating the "peak" finding and :func:`fit_trace` iterations. If
troughs are fit, the traces are ordered with the set of peak
traces first (the number of which is given by the last returned
object of the function), followed by the trough traces.
Args:
flux (`numpy.ndarray`_):
Image to use for tracing.
ivar (`numpy.ndarray`_, optional):
Inverse variance of the image intensity. If not provided,
unity variance is used. If provided, must have the same
shape as ``flux``.
bpm (`numpy.ndarray`_, optional):
Boolean array with the input bad-pixel mask for the
image. If not provided, all values in ``flux`` are
considered valid. If provided, must have the same shape
as ``flux``.
trace_map (`numpy.ndarray`_, optional):
Trace data that maps the spatial position of all spectra
as a function of spectral row. For example, this can be
the output of :func:`~pypeit.core.pca.pca_predict` where
the provided coordinates are
``np.arange(flux.shape[1])``; see also
:func:`~pypeit.tracepca.TracePCA.predict`. This is used
to rectify the input image so that spectra are
identically organized along image rows (i.e., to select
the ``i``th spectrum, one would slice with ``[:,i]``).
Shape *must* be identical to ``flux``. If None, ``flux``
is assumed to be rectified on input.
extract_width (:obj:`float`, optional):
The width of the extract aperture to use when rectifying
the flux image. If None, set to ``fwhm_gaussian``.
smash_range (:obj:`tuple`, optional):
Spectral range over which to collapse the input image
into the 1D flux as a function of spatial position. This
1D vector is used to detect features for tracing. This is
useful (and recommended) for defining the relevant
detector range for data with spectra that do not span the
length of the detector. The tuple gives the minimum and
maximum in the fraction of the full spectral length
(nspec). If None, the full image is collapsed.
peak_thresh (:obj:`float`, optional):
The threshold for detecting peaks in the image. See the
``input_thresh`` parameter for
:func:`~pypeit.core.arc.detect_lines`.
peak_clip (:obj:`float`, optional):
Sigma-clipping threshold used to clip peaks with small
values; no large values are clipped. If None, no clipping
is performed. Generally, if the peak detection algorithm
is finding insignificant peaks, one should instead raise
the value of ``peak_thresh``.
trough (:obj:`bool`, optional):
Trace both peaks **and** troughs in the input image. This
is done by flipping the value of the smashed image about
its median value, such that troughs can be identified as
peaks.
trace_median_frac (:obj:`float`, optional):
After rectification of the image and before refitting the
traces, the rectified image is median filtered with a
kernel width of ``trace_median_frac*nspec`` along the
spectral dimension.
trace_thresh (:obj:`float`, optional):
After rectification and median filtering of the image
(see ``trace_median_frac``), values in the resulting
image that are *below* this threshold are masked in the
refitting of the trace using :func:`fit_trace`.
fwhm_uniform (:obj:`float`, optional):
The ``fwhm`` parameter to use when using uniform
weighting in the calls to :func:`fit_trace`. See
description of the algorithm above.
fwhm_gaussian (:obj:`float`, optional):
The ``fwhm`` parameter to use when using Gaussian
weighting in the calls to :func:`fit_trace`. See
description of the algorithm above.
maxshift (:obj:`float`, optional):
Maximum shift allowed between the input and recalculated
centroid (see :func:`fit_trace`).
maxerror (:obj:`float`, optional):
Maximum error allowed in the calculated centroid (see
:func:`fit_trace`).
function (:obj:`str`, optional):
The type of polynomial to fit to the trace data. See
:func:`fit_trace`.
order (:obj:`int`, optional):
Order of the polynomial to fit to each trace. See
:func:`fit_trace`.
maxdev (:obj:`float`, optional):
See :func:`fit_trace`. If provided, reject points with
``abs(data-model) > maxdev`` when fitting the trace. If
None, no points are rejected.
maxiter (:obj:`int`, optional):
Maximum number of rejection iterations allowed during the
fitting. See :func:`fit_trace`.
niter_uniform (:obj:`int`, optional):
The number of iterations for :func:`fit_trace` when edge
measurements are based on uniform weighting. See
description above.
niter_gaussian (:obj:`int`, optional):
The number of iterations for :func:`fit_trace` when edge
measurements are based on Gaussian weighting. See
description above.
bitmask (:class:`~pypeit.bitmask.BitMask`, optional):
Object used to toggle the returned bit masks in edge
centroid measurements; see :func:`masked_centroid`.
debug (:obj:`bool`, optional):
Show plots useful for debugging.
Returns:
:obj:`tuple`: Returns four `numpy.ndarray`_ objects and the
number of peak traces. The number of peak traces should be
used to separate peak from trough traces; if ``trough`` is
False, this will just be the total number of traces. The four
`numpy.ndarray`_ objects provide:
- The best-fitting positions of each trace determined by the
polynomial fit.
- The centroids of the trace determined by the
Gaussian-weighting iteration, to which the polynomial is
fit.
- The errors in the Gaussian-weighted centroids.
- Boolean flags for each centroid measurement (see
:func:`~pypeit.core.moment.moment1d`).
Raises:
ValueError:
Raised if the input ``flux`` is not two-dimensional or if
the shape of ``ivar``, ``bpm``, or ``trace_map`` is not
identical to ``flux``.
"""
# Setup and ensure input is correct
if flux.ndim != 2:
raise ValueError('Input image must be 2D.')
nspec, nspat = flux.shape
if ivar is None:
ivar = np.ones_like(flux, dtype=float)
if ivar.shape != flux.shape:
raise ValueError('Inverse variance array shape is incorrect.')
if bpm is None:
bpm = np.zeros_like(flux, dtype=bool)
if bpm.shape != flux.shape:
raise ValueError('Mask array shape is incorrect.')
# Define the region to collapse
if smash_range is None:
smash_range = (0.,1.)
# Set the image to collapse
if trace_map is None:
# Assume image already rectified
flux_extract = flux
# Just set the trace to the follow the spatial columns
trace_map = np.tile(np.arange(nspat), (nspec,1))
else:
# Check there is a trace for each image pixel
if trace_map.shape != flux.shape:
raise ValueError('Provided trace data must match the image shape.')
msgs.info('Rectifying image by extracting along trace for each spatial pixel')
# TODO: JFH What should this aperture size be? I think fwhm=3.0
# since that is the width of the sobel filter
flux_extract = sampling.rectify_image(flux, trace_map, bpm=bpm, extract_width=fwhm_gaussian
if extract_width is None else extract_width)[0]
# if debug:
# ginga.show_image(flux_extract, chname ='rectified image')
# Collapse the image along the spectral direction to isolate peaks/troughs
start, end = np.clip(np.asarray(smash_range)*nspec, 0, nspec).astype(int)
msgs.info('Collapsing image spectrally between pixels {0}:{1}'.format(start, end))
flux_smash_mean, flux_smash_median, flux_smash_sig \
= sigma_clipped_stats(flux_extract[start:end,:], axis=0, sigma=4.0)
# Offset by the median
# TODO: If tracing Sobel-filtered image, this should be close to,
# or identically, 0
flux_median = np.median(flux_smash_mean)
flux_smash_mean -= flux_median
# Trace peak or both peaks and troughs
label = ['peak', 'trough'] if trough else ['peak']
sign = [1, -1] if trough else [1]
# Instantiate output
npeak = 0
fit = np.empty((nspec,0), dtype=float)
cen = np.empty((nspec,0), dtype=float)
err = np.empty((nspec,0), dtype=float)
msk = np.empty((nspec,0), dtype=bool if bitmask is None else bitmask.minimum_dtype())
# Get the smoothing kernel width and ensure it is odd
median_kernel = None if trace_median_frac is None \
else int(np.ceil(nspec*trace_median_frac))//2 * 2 + 1
# Identify and trace features in the image
for i,(l,s) in enumerate(zip(label,sign)):
# Identify the peaks
msgs.info('Searching for peaks.')
peak, _, _cen, _, _, best, _, _ \
= arc.detect_lines(s*flux_smash_mean, cont_subtract=False, fwhm=fwhm_gaussian,
input_thresh=peak_thresh, max_frac_fwhm=4.0,
min_pkdist_frac_fwhm=5.0, debug=debug)
if len(_cen) == 0 or not np.any(best):
msgs.warn('No good {0}s found!'.format(l))
continue
msgs.info('Found {0} good {1}(s) in the rectified, collapsed image'.format(
len(_cen[best]),l))
# Set the reference spatial locations to use for tracing the
# detected peaks
# TODO: Added this for a test case, not sure we should keep it
# in the long run.
_cen = _cen[best]
loc = np.round(_cen).astype(int)
if peak_clip is not None:
# Clip the peaks based on their amplitude as a stop-gap for
# a detection threshold that may be too low. Only clip
# aberrantly low values.
clipped_peak = sigma_clip(peak[best], sigma_lower=peak_clip, sigma_higher=np.inf)
peak_mask = np.ma.getmaskarray(clipped_peak)
if np.any(peak_mask):
msgs.warn('Clipping {0} detected peak(s) with aberrant amplitude(s).'.format(
np.sum(peak_mask)))
loc = loc[np.invert(peak_mask)]
_cen = _cen[np.invert(peak_mask)]
# As the starting point for the iterative trace fitting, use
# the input trace data at the positions of the detected peaks.
# The trace at column `loc` is expected to pass through `loc`
# at the reference spectral pixel. The offset of the trace is
# to allow for non-integer measurements of the peak centroid.
trace_peak = trace_map[:,loc] + (_cen-loc)[None,:]
# Image to trace: flip when tracing the troughs and clip low
# values
# TODO: This 1 is drawn out of the ether; this is different
# from what is done in prepare_sobel_for_trace
_flux = np.clip(s*(flux - flux_median), 1, None)
# Construct the trace mask
trace_peak_bpm = np.zeros(trace_peak.shape, dtype=bool) if trace_thresh is None \
else build_trace_bpm(_flux, trace_peak, bpm=bpm, boxcar=fwhm_gaussian,
thresh=trace_thresh, median_kernel=median_kernel)
# Remeasure and fit the trace using uniform weighting
trace_peak, _cen, _err, _msk, _ \
= fit_trace(_flux, trace_peak, order, ivar=ivar, bpm=bpm,
trace_bpm=trace_peak_bpm, fwhm=fwhm_uniform, maxshift=maxshift,
maxerror=maxerror, function=function, maxdev=maxdev, maxiter=maxiter,
niter=niter_uniform, bitmask=bitmask, debug=debug)
# Reset the mask
# TODO: Use or include `bad` resulting from fit_trace()?
trace_peak_bpm = np.zeros(trace_peak.shape, dtype=bool) if trace_thresh is None \
else build_trace_bpm(_flux, trace_peak, bpm=bpm, boxcar=fwhm_gaussian,
thresh=trace_thresh, median_kernel=median_kernel)
# Redo the measurements and trace fitting with Gaussian
# weighting
trace_peak, _cen, _err, _msk, _ \
= fit_trace(_flux, trace_peak, order, ivar=ivar, bpm=bpm, trace_bpm=trace_peak_bpm,
weighting='gaussian', fwhm=fwhm_gaussian, maxshift=maxshift,
maxerror=maxerror, function=function, maxdev=maxdev, maxiter=maxiter,
niter=niter_gaussian, bitmask=bitmask, debug=debug)
# Save the results
fit = np.append(fit, trace_peak, axis=1)
cen = np.append(cen, _cen, axis=1)
err = np.append(err, _err, axis=1)
msk = np.append(msk, _msk, axis=1)
if i == 0:
# Save the number of peaks (troughs are appended, if they're located)
npeak = cen.shape[1]
return fit, cen, err, msk, npeak
[docs]def parse_user_slits(add_slits, this_det, rm=False):
"""
Parse the parset syntax for adding or removing slits
Args:
add_slits (str, list):
Taken from the parset
this_det (int):
current detector
rm (bool, optional):
Remove instead of add?
Returns:
list or None:
if list, [[x0,x1,yrow]] for add (rm=False) with one or more entries
if list, [[xcen,yrow]] for rm=True with one or more entries
"""
# Might not be a list yet (only a str)
if not isinstance(add_slits, list):
add_slits = [add_slits]
#
user_slits = []
for islit in add_slits:
# Add?
if not rm:
det, x0, x1, yrow = [int(ii) for ii in islit.split(':')]
if det == this_det:
user_slits.append([x0,x1,yrow])
else: # Remove
det, xcen, yrow = [int(ii) for ii in islit.split(':')]
if det == this_det:
user_slits.append([xcen,yrow])
# Finish
return None if len(user_slits) == 0 else user_slits
[docs]def find_missing_orders(cen, width_fit, gap_fit, tol=0.2):
"""
Using simple models for the order width and order gap as a function of
spatial position, identify orders missed by the automated tracing.
Args:
cen (`numpy.ndarray`_):
The spatial pixel positions of the orders traced.
width_fit (:class:`~pypeit.core.fitting.PypeItFit`):
Model of the order width as a function of the order center.
gap_fit (:class:`~pypeit.core.fitting.PypeItFit`):
Model of the order gap *after* each order as a function of the order
center.
tol (:obj:`float`, optional):
Fraction of the order width used as the tolerance to identify missed
orders.
Returns:
:obj:`tuple`: Two arrays providing (1) the centers of all slit orders
and (2) a boolean array selecting orders that were missed by the tracing
algorithm.
"""
# Start with the first order found
c = [cen[0]]
missing = [False]
# Only interpolate; i.e., only iterate through the region covered by
# successfully traced orders.
i = 1
while c[-1] < cen[-1]:
# Calculate where the model would predict the next order to land and add
# it to the list of centers.
l = width_fit.eval(c[-1])
c += [c[-1] + l + gap_fit.eval(c[-1])]
# Determine if its missing
missing += [True if i >= len(cen) else np.absolute(c[-1] - cen[i]) > tol*l]
if not missing[-1]:
# If not, reset the center to the measured value and increment the
# array index
c[-1] = cen[i]
i += 1
# Return arrays
return np.array(c), np.array(missing)
[docs]def predicted_center_difference(lower_spat, spat, width_fit, gap_fit):
"""
Return the difference between the predicted and true location of an order
center.
This is specifically implemented for :func:`extrapolate_orders` and its use
of an optimization algorithm to extrapolate the order locations to *lower*
spatial pixel values.
Args:
lower_spat (`numpy.ndarray`_):
Optimization parameter. Must be an array with a single element
giving the predicted spatial location of the order toward lower
spatial pixels compared to the known order (``true``).
spat (:obj:`float`):
The true spatial location of the known order.
width_fit (:class:`~pypeit.core.fitting.PypeItFit`):
Model of the order width as a function of the order center.
gap_fit (:class:`~pypeit.core.fitting.PypeItFit`):
Model of the order gap *after* each order as a function of the order
center.
Returns:
:obj:`float`: The absolute value of the difference between the
prediction and the measure location of the order.
"""
test_spat = lower_spat[0] + width_fit.eval(lower_spat[0]) + gap_fit.eval(lower_spat[0])
return np.absolute(spat - test_spat)
[docs]def extrapolate_orders(cen, width_fit, gap_fit, min_spat, max_spat, tol=0.01):
"""
Predict the locations of additional orders by extrapolation.
Order centers are only predicted for those that fall between a minimum and
maximum spatial pixel value (see ``min_spat`` and ``max_spat``).
The models of the order width and gap are defined such that the location of
the orders are :math:`c_{i+1} = c_i + w_i + g_i`. Extrapolation toward
larger spatial positions is, therefore, trivial. Toward smaller pixels,
:math:`c_i` is unknown and the width and gap models are not required to be
linear; we use a simple minimiation algorithm to optimize the extrapolated
values so that the order location model is accurate (see ``tol``).
Args:
cen (`numpy.ndarray`_):
The spatial pixel positions of the orders traced.
width_fit (:class:`~pypeit.core.fitting.PypeItFit`):
Model of the order width as a function of the order center.
gap_fit (:class:`~pypeit.core.fitting.PypeItFit`):
Model of the order gap *after* each order as a function of the order
center.
min_spat (:obj:`float`):
The minimum spatial pixel for the extrapolation range.
max_spat (:obj:`float`):
The maximum spatial pixel for the extrapolation range.
tol (:obj:`float`, optional):
Tolerance used when optimizing the order locations predicted toward
lower spatial pixels.
Returns:
:obj:`tuple`: Two arrays with orders centers (1) below the first and (2)
above the last measured center. One or both of the arrays can be empty
if extrapolation leads to no orders outside the specified minimum and
maximum spatial range (``min_spat``, ``max_spat``).
"""
# Extrapolate toward lower spatial positions
lower_spat = [cen[0]]
while lower_spat[-1] > min_spat:
# Guess the position of the previous order
l = width_fit.eval(lower_spat[-1])
guess = np.array([lower_spat[-1] - l - gap_fit.eval(lower_spat[-1])])
# Set the bounds based on this guess and the expected order width
bounds = optimize.Bounds(lb=guess - l/2, ub=guess + l/2)
# Optimize the spatial position
res = optimize.minimize(predicted_center_difference, guess,
args=(lower_spat[-1], width_fit, gap_fit),
method='trust-constr', jac='2-point', bounds=bounds, tol=tol)
lower_spat += [res.x[0]]
# Extrapolate toward larger spatial positions
upper_spat = [cen[-1]]
while upper_spat[-1] < max_spat:
upper_spat += [upper_spat[-1] + width_fit.eval(upper_spat[-1])
+ gap_fit.eval(upper_spat[-1])]
# Return arrays after removing the first and last spatial position (which
# are either repeats of values in `cen` or outside the spatial range)
return np.array(lower_spat[-2:0:-1]), np.array(upper_spat[1:-1])
```