# Licensed under a 3-clause BSD style license - see PYDL_LICENSE.rst
# -*- coding: utf-8 -*-
# Also cite https://doi.org/10.5281/zenodo.1095150 when referencing PYDL
"""
Implements the bspline class
.. include:: ../include/links.rst
"""
import copy
import warnings
from IPython import embed
import numpy as np
from pypeit.core import basis
from pypeit import datamodel
from pypeit import msgs
try:
from pypeit.bspline.utilc import cholesky_band, cholesky_solve, solution_arrays, intrv, \
bspline_model
except:
warnings.warn('Unable to load bspline C extension. Try rebuilding pypeit. In the '
'meantime, falling back to pure python code.')
from pypeit.bspline.utilpy import cholesky_band, cholesky_solve, solution_arrays, intrv, \
bspline_model
# TODO: Used for testing. Keep around for now.
#from pypeit.bspline.utilpy import bspline_model
#from pypeit.bspline.utilpy import cholesky_band, cholesky_solve, solution_arrays, intrv, \
# bspline_model
# TODO: Types are important for the C extension. Types should be
# limited to int, float, bool!
# TODO: May need to add hooks to utilc.py that do the type conversion,
# but that should be a last resort for stability.
# TODO: This whole module needs to be cleaned up.
[docs]
class bspline(datamodel.DataContainer):
"""Bspline class.
Functions in the bspline library are implemented as methods on this
class.
The datamodel attributes are:
.. include:: ../include/class_datamodel_bspline.rst
When written to an output-file HDU, all `numpy.ndarray`_ elements are
bundled into an `astropy.io.fits.BinTableHDU`_, and the other elements are
written as header keywords. Any datamodel elements that are None are *not*
included in the output.
Parameters
----------
x : `numpy.ndarray`_
The data.
nord : :class:`int`, optional
To be documented.
npoly : :class:`int`, optional
To be documented.
bkpt : `numpy.ndarray`_, optional
To be documented.
bkspread : :class:`float`, optional
To be documented.
verbose : :class:`bool`, optional.
If ``True`` print extra information.
Attributes
----------
breakpoints
Breakpoints for bspline, spacing for these breakpoints are determined by keywords inputs;
nord
Order of bspline; [default=4]
npoly
Polynomial order to fit over 2nd variable (when specified as x2): [default=1]
mask
Output mask, set =1 for good points, =0 for bad points;
coeff
Output coefficient of the bspline;
icoeff
Cholesky band matrix used to solve for the bspline coefficients;
xmin
Normalization minimum for x2; [default max(xdata)]
xmax
Normalization maximum for x2; [default min(xdata)]
funcname
Function for the second variable; [default 'legendre']
from_dict
If not None, create a bspline from a dictionary created by to_dict(). [default 'None']
It is possible to instantiate a bspline from a dict without the x data:
new_bspline = bspline(None, from_dict=dictionary)
"""
version = '1.0.0'
# TODO: Fix the description of icoeff
datamodel = {'breakpoints': dict(otype=np.ndarray, atype=np.floating,
descr='Breakpoint locations'),
'nord': dict(otype=int, descr='Order of the bspline fit'),
'npoly': dict(otype=int, descr='Order of the bspline polynomial'),
'mask': dict(otype=np.ndarray, atype=np.bool_, descr='Mask'),
'coeff': dict(otype=np.ndarray, atype=np.floating, descr='Fit coefficients'),
'icoeff': dict(otype=np.ndarray, atype=np.floating, descr='??'),
'xmin': dict(otype=float, descr='Normalization for input data'),
'xmax': dict(otype=float, descr='Normalization for input data'),
'funcname': dict(otype=str, descr='Function of fit')}
# ToDO Consider refactoring the argument list so that there are no kwargs
def __init__(self, x, fullbkpt=None, nord=4, npoly=1, bkpt=None, bkspread=1.0, verbose=False,
from_dict=None, **kwargs):
"""Init creates an object whose attributes are similar to the
structure returned by the create_bspline function.
"""
# Setup the DataContainer with everything None
datamodel.DataContainer.__init__(self)
# JFH added this to enforce immutability of these input arguments, as this code modifies bkpt and fullbkpt
# as it goes
fullbkpt1 = copy.copy(fullbkpt)
bkpt1 = copy.copy(bkpt)
if from_dict is not None:
self.nord=from_dict['nord']
self.npoly=from_dict['npoly']
self.breakpoints=np.array(from_dict['breakpoints']).astype(float) # Force type
self.mask=np.array(from_dict['mask'])
self.coeff=np.array(from_dict['coeff'])
self.icoeff=np.array(from_dict['icoeff'])
self.xmin=from_dict['xmin']
self.xmax=from_dict['xmax']
self.funcname=from_dict['funcname']
return
# Instantiate empty if neither fullbkpt or x is set
elif x is None and fullbkpt is None:
self.nord = None
self.npoly = None
self.breakpoints= None
self.mask= None
self.coeff= None
self.icoeff= None
self.xmin= None
self.xmax= None
self.funcname= None
return
else:
#
# Set the breakpoints.
#
if fullbkpt1 is None:
if bkpt1 is None:
startx = x.min()
rangex = x.max() - startx
if 'placed' in kwargs:
w = ((kwargs['placed'] >= startx) &
(kwargs['placed'] <= startx+rangex))
if w.sum() < 2:
bkpt1 = np.arange(2, dtype=float) * rangex + startx
else:
bkpt1 = kwargs['placed'][w]
elif 'bkspace' in kwargs:
nbkpts = int(rangex/kwargs['bkspace']) + 1
if nbkpts < 2:
nbkpts = 2
tempbkspace = rangex/float(nbkpts-1)
bkpt1 = np.arange(nbkpts, dtype=float)*tempbkspace + startx
elif 'nbkpts' in kwargs:
nbkpts = kwargs['nbkpts']
if nbkpts < 2:
nbkpts = 2
tempbkspace = rangex/float(nbkpts-1)
bkpt1 = np.arange(nbkpts, dtype=float) * tempbkspace + startx
elif 'everyn' in kwargs:
nx = x.size
nbkpts = max(nx/kwargs['everyn'], 1)
if nbkpts == 1:
xspot = [0]
else:
xspot = (nx/nbkpts)*np.arange(nbkpts)
# JFH This was a bug. Made fixes
#xspot = int(nx/(nbkpts-1)) * np.arange(nbkpts, dtype='i4')
#bkpt = x[xspot].astype('f')
bkpt1 = np.interp(xspot,np.arange(nx),x)
else:
raise ValueError('No information for bkpts.')
# JFH added this new code, because bkpt.size = 1 implies fullbkpt has only 2*(nord-1) + 1 elements.
# This will cause a crash in action because nbkpt < 2*nord, i.e. for bkpt = 1, nord = 4 fullbkpt has
# seven elements which is less than 2*nord = 8. The codes above seem to require nbkpt >=2, so I'm implementing
# this requirement. Note that the previous code before this fix simply sets bkpt to bkpt[imax] =x.max()
# which is equally arbitrary, but still results in a crash. By requiring at least 2 bkpt, fullbkpt will
# have 8 elements preventing action from crashing
if (bkpt1.size < 2):
bkpt1 = np.zeros(2, dtype=float)
bkpt1[0] = x.min()
bkpt1[1] = x.max()
else:
imin = bkpt1.argmin()
imax = bkpt1.argmax()
if x.min() < bkpt1[imin]:
if verbose:
print('Lowest breakpoint does not cover lowest x value: changing.')
bkpt1[imin] = x.min()
if x.max() > bkpt1[imax]:
if verbose:
print('Highest breakpoint does not cover highest x value: changing.')
bkpt1[imax] = x.max()
nshortbkpt = bkpt1.size
fullbkpt1 = bkpt1.copy()
# Note that with the JFH change above, this nshortbkpt ==1 is never realized beacause above I forced
# bkpt to have at least two elements. Not sure why this was even allowed, since bkpt.size = 1
# basically results in action crashing as described above.
if nshortbkpt == 1:
bkspace = bkspread
else:
bkspace = (bkpt1[1] - bkpt1[0])*bkspread
for i in np.arange(1, nord):
fullbkpt1 = np.insert(fullbkpt1, 0, bkpt1[0]-bkspace*i)
fullbkpt1 = np.insert(fullbkpt1, fullbkpt1.shape[0],
bkpt1[nshortbkpt-1] + bkspace*i)
# JFH added this to fix bug in cases where fullbkpt is passed in but has < 2*nord elements
if fullbkpt1.size < 2*nord:
fullbkpt_init = fullbkpt1.copy()
nshortbkpt = fullbkpt_init.size
bkspace = (fullbkpt_init[1] - fullbkpt_init[0])*bkspread
for i in np.arange(1, nord):
fullbkpt1 = np.insert(fullbkpt1, 0, fullbkpt_init[0] - bkspace * i)
fullbkpt1 = np.insert(fullbkpt1, fullbkpt1.shape[0],
fullbkpt_init[nshortbkpt - 1] + bkspace * i)
nc = fullbkpt1.size - nord
self.breakpoints = fullbkpt1.astype(float) # Ensure type is float for C extension
self.nord = nord
self.npoly = npoly
self.mask = np.ones((fullbkpt1.size,), dtype=bool)
if npoly > 1:
self.coeff = np.zeros((npoly, nc), dtype=float)
self.icoeff = np.zeros((npoly, nc), dtype=float)
else:
self.coeff = np.zeros((nc,), dtype=float)
self.icoeff = np.zeros((nc,), dtype=float)
self.xmin = 0.0
self.xmax = 1.0
self.funcname = kwargs['funcname'] if 'funcname' in kwargs else 'legendre'
[docs]
def reinit_coeff(self):
nc = self.breakpoints.size - self.nord
self.coeff = np.zeros((self.npoly, nc), dtype=float) if self.npoly > 1 \
else np.zeros(nc, dtype=float)
[docs]
def _bundle(self):
"""
Overload for the HDU name
Returns:
list:
"""
return super(bspline, self)._bundle(ext='BSPLINE')
[docs]
def copy(self):
"""
Return a copied instance of the object.
"""
bsp_copy = bspline(None)
bsp_copy.nord = self.nord
bsp_copy.npoly = self.npoly
bsp_copy.breakpoints = np.copy(self.breakpoints)
bsp_copy.mask = np.copy(self.mask)
bsp_copy.coeff = np.copy(self.coeff)
bsp_copy.icoeff = np.copy(self.icoeff)
bsp_copy.xmin = self.xmin
bsp_copy.xmax = self.xmax
bsp_copy.funcname = self.funcname
return bsp_copy
[docs]
def to_dict(self):
"""
Write bspline attributes to a dict.
Attributes returned are: :attr:`breakpoints`, :attr:`nord`,
:attr:`npoly`, :attr:`mask`, :attr:`coeff`, :attr:`icoeff`,
:attr:`xmin`, :attr:`xmax`, and :attr:`funcname`.
.. note::
`numpy.ndarray`_ objects are converted to lists in the
dictionary to make it JSON compatible.
Returns:
:obj:`dict`: A dictionary with the above keys and items.
"""
return dict(breakpoints=self.breakpoints.tolist(),
nord=self.nord,
npoly=self.npoly,
mask=self.mask.tolist(),
coeff=self.coeff.tolist(),
icoeff=self.icoeff.tolist(),
xmin=self.xmin,
xmax=self.xmax,
funcname=self.funcname)
# TODO: C this
# TODO: Should this be used, or should we effectively replace it
# with the content of utils.bspline_profile
[docs]
def fit(self, xdata, ydata, invvar, x2=None):
"""Calculate a B-spline in the least-squares sense.
Fit is based on two variables: x which is sorted and spans a large range
where bkpts are required y which can be described with a low order
polynomial.
Parameters
----------
xdata : `numpy.ndarray`_
Independent variable.
ydata : `numpy.ndarray`_
Dependent variable.
invvar : `numpy.ndarray`_
Inverse variance of `ydata`.
x2 : `numpy.ndarray`_, optional
Orthogonal dependent variable for 2d fits.
Returns
-------
:obj:`tuple`
A tuple containing an integer error code, and the evaluation of the
b-spline at the input values. An error code of -2 is a failure,
-1 indicates dropped breakpoints, 0 is success, and positive
integers indicate ill-conditioned breakpoints.
"""
goodbk = self.mask[self.nord:]
nn = goodbk.sum()
if nn < self.nord:
yfit = np.zeros(ydata.shape, dtype=float)
return (-2, yfit)
nfull = nn * self.npoly
bw = self.npoly * self.nord
a1, lower, upper = self.action(xdata, x2=x2)
foo = np.tile(invvar, bw).reshape(bw, invvar.size).transpose()
a2 = a1 * foo
alpha = np.zeros((bw, nfull+bw), dtype=float)
beta = np.zeros((nfull+bw,), dtype=float)
bi = np.arange(bw, dtype=int)
bo = np.arange(bw, dtype=int)
for k in range(1, bw):
bi = np.append(bi, np.arange(bw-k, dtype=int)+(bw+1)*k)
bo = np.append(bo, np.arange(bw-k, dtype=int)+bw*k)
for k in range(nn-self.nord+1):
itop = k*self.npoly
ibottom = min(itop, nfull) + bw - 1
ict = upper[k] - lower[k] + 1
if ict > 0:
work = np.dot(a1[lower[k]:upper[k]+1, :].T, a2[lower[k]:upper[k]+1, :])
wb = np.dot(ydata[lower[k]:upper[k]+1], a2[lower[k]:upper[k]+1, :])
alpha.T.flat[bo+itop*bw] += work.flat[bi]
beta[itop:ibottom+1] += wb
min_influence = 1.0e-10 * invvar.sum() / nfull
errb = cholesky_band(alpha, mininf=min_influence) # ,verbose=True)
if isinstance(errb[0], int) and errb[0] == -1:
a = errb[1]
else:
yfit, foo = self.value(xdata, x2=x2, action=a1, upper=upper, lower=lower)
return (self.maskpoints(errb[0]), yfit)
errs = cholesky_solve(a, beta)
if isinstance(errs[0], int) and errs[0] == -1:
sol = errs[1]
else:
#
# It is not possible for this to get called, because cholesky_solve
# has only one return statement, & that statement guarantees that
# errs[0] == -1
#
yfit, foo = self.value(xdata, x2=x2, action=a1, upper=upper, lower=lower)
return (self.maskpoints(errs[0]), yfit)
if self.coeff.ndim == 2:
# JFH made major bug fix here.
self.icoeff[:, goodbk] = np.array(a[0, 0:nfull].T.reshape(self.npoly, nn,order='F'), dtype=a.dtype)
self.coeff[:, goodbk] = np.array(sol[0:nfull].T.reshape(self.npoly, nn, order='F'), dtype=sol.dtype)
else:
self.icoeff[goodbk] = np.array(a[0, 0:nfull], dtype=a.dtype)
self.coeff[goodbk] = np.array(sol[0:nfull], dtype=sol.dtype)
yfit, foo = self.value(xdata, x2=x2, action=a1, upper=upper, lower=lower)
return (0, yfit)
[docs]
def action(self, x, x2=None):
"""Construct banded bspline matrix, with dimensions [ndata, bandwidth].
Parameters
----------
x : `numpy.ndarray`_
Independent variable.
x2 : `numpy.ndarray`_, optional
Orthogonal dependent variable for 2d fits.
Returns
-------
:obj:`tuple`
A tuple containing the b-spline action matrix; the 'lower' parameter,
a list of pixel positions, each corresponding to the first
occurence of position greater than breakpoint indx; and 'upper',
Same as lower, but denotes the upper pixel positions.
"""
nbkpt = self.mask.sum()
if nbkpt < 2*self.nord:
warnings.warn('Order ({0}) too low for {1} breakpoints.'.format(self.nord, nbkpt))
return -2, 0, 0
nx = x.size
n = nbkpt - self.nord
lower = np.zeros((n - self.nord + 1,), dtype=int)
upper = np.zeros((n - self.nord + 1,), dtype=int) - 1
indx = intrv(self.nord, self.breakpoints[self.mask], x)
bf1 = self.bsplvn(x, indx)
# print('F_CONTIGUOUS after bsplvn: {0}'.format(bf1.flags['F_CONTIGUOUS']))
aa = uniq(indx)
upper[indx[aa]-self.nord+1] = aa
rindx = indx[::-1]
bb = uniq(rindx)
lower[rindx[bb]-self.nord+1] = nx - bb - 1
if x2 is None:
return bf1, lower, upper
# print('x2!')
if x2.size != nx:
raise ValueError('Dimensions of x and x2 do not match.')
# TODO: Below is unchanged.
x2norm = 2.0 * (x2 - self.xmin) / (self.xmax - self.xmin) - 1.0
# TODO: Should consider faster ways of generating the temppoly arrays for poly and poly1
if self.funcname == 'poly':
temppoly = np.ones((nx, self.npoly), dtype=float)
for i in range(1, self.npoly):
temppoly[:, i] = temppoly[:, i-1] * x2norm
elif self.funcname == 'poly1':
temppoly = np.tile(x2norm, self.npoly).reshape(nx, self.npoly)
for i in range(1, self.npoly):
temppoly[:, i] = temppoly[:, i-1] * x2norm
elif self.funcname == 'chebyshev':
# JFH fixed bug here where temppoly needed to be transposed because of different IDL and python array conventions
# NOTE: Transposed them in the functions themselves
temppoly = basis.fchebyshev(x2norm, self.npoly)
elif self.funcname == 'legendre':
temppoly = basis.flegendre(x2norm, self.npoly)
else:
raise ValueError('Unknown value of funcname.')
# TODO: Should consider faster way of calculating action that
# doesn't require a nested loop. Below might work, but it needs
# to be tested.
# _action = (bf1[:,:,None] * temppoly[:,None,:]).reshape(nx,-1)
bw = self.npoly*self.nord
action = np.zeros((nx, bw), dtype=float, order='F')
counter = -1
for ii in range(self.nord):
for jj in range(self.npoly):
counter += 1
action[:, counter] = bf1[:, ii]*temppoly[:, jj]
return action, lower, upper
# TODO: C this?
[docs]
def bsplvn(self, x, ileft):
"""To be documented.
Parameters
----------
x : `numpy.ndarray`_
To be documented.
ileft : :class:`int`
To be documented
Returns
-------
vnikx : `numpy.ndarray`_
To be documented.
"""
bkpt = self.breakpoints[self.mask]
# TODO: Had to set the order here to keep it consistent with
# utils.bspline_profile, but is this going to break things
# elsewhere? Ideally, we wouldn't be setting the memory order
# anywhere...
vnikx = np.zeros((x.size, self.nord), dtype=x.dtype, order='F')
deltap = vnikx.copy()
deltam = vnikx.copy()
j = 0
vnikx[:, 0] = 1.0
while j < self.nord - 1:
ipj = ileft+j+1
deltap[:, j] = bkpt[ipj] - x
imj = ileft-j
deltam[:, j] = x - bkpt[imj]
vmprev = 0.0
for l in range(j+1):
vm = vnikx[:, l]/(deltap[:, l] + deltam[:, j-l])
vnikx[:, l] = vm*deltap[:, l] + vmprev
vmprev = vm*deltam[:, j-l]
j += 1
vnikx[:, j] = vmprev
return vnikx
[docs]
def value(self, x, x2=None, action=None, lower=None, upper=None):
"""Evaluate a bspline at specified values.
Parameters
----------
x : `numpy.ndarray`_
Independent variable.
x2 : `numpy.ndarray`_, optional
Orthogonal dependent variable for 2d fits.
action : `numpy.ndarray`_, optional
Action matrix to use. If not supplied it is calculated.
lower : `numpy.ndarray`_, optional
If the action parameter is supplied, this parameter must also
be supplied.
upper : `numpy.ndarray`_, optional
If the action parameter is supplied, this parameter must also
be supplied.
Returns
-------
yfit : `numpy.ndarray`_
Results of the bspline evaluation
mask : `numpy.ndarray`_
Mask indicating where the evaluation was good (i.e., True
is good).
"""
# TODO: Is the sorting necessary?
xsort = x.argsort()
if action is None:
action, lower, upper = self.action(x[xsort], x2=None if x2 is None else x2[xsort])
else:
if lower is None or upper is None:
raise ValueError('Must specify lower and upper if action is set.')
n = self.mask.sum() - self.nord
coeffbk = self.mask[self.nord:].nonzero()[0]
goodcoeff = self.coeff[...,coeffbk]
yfit = bspline_model(x, action, lower, upper, goodcoeff, n, self.nord, self.npoly)
mask = np.ones(x.shape, dtype=bool)
goodbk = self.mask.nonzero()[0]
gb = self.breakpoints[goodbk]
mask[(x < gb[self.nord-1]) | (x > gb[n])] = False
hmm = (np.diff(goodbk) > 2).nonzero()[0]
if hmm.size == 0:
return yfit[np.argsort(xsort)], mask
for jj in range(hmm.size):
mask[(x >= self.breakpoints[goodbk[hmm[jj]]])
& (x <= self.breakpoints[goodbk[hmm[jj]+1]-1])] = False
return yfit[np.argsort(xsort)], mask
[docs]
def maskpoints(self, err):
"""Perform simple logic of which breakpoints to mask.
Parameters
----------
err : `numpy.ndarray`_, :obj:`int`
The list of indexes returned by the cholesky routines.
This is indexed to the set of currently *good*
breakpoints (i.e. self.mask=True) and the first nord are
skipped.
Returns
-------
:obj:`int`
An integer indicating the results of the masking. -1 indicates
that the error points were successfully masked. -2 indicates
failure; the calculation should be aborted.
Notes
-----
The mask attribute is modified, assuming it is possible to create the
mask.
"""
# Recast err as an array if a single value int was passed in (occasional)
if not isinstance(err, np.ndarray):
err = np.array([err])
# Currently good points
goodbkpt = np.where(self.mask)[0]
nbkpt = len(goodbkpt)
if nbkpt <= 2*self.nord:
warnings.warn('Fewer good break points than order of b-spline. Returning...')
return -2
# Find the unique ones for the polynomial
hmm = err[uniq(err//self.npoly)]//self.npoly
n = nbkpt - self.nord
if np.any(hmm >= n):
warnings.warn('Note enough unique points in cholesky_band decomposition of b-spline matrix. Returning...')
return -2
test = np.zeros(nbkpt, dtype=bool)
for jj in range(-int(np.ceil(self.nord/2)), int(self.nord/2.)):
foo = np.where((hmm+jj) > 0, hmm+jj, np.zeros(hmm.shape, dtype=hmm.dtype))
inside = np.where((foo+self.nord) < n-1, foo+self.nord, np.zeros(hmm.shape, dtype=hmm.dtype)+n-1)
if len(inside)>0:
test[inside] = True
if test.any():
reality = goodbkpt[test]
if self.mask[reality].any():
self.mask[reality] = False
return -1
return -2
return -2
[docs]
def workit(self, xdata, ydata, invvar, action, lower, upper):
"""An internal routine for bspline_extract and bspline_radial which solve a general
banded correlation matrix which is represented by the variable "action". This routine
only solves the linear system once, and stores the coefficients in sset. A non-zero return value
signifies a failed inversion
Parameters
----------
xdata : `numpy.ndarray`_
Independent variable.
ydata : `numpy.ndarray`_
Dependent variable.
invvar : `numpy.ndarray`_
Inverse variance of `ydata`.
action : `numpy.ndarray`_
Banded correlation matrix
lower : `numpy.ndarray`_
A list of pixel positions, each corresponding to the first occurence of position greater than breakpoint indx
upper : `numpy.ndarray`_
Same as lower, but denotes the upper pixel positions
Returns
-------
success : :obj:`int`
Method error code: 0 is good; -1 is dropped breakpoints,
try again; -2 is failure, should abort.
yfit : `numpy.ndarray`_
Evaluation of the b-spline yfit at the input values.
"""
goodbk = self.mask[self.nord:]
# KBW: Interesting: x.sum() is actually a bit faster than np.sum(x)
nn = goodbk.sum()
if nn < self.nord:
warnings.warn('Fewer good break points than order of b-spline. Returning...')
return -2, np.zeros(ydata.shape, dtype=float)
alpha, beta = solution_arrays(nn, self.npoly, self.nord, ydata, action, invvar, upper,
lower)
nfull = nn * self.npoly
# Right now we are not returning the covariance, although it may arise that we should
# covariance = alpha
err, a = cholesky_band(alpha, mininf=1.0e-10 * invvar.sum() / nfull)
# successful cholseky_band returns -1
if not isinstance(err, int) or err != -1:
return self.maskpoints(err), \
self.value(xdata, x2=xdata, action=action, upper=upper, lower=lower)[0]
# NOTE: cholesky_solve ALWAYS returns err == -1; don't even catch it.
sol = cholesky_solve(a, beta)[1]
if self.coeff.ndim == 2:
self.icoeff[:,goodbk] = np.array(a[0,:nfull].T.reshape(self.npoly, nn, order='F'), dtype=a.dtype)
self.coeff[:,goodbk] = np.array(sol[:nfull].T.reshape(self.npoly, nn, order='F'), dtype=sol.dtype)
else:
self.icoeff[goodbk] = np.array(a[0,:nfull], dtype=a.dtype)
self.coeff[goodbk] = np.array(sol[:nfull], dtype=sol.dtype)
return 0, self.value(xdata, x2=xdata, action=action, upper=upper, lower=lower)[0]
# TODO: I don't think we need to make this reproducible with the IDL version anymore, and can opt for speed instead.
# TODO: Move this somewhere for more common access?
# Faster than previous version but not as fast as if we could switch to
# np.unique.
[docs]
def uniq(x, index=None):
"""
Return the indices of the *last* occurrence of the unique
elements in a sorted array.
The input vector must be sorted before being passed to this
function. This can be done by sorting ``x`` directly or by
passing the array that sorts ``x`` (``index``).
Replicates the IDL ``UNIQ()`` function.
Parameters
----------
x : array-like
Search this array for unique items.
index : array-like, optional
This array provides the array subscripts that sort `x`. That
is::
index = np.argsort(x)
Returns
-------
result : `numpy.ndarray`_
The indices of the last occurence in `x` of its unique
values.
Notes
-----
Given a sorted array, and assuming that there is a set of
adjacent identical items, ``uniq()`` will return the subscript of
the *last* unique item. This charming feature is retained for
reproducibility.
References
----------
http://www.harrisgeospatial.com/docs/uniq.html
Speed improvement thanks to discussion here:
https://stackoverflow.com/questions/47495510/numpy-in-a-sorted-list-find-the-first-and-the-last-index-for-each-unique-value
Examples
--------
>>> import numpy as np
>>> from pypeit.core.pydl import uniq
>>> data = np.array([ 1, 2, 3, 1, 5, 6, 1, 7, 3, 2, 5, 9, 11, 1 ])
>>> print(uniq(np.sort(data)))
[ 3 5 7 9 10 11 12 13]
"""
if len(x) == 0:
raise ValueError('No unique elements in an empty array!')
if index is None:
return np.flatnonzero(np.concatenate(([True], x[1:] != x[:-1], [True])))[1:]-1
_x = x[index]
return np.flatnonzero(np.concatenate(([True], _x[1:] != _x[:-1], [True])))[1:]-1