Coverage for src/amisc/interpolator.py: 90%
362 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-29 21:38 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-29 21:38 +0000
1"""Provides interpolator classes. Interpolators manage training data and specify how to refine/gather more data.
3Includes:
5- `BaseInterpolator`: Abstract class providing basic structure of an interpolator
6- `LagrangeInterpolator`: Concrete implementation for tensor-product barycentric Lagrange interpolation
7"""
8import copy
9import itertools
10from abc import ABC, abstractmethod
12import numpy as np
13from scipy.optimize import direct
14from sklearn.linear_model import Ridge
15from sklearn.pipeline import Pipeline
16from sklearn.preprocessing import MaxAbsScaler
18from amisc.rv import BaseRV
19from amisc.utils import get_logger
22class BaseInterpolator(ABC):
23 """Base interpolator abstract class.
25 !!! Info "Setting the training data"
26 You can leave the training data `xi`, `yi` empty; they can be iteratively refined later on.
28 !!! Info "Model specification"
29 The model is a callable function of the form `ret = model(x, *args, **kwargs)`. The return value is a dictionary
30 of the form `ret = {'y': y, 'files': files, 'cost': cost}`. In the return dictionary, you specify the raw model
31 output `y` as an `np.ndarray` at a _minimum_. Optionally, you can specify paths to output files and the average
32 model cost (in units of seconds of cpu time), and anything else you want.
34 :ivar beta: specifies the refinement level of this interpolator as a set of natural number indices
35 :ivar x_vars: list of variables that fully determines the input domain of interest for interpolation
36 :ivar xi: `(Nx, x_dim)`, interpolation points (or knots, training samples, etc.) stored as an array
37 :ivar yi: `(Nx, y_dim)`, function values at the interpolation points, i.e. the training data
38 :ivar _model: stores a ref to the model or function that is to be interpolated, callable as `ret = model(x)`
39 :ivar _model_args: additional arguments to supply to the model
40 :ivar _model_kwargs: additional keyword arguments to supply to the model
41 :ivar model_cost: the average total cpu time (in seconds) for a single model evaluation call of one set of inputs
42 :ivar output_files: tracks model output files corresponding to `yi` training data (for more complex models)
43 :ivar logger: a logging utility reference
45 :vartype beta: tuple[int, ...]
46 :vartype x_vars: list[BaseRV]
47 :vartype xi: np.ndarray
48 :vartype yi: np.ndarray
49 :vartype _model: callable[np.ndarray] -> dict
50 :vartype _model_args: tuple
51 :vartype _model_kwargs: dict
52 :vartype model_cost: float
53 :vartype output_files: list[str | Path]
54 :vartype logger: logging.Logger
55 """
57 def __init__(self, beta: tuple, x_vars: BaseRV | list[BaseRV], xi=None, yi=None,
58 model=None, model_args=(), model_kwargs=None):
59 """Construct the interpolator.
61 :param beta: refinement level indices
62 :param x_vars: list of variables to specify input domain of interpolation
63 :param xi: `(Nx, xdim)`, interpolation points (optional)
64 :param yi: `(Nx, ydim)`, the function values at the interpolation points (optional)
65 :param model: callable as {'y': y} = model(x), with `x = (..., x_dim)`, `y = (..., y_dim)`
66 :param model_args: optional args for the model
67 :param model_kwargs: optional kwargs for the model
68 """
69 x_vars = [x_vars] if not isinstance(x_vars, list) else x_vars
70 self.logger = get_logger(self.__class__.__name__)
71 self._model = model
72 self._model_args = model_args
73 self._model_kwargs = model_kwargs if model_kwargs is not None else {}
74 self.output_files = [] # Save output files with same indexing as xi, yi
75 self.xi = xi # Interpolation points
76 self.yi = yi # Function values at interpolation points
77 self.beta = beta # Refinement level indices
78 self.x_vars = x_vars # BaseRV() objects for each input
79 self.model_cost = None # Total cpu time to evaluate model once (s)
81 def update_input_bds(self, idx: int, bds: tuple):
82 """Update the input bounds at the given index.
84 :param idx: the index of the input variable to update
85 :param bds: the new bounds for the variable
86 """
87 self.x_vars[idx].update_bounds(*bds)
89 def xdim(self):
90 """Get the dimension of the input domain."""
91 return len(self.x_vars)
93 def ydim(self):
94 """Get the dimension of the outputs."""
95 return self.yi.shape[-1] if self.yi is not None else None
97 def save_enabled(self):
98 """Return whether the underlying model wants to save outputs to file.
100 !!! Note
101 You can specify that a model wants to save outputs to file by providing an `'output_dir'` kwarg.
102 """
103 return self._model_kwargs.get('output_dir') is not None
105 def _fmt_input(self, x: float | list | np.ndarray) -> tuple[bool, np.ndarray]:
106 """Helper function to make sure input `x` is an ndarray of shape `(..., xdim)`.
108 :param x: if 1d-like as (n,), then converted to 2d as (1, n) if n==xdim or (n, 1) if xdim==1
109 :returns: `x` as at least a 2d array `(..., xdim)`, and whether `x` was originally 1d-like
110 """
111 x = np.atleast_1d(x)
112 shape_1d = len(x.shape) == 1
113 if shape_1d:
114 if x.shape[0] != self.xdim() and self.xdim() > 1:
115 raise ValueError(f'Input x shape {x.shape} is incompatible with xdim of {self.xdim()}')
116 x = np.expand_dims(x, axis=0 if x.shape[0] == self.xdim() else 1)
118 return shape_1d, x
120 def set_yi(self, yi: np.ndarray = None, model: callable = None,
121 x_new: tuple[list[int | tuple], np.ndarray] = ()) -> dict[str: np.ndarray] | None:
122 """Set the training data; if `yi=None`, then compute from the model.
124 !!! Warning
125 You would use `x_new` if you wanted to compute the model at these specific locations and store the result.
126 This will ignore anything passed in for `yi`, and it assumes a model is already specified (or passed in).
128 !!! Info
129 You can pass in integer indices for `x_new` or tuple indices. Integers will index into `self.xi`. Tuples
130 provide extra flexibility for more complicated indexing, e.g. they might specify indices along different
131 coordinate directions in an N-dimensional grid. If you pass in a list of tuple indices for `x_new`, the
132 resulting model outputs will be returned back to you in the form `dict[str: np.ndarray]`. The keys are
133 string casts of the tuple indices, and the values are the corresponding model outputs.
135 :param yi: `(Nx, y_dim)`, training data to set, must match dimension of `self.xi`
136 :param model: callable function, optionally overrides `self._model`
137 :param x_new: tuple of `(idx, x)`, where `x` is an `(N_new, x_dim)` array of new interpolation points to
138 include and `idx` specifies the indices of these points into `self.xi`
139 :returns: dict[str: np.ndarray] if `idx` contains tuple elements, otherwise `None`
140 """
141 if model is not None:
142 self._model = model
143 if self._model is None:
144 error_msg = 'Model not specified for computing QoIs at interpolation grid points.'
145 self.logger.error(error_msg)
146 raise Exception(error_msg)
148 # Overrides anything passed in for yi (you would only be using this if yi was set previously)
149 if x_new:
150 new_idx = x_new[0]
151 new_x = x_new[1]
152 return_y = isinstance(new_idx[0], tuple) # Return y rather than storing it if tuple indices are passed in
153 ret = dict(y=dict(), files=dict())
154 model_ret = self._model(new_x, *self._model_args, **self._model_kwargs)
155 if not isinstance(model_ret, dict):
156 self.logger.warning(
157 f"Function {self._model} did not return a dict of the form {{'y': y}}. Please make sure"
158 f" you do so to avoid conflicts. Returning the value directly instead...")
159 model_ret = dict(y=model_ret)
160 y_new, files_new, cpu_time = model_ret['y'], model_ret.get('files', None), model_ret.get('cost', 1)
162 if self.save_enabled():
163 for j in range(y_new.shape[0]):
164 if return_y:
165 ret['y'][str(new_idx[j])] = y_new[j, :].astype(np.float32)
166 ret['files'][str(new_idx[j])] = files_new[j]
167 else:
168 self.yi[new_idx[j], :] = y_new[j, :].astype(np.float32)
169 self.output_files[new_idx[j]] = files_new[j]
170 else:
171 for j in range(y_new.shape[0]):
172 if return_y:
173 ret['y'][str(new_idx[j])] = y_new[j, :].astype(np.float32)
174 else:
175 self.yi[new_idx[j], :] = y_new[j, :].astype(np.float32)
177 if self.model_cost is None:
178 self.model_cost = max(1, cpu_time)
180 return ret
182 # Set yi directly
183 if yi is not None:
184 self.yi = yi.astype(np.float32)
185 return
187 # Compute yi
188 model_ret = self._model(self.xi, *self._model_args, **self._model_kwargs)
189 if not isinstance(model_ret, dict):
190 self.logger.warning(f"Function {self._model} did not return a dict of the form {{'y': y}}. Please make sure"
191 f" you do so to avoid conflicts. Returning the value directly instead...")
192 model_ret = dict(y=model_ret)
194 self.yi, self.output_files, cpu_time = model_ret['y'], model_ret.get('files', list()), model_ret.get('cost', 1)
196 if self.model_cost is None:
197 self.model_cost = max(1, cpu_time)
199 @abstractmethod
200 def refine(self, beta: tuple, auto=True):
201 """Return a new interpolator with one dimension refined by one level, as specified by `beta`.
203 !!! Info "When you want to compute the model manually"
204 You can set `auto=False`, in which case the newly refined interpolation points `x` will be returned to you
205 along with their indices, in the form `idx, x, interp = refine(beta, auto=False)`. You might also want to
206 do this if you did not provide a model when constructing the Interpolator (so `auto=True` won't work).
208 :param beta: the new refinement level indices, should only refine one dimension by one level
209 :param auto: whether to automatically compute and store model at refinement points (default is True)
210 :returns: `idx` - indices into `xi`, `x` - the new interpolation points, and `interp` - a refined
211 BaseInterpolator object, just returns `interp` if `auto=True`
212 """
213 pass
215 @abstractmethod
216 def __call__(self, x: np.ndarray | float) -> np.ndarray:
217 """Evaluate the interpolation at points `x`.
219 :param x: `(..., x_dim)`, the points to be interpolated, must be within the input domain for accuracy
220 :returns y: `(..., y_dim)`, the interpolated function values
221 """
222 pass
224 @abstractmethod
225 def grad(self, x: np.ndarray | float | list, xi: np.ndarray = None, yi: np.ndarray = None) -> np.ndarray:
226 """Evaluate the gradient/Jacobian at points `x` using the interpolator.
228 :param x: `(..., xdim)`, the evaluation points, must be within domain of `self.xi` for accuracy
229 :param xi: `(Ni, xdim)` optional, interpolation grid points to use (e.g. if `self.reduced=True`)
230 :param yi: `(Ni, ydim)` optional, function values at xi to use (e.g. if `self.reduced=True`)
231 :returns jac: `(..., ydim, xdim)`, the Jacobian at points `x`
232 """
234 @abstractmethod
235 def hessian(self, x: np.ndarray | float | list, xi: np.ndarray = None, yi: np.ndarray = None) -> np.ndarray:
236 """Evaluate the Hessian at points `x` using the interpolator.
238 :param x: `(..., xdim)`, the evaluation points, must be within domain of `self.xi` for accuracy
239 :param xi: `(Ni, xdim)` optional, interpolation grid points to use (e.g. if `self.reduced=True`)
240 :param yi: `(Ni, ydim)` optional, function values at xi to use (e.g. if `self.reduced=True`)
241 :returns hess: `(..., ydim, xdim, xdim)`, the Hessian at points `x`
242 """
245class LagrangeInterpolator(BaseInterpolator):
246 """Tensor-product (multivariate) grid interpolator, based on barycentric Lagrange polynomials.
248 !!! Info
249 The refinement level indices `beta` are used in this class to specify anisotropic refinement along each
250 coordinate direction of the input domain, so `x_dim = len(beta)`.
252 :ivar x_grids: univariate Leja sequence points in each 1d dimension
253 :ivar weights: the barycentric weights corresponding to `x_grids`
254 :ivar reduced: whether to store `xi` and `yi` training data, can set to `False` to save memory, e.g. if an external
255 sparse grid data structure manages this data instead
257 :vartype x_grids: list[np.ndarray]
258 :vartype weights: list[np.ndarray]
259 :vartype reduced: bool
260 """
262 def __init__(self, beta: tuple, x_vars: BaseRV | list[BaseRV], init_grids=True, reduced=False, **kwargs):
263 """Initialize a Lagrange tensor-product grid interpolator.
265 :param beta: refinement level indices for each input dimension
266 :param x_vars: list of variables specifying bounds/pdfs for each input x
267 :param init_grids: whether to compute 1d Leja sequences on initialization
268 :param reduced: whether to store xi/yi matrices, e.g. set true if storing in external sparse grid structure
269 :param **kwargs: other optional arguments (see `BaseInterpolator`)
270 """
271 self.weights = [] # Barycentric weights for each dimension
272 self.x_grids = [] # Univariate nested leja sequences in each dimension
273 self.reduced = reduced
274 super().__init__(beta, x_vars, **kwargs)
276 if init_grids:
277 # Construct 1d univariate Leja sequences in each dimension
278 grid_sizes = self.get_grid_sizes(self.beta)
279 self.x_grids = [self.leja_1d(grid_sizes[n], self.x_vars[n].bounds(),
280 wt_fcn=self.x_vars[n].pdf).astype(np.float32) for n in range(self.xdim())]
282 for n in range(self.xdim()):
283 Nx = grid_sizes[n]
284 bds = self.x_vars[n].bounds()
285 grid = self.x_grids[n]
286 C = (bds[1] - bds[0]) / 4.0 # Interval capacity (see Berrut and Trefethen 2004)
287 xj = grid.reshape((Nx, 1))
288 xi = grid.reshape((1, Nx))
289 dist = (xj - xi) / C
290 np.fill_diagonal(dist, 1) # Ignore product when i==j
291 self.weights.append((1.0 / np.prod(dist, axis=1)).astype(np.float32)) # (Nx,)
293 # Cartesian product of univariate grids
294 if not self.reduced:
295 self.xi = np.empty((np.prod(grid_sizes), self.xdim()), dtype=np.float32)
296 for i, ele in enumerate(itertools.product(*self.x_grids)):
297 self.xi[i, :] = ele
299 def refine(self, beta, auto=True, x_refine: np.ndarray = None):
300 """Return a new interpolator with one dimension refined by one level, specified by `beta`.
302 !!! Note
303 If `self.reduced=True` or `auto=False`, then this function will return tuple indices `idx` corresponding
304 to the new interpolation points `x`. The tuple indices specify one index along each input dimension.
306 :param beta: the new refinement level indices
307 :param auto: whether to automatically compute model at refinement points
308 :param x_refine: `(Nx,)` use this array as the refined 1d grid if provided, otherwise compute via `leja_1d`
309 :returns: `interp` - a `LagrangeInterpolator` with a refined grid (default), otherwise if `auto=False`,
310 returns `idx, x, interp`, where `idx` and `x` correspond to new interpolation points.
311 """
312 try:
313 # Initialize a new interpolator with the new refinement levels
314 interp = LagrangeInterpolator(beta, self.x_vars, model=self._model, model_args=self._model_args,
315 model_kwargs=self._model_kwargs, init_grids=False, reduced=self.reduced)
317 # Find the dimension and number of new points to add
318 old_grid_sizes = self.get_grid_sizes(self.beta)
319 new_grid_sizes = interp.get_grid_sizes(beta)
320 dim_refine = 0
321 num_new_pts = 0
322 for idx, grid_size in enumerate(new_grid_sizes):
323 if grid_size != old_grid_sizes[idx]:
324 dim_refine = idx
325 num_new_pts = grid_size - old_grid_sizes[idx]
326 break
328 # Add points to leja grid in this dimension
329 interp.x_grids = copy.deepcopy(self.x_grids)
330 xi = copy.deepcopy(x_refine) if x_refine is not None else self.leja_1d(num_new_pts,
331 interp.x_vars[dim_refine].bounds(),
332 z_pts=interp.x_grids[dim_refine],
333 wt_fcn=interp.x_vars[dim_refine].pdf)
334 interp.x_grids[dim_refine] = xi.astype(np.float32)
336 # Update barycentric weights in this dimension
337 interp.weights = copy.deepcopy(self.weights)
338 Nx_old = old_grid_sizes[dim_refine]
339 Nx_new = new_grid_sizes[dim_refine]
340 old_wts = copy.deepcopy(self.weights[dim_refine])
341 new_wts = np.zeros(Nx_new, dtype=np.float32)
342 new_wts[:Nx_old] = old_wts
343 bds = interp.x_vars[dim_refine].bounds()
344 C = (bds[1] - bds[0]) / 4.0 # Interval capacity
345 xi = interp.x_grids[dim_refine]
346 for j in range(Nx_old, Nx_new):
347 new_wts[:j] *= (C / (xi[:j] - xi[j]))
348 new_wts[j] = np.prod(C / (xi[j] - xi[:j]))
349 interp.weights[dim_refine] = new_wts
351 # Copy yi over at existing interpolation points
352 x_new = np.zeros((0, interp.xdim()), dtype=np.float32)
353 x_new_idx = []
354 tol = 1e-12 # Tolerance for floating point comparison
355 j = 0 # Use this idx for iterating over existing yi
356 if not self.reduced:
357 interp.xi = np.zeros((np.prod(new_grid_sizes), self.xdim()), dtype=np.float32)
358 interp.yi = np.zeros((np.prod(new_grid_sizes), self.ydim()), dtype=np.float32)
359 if self.save_enabled():
360 interp.output_files = [None] * np.prod(new_grid_sizes)
362 old_indices = [list(range(old_grid_sizes[n])) for n in range(self.xdim())]
363 old_indices = list(itertools.product(*old_indices))
364 new_indices = [list(range(new_grid_sizes[n])) for n in range(self.xdim())]
365 new_indices = list(itertools.product(*new_indices))
366 for i in range(len(new_indices)):
367 # Get the new grid coordinate/index and physical x location/point
368 new_x_idx = new_indices[i]
369 new_x_pt = np.array([float(interp.x_grids[n][new_x_idx[n]]) for n in range(self.xdim())],
370 dtype=np.float32)
372 if not self.reduced:
373 # Store the old xi/yi and return new x points
374 interp.xi[i, :] = new_x_pt
375 if j < len(old_indices) and np.all(np.abs(np.array(old_indices[j]) -
376 np.array(new_indices[i])) < tol):
377 # If we already have this interpolation point
378 interp.yi[i, :] = self.yi[j, :]
379 if self.save_enabled():
380 interp.output_files[i] = self.output_files[j]
381 j += 1
382 else:
383 # Otherwise, save new interpolation point and its index
384 x_new = np.concatenate((x_new, new_x_pt.reshape((1, self.xdim()))))
385 x_new_idx.append(i)
386 else:
387 # Just find the new x indices and return those for the reduced case
388 if j < len(old_indices) and np.all(np.abs(np.array(old_indices[j]) -
389 np.array(new_indices[i])) < tol):
390 j += 1
391 else:
392 x_new = np.concatenate((x_new, new_x_pt.reshape((1, self.xdim()))))
393 x_new_idx.append(new_x_idx) # Add a tuple() multi-index if not saving xi/yi
395 # Evaluate the model at new interpolation points
396 interp.model_cost = self.model_cost
397 if self._model is None:
398 self.logger.warning('No model available to evaluate new interpolation points, returning the points '
399 'to you instead...')
400 return x_new_idx, x_new, interp
401 elif not auto or self.reduced:
402 return x_new_idx, x_new, interp
403 else:
404 interp.set_yi(x_new=(x_new_idx, x_new))
405 return interp
406 except Exception as e:
407 import traceback
408 tb_str = str(traceback.format_exception(e))
409 self.logger.error(tb_str)
410 raise Exception(f'Original exception in refine(): {tb_str}')
412 def __call__(self, x: np.ndarray | float, xi: np.ndarray = None, yi: np.ndarray = None) -> np.ndarray:
413 """Evaluate the barycentric interpolation at points `x`.
415 :param x: `(..., xdim)`, the points to be interpolated, must be within domain of `self.xi` for accuracy
416 :param xi: `(Ni, xdim)` optional, interpolation grid points to use (e.g. if `self.reduced=True`)
417 :param yi: `(Ni, ydim)` optional, function values at xi to use (e.g. if `self.reduced=True`)
418 :returns y: `(..., ydim)`, the interpolated function values
419 """
420 shape_1d, x = self._fmt_input(x)
421 if yi is None:
422 yi = self.yi.copy()
423 if xi is None:
424 xi = self.xi.copy()
425 xdim = xi.shape[-1]
426 ydim = yi.shape[-1]
427 dims = list(range(xdim))
429 nan_idx = np.any(np.isnan(yi), axis=-1)
430 if np.any(nan_idx):
431 # Use a simple linear regression fit to impute missing values (may have resulted from bad model outputs)
432 imputer = Pipeline([('scaler', MaxAbsScaler()), ('model', Ridge(alpha=1))])
433 imputer.fit(xi[~nan_idx, :], yi[~nan_idx, :])
434 yi[nan_idx, :] = imputer.predict(xi[nan_idx, :])
436 # Create ragged edge matrix of interpolation pts and weights
437 grid_sizes = self.get_grid_sizes(self.beta) # For example:
438 x_j = np.empty((xdim, max(grid_sizes))) # A= [#####--
439 w_j = np.empty((xdim, max(grid_sizes))) #######
440 x_j[:] = np.nan ###----]
441 w_j[:] = np.nan
442 for n in range(xdim):
443 x_j[n, :grid_sizes[n]] = self.x_grids[n]
444 w_j[n, :grid_sizes[n]] = self.weights[n]
445 diff = x[..., np.newaxis] - x_j
446 div_zero_idx = np.isclose(diff, 0, rtol=1e-4, atol=1e-8)
447 diff[div_zero_idx] = 1
448 quotient = w_j / diff # (..., xdim, Nx)
449 qsum = np.nansum(quotient, axis=-1) # (..., xdim)
451 # Loop over multi-indices and compute tensor-product lagrange polynomials
452 y = np.zeros(x.shape[:-1] + (ydim,), dtype=x.dtype) # (..., ydim)
453 indices = [np.arange(grid_sizes[n]) for n in range(xdim)]
454 for i, j in enumerate(itertools.product(*indices)):
455 L_j = quotient[..., dims, j] / qsum # (..., xdim)
456 other_pts = np.copy(div_zero_idx)
457 other_pts[div_zero_idx[..., dims, j]] = False
459 # Set L_j(x==x_j)=1 for the current j and set L_j(x==x_j)=0 for x_j = x_i, i != j
460 L_j[div_zero_idx[..., dims, j]] = 1
461 L_j[np.any(other_pts, axis=-1)] = 0
463 # Add multivariate basis polynomial contribution to interpolation output
464 L_j = np.prod(L_j, axis=-1, keepdims=True) # (..., 1)
465 y += L_j * yi[i, :]
467 return np.atleast_1d(np.squeeze(y)) if shape_1d else y
469 def grad(self, x: np.ndarray | float | list, xi: np.ndarray = None, yi: np.ndarray = None) -> np.ndarray:
470 """Evaluate the gradient/Jacobian at points `x` using the interpolator.
472 :param x: `(..., xdim)`, the evaluation points, must be within domain of `self.xi` for accuracy
473 :param xi: `(Ni, xdim)` optional, interpolation grid points to use (e.g. if `self.reduced=True`)
474 :param yi: `(Ni, ydim)` optional, function values at xi to use (e.g. if `self.reduced=True`)
475 :returns jac: `(..., ydim, xdim)`, the Jacobian at points `x`
476 """
477 shape_1d, x = self._fmt_input(x)
478 if yi is None:
479 yi = self.yi.copy()
480 if xi is None:
481 xi = self.xi.copy()
482 xdim = xi.shape[-1]
483 ydim = yi.shape[-1]
484 nan_idx = np.any(np.isnan(yi), axis=-1)
485 if np.any(nan_idx):
486 # Use a simple linear regression fit to impute missing values (may have resulted from bad model outputs)
487 imputer = Pipeline([('scaler', MaxAbsScaler()), ('model', Ridge(alpha=1))])
488 imputer.fit(xi[~nan_idx, :], yi[~nan_idx, :])
489 yi[nan_idx, :] = imputer.predict(xi[nan_idx, :])
491 # Create ragged edge matrix of interpolation pts and weights
492 grid_sizes = self.get_grid_sizes(self.beta) # For example:
493 x_j = np.empty((xdim, max(grid_sizes))) # A= [#####--
494 w_j = np.empty((xdim, max(grid_sizes))) #######
495 x_j[:] = np.nan ###----]
496 w_j[:] = np.nan
497 for n in range(xdim):
498 x_j[n, :grid_sizes[n]] = self.x_grids[n]
499 w_j[n, :grid_sizes[n]] = self.weights[n]
501 # Compute values ahead of time that will be needed for the gradient
502 diff = x[..., np.newaxis] - x_j
503 div_zero_idx = np.isclose(diff, 0, rtol=1e-4, atol=1e-8)
504 diff[div_zero_idx] = 1
505 quotient = w_j / diff # (..., xdim, Nx)
506 qsum = np.nansum(quotient, axis=-1) # (..., xdim)
507 sqsum = np.nansum(w_j / diff ** 2, axis=-1) # (..., xdim)
509 # Loop over multi-indices and compute derivative of tensor-product lagrange polynomials
510 jac = np.zeros(x.shape[:-1] + (ydim, xdim), dtype=x.dtype) # (..., ydim, xdim)
511 indices = [np.arange(grid_sizes[n]) for n in range(self.xdim())]
512 for k in range(xdim):
513 dims = [idx for idx in np.arange(xdim) if idx != k]
514 for i, j in enumerate(itertools.product(*indices)):
515 j_dims = [j[idx] for idx in dims]
516 L_j = quotient[..., dims, j_dims] / qsum[..., dims] # (..., xdim-1)
517 other_pts = np.copy(div_zero_idx)
518 other_pts[div_zero_idx[..., list(np.arange(xdim)), j]] = False
520 # Set L_j(x==x_j)=1 for the current j and set L_j(x==x_j)=0 for x_j = x_i, i != j
521 L_j[div_zero_idx[..., dims, j_dims]] = 1
522 L_j[np.any(other_pts[..., dims, :], axis=-1)] = 0
524 # Partial derivative of L_j with respect to x_k
525 dLJ_dx = ((w_j[k, j[k]] / (qsum[..., k] * diff[..., k, j[k]])) *
526 (sqsum[..., k] / qsum[..., k] - 1 / diff[..., k, j[k]]))
528 # Set derivatives when x is at the interpolation points (i.e. x==x_j)
529 p_idx = [idx for idx in np.arange(grid_sizes[k]) if idx != j[k]]
530 w_j_large = np.broadcast_to(w_j[k, :], x.shape[:-1] + w_j.shape[-1:]).copy()
531 curr_j_idx = div_zero_idx[..., k, j[k]]
532 other_j_idx = np.any(other_pts[..., k, :], axis=-1)
533 dLJ_dx[curr_j_idx] = -np.nansum((w_j[k, p_idx] / w_j[k, j[k]]) /
534 (x[curr_j_idx, k, np.newaxis] - x_j[k, p_idx]), axis=-1)
535 dLJ_dx[other_j_idx] = ((w_j[k, j[k]] / w_j_large[other_pts[..., k, :]]) /
536 (x[other_j_idx, k] - x_j[k, j[k]]))
538 dLJ_dx = np.expand_dims(dLJ_dx, axis=-1) * np.prod(L_j, axis=-1, keepdims=True) # (..., 1)
540 # Add contribution to the Jacobian
541 jac[..., k] += dLJ_dx * yi[i, :]
543 return np.atleast_1d(np.squeeze(jac)) if shape_1d else jac
545 def hessian(self, x: np.ndarray | float | list, xi: np.ndarray = None, yi: np.ndarray = None) -> np.ndarray:
546 """Evaluate the Hessian at points `x` using the interpolator.
548 :param x: `(..., xdim)`, the evaluation points, must be within domain of `self.xi` for accuracy
549 :param xi: `(Ni, xdim)` optional, interpolation grid points to use (e.g. if `self.reduced=True`)
550 :param yi: `(Ni, ydim)` optional, function values at xi to use (e.g. if `self.reduced=True`)
551 :returns hess: `(..., ydim, xdim, xdim)`, the vector Hessian at points `x`
552 """
553 shape_1d, x = self._fmt_input(x)
554 if yi is None:
555 yi = self.yi.copy()
556 if xi is None:
557 xi = self.xi.copy()
558 xdim = xi.shape[-1]
559 ydim = yi.shape[-1]
560 nan_idx = np.any(np.isnan(yi), axis=-1)
561 if np.any(nan_idx):
562 # Use a simple linear regression fit to impute missing values (may have resulted from bad model outputs)
563 imputer = Pipeline([('scaler', MaxAbsScaler()), ('model', Ridge(alpha=1))])
564 imputer.fit(xi[~nan_idx, :], yi[~nan_idx, :])
565 yi[nan_idx, :] = imputer.predict(xi[nan_idx, :])
567 # Create ragged edge matrix of interpolation pts and weights
568 grid_sizes = self.get_grid_sizes(self.beta) # For example:
569 x_j = np.empty((xdim, max(grid_sizes))) # A= [#####--
570 w_j = np.empty((xdim, max(grid_sizes))) #######
571 x_j[:] = np.nan ###----]
572 w_j[:] = np.nan
573 for n in range(xdim):
574 x_j[n, :grid_sizes[n]] = self.x_grids[n]
575 w_j[n, :grid_sizes[n]] = self.weights[n]
577 # Compute values ahead of time that will be needed for the gradient
578 diff = x[..., np.newaxis] - x_j
579 div_zero_idx = np.isclose(diff, 0, rtol=1e-4, atol=1e-8)
580 diff[div_zero_idx] = 1
581 quotient = w_j / diff # (..., xdim, Nx)
582 qsum = np.nansum(quotient, axis=-1) # (..., xdim)
583 qsum_p = -np.nansum(w_j / diff ** 2, axis=-1) # (..., xdim)
584 qsum_pp = 2 * np.nansum(w_j / diff ** 3, axis=-1) # (..., xdim)
586 # Loop over multi-indices and compute 2nd derivative of tensor-product lagrange polynomials
587 hess = np.zeros(x.shape[:-1] + (ydim, xdim, xdim), dtype=x.dtype) # (..., ydim, xdim, xdim)
588 indices = [np.arange(grid_sizes[n]) for n in range(self.xdim())]
589 for m in range(xdim):
590 for n in range(m, xdim):
591 dims = [idx for idx in np.arange(xdim) if idx not in [m, n]]
592 for i, j in enumerate(itertools.product(*indices)):
593 j_dims = [j[idx] for idx in dims]
594 L_j = quotient[..., dims, j_dims] / qsum[..., dims] # (..., xdim-2)
595 other_pts = np.copy(div_zero_idx)
596 other_pts[div_zero_idx[..., list(np.arange(xdim)), j]] = False
598 # Set L_j(x==x_j)=1 for the current j and set L_j(x==x_j)=0 for x_j = x_i, i != j
599 L_j[div_zero_idx[..., dims, j_dims]] = 1
600 L_j[np.any(other_pts[..., dims, :], axis=-1)] = 0
602 # Cross-terms in Hessian
603 if m != n:
604 # Partial derivative of L_j with respect to x_m and x_n
605 d2LJ_dx2 = np.ones(x.shape[:-1])
606 for k in [m, n]:
607 dLJ_dx = ((w_j[k, j[k]] / (qsum[..., k] * diff[..., k, j[k]])) *
608 (-qsum_p[..., k] / qsum[..., k] - 1 / diff[..., k, j[k]]))
610 # Set derivatives when x is at the interpolation points (i.e. x==x_j)
611 p_idx = [idx for idx in np.arange(grid_sizes[k]) if idx != j[k]]
612 w_j_large = np.broadcast_to(w_j[k, :], x.shape[:-1] + w_j.shape[-1:]).copy()
613 curr_j_idx = div_zero_idx[..., k, j[k]]
614 other_j_idx = np.any(other_pts[..., k, :], axis=-1)
615 dLJ_dx[curr_j_idx] = -np.nansum((w_j[k, p_idx] / w_j[k, j[k]]) /
616 (x[curr_j_idx, k, np.newaxis] - x_j[k, p_idx]), axis=-1)
617 dLJ_dx[other_j_idx] = ((w_j[k, j[k]] / w_j_large[other_pts[..., k, :]]) /
618 (x[other_j_idx, k] - x_j[k, j[k]]))
620 d2LJ_dx2 *= dLJ_dx
622 d2LJ_dx2 = np.expand_dims(d2LJ_dx2, axis=-1) * np.prod(L_j, axis=-1, keepdims=True) # (..., 1)
623 hess[..., m, n] += d2LJ_dx2 * yi[i, :]
624 hess[..., n, m] += d2LJ_dx2 * yi[i, :]
626 # Diagonal terms in Hessian:
627 else:
628 front_term = w_j[m, j[m]] / (qsum[..., m] * diff[..., m, j[m]])
629 first_term = (-qsum_pp[..., m] / qsum[..., m]) + 2*(qsum_p[..., m] / qsum[..., m]) ** 2
630 second_term = (2*(qsum_p[..., m] / (qsum[..., m] * diff[..., m, j[m]]))
631 + 2 / diff[..., m, j[m]] ** 2)
632 d2LJ_dx2 = front_term * (first_term + second_term)
634 # Set derivatives when x is at the interpolation points (i.e. x==x_j)
635 curr_j_idx = div_zero_idx[..., m, j[m]]
636 other_j_idx = np.any(other_pts[..., m, :], axis=-1)
637 if np.any(curr_j_idx) or np.any(other_j_idx):
638 p_idx = [idx for idx in np.arange(grid_sizes[m]) if idx != j[m]]
639 w_j_large = np.broadcast_to(w_j[m, :], x.shape[:-1] + w_j.shape[-1:]).copy()
640 x_j_large = np.broadcast_to(x_j[m, :], x.shape[:-1] + x_j.shape[-1:]).copy()
642 # if these points are at the current j interpolation point
643 d2LJ_dx2[curr_j_idx] = (2 * np.nansum((w_j[m, p_idx] / w_j[m, j[m]]) /
644 (x[curr_j_idx, m, np.newaxis] - x_j[m, p_idx]), axis=-1) ** 2 + # noqa: E501
645 2 * np.nansum((w_j[m, p_idx] / w_j[m, j[m]]) /
646 (x[curr_j_idx, m, np.newaxis] - x_j[m, p_idx])**2, axis=-1)) # noqa: E501
648 # if these points are at any other interpolation point
649 other_pts_inv = other_pts.copy()
650 other_pts_inv[other_j_idx, m, :grid_sizes[m]] = np.invert(other_pts[other_j_idx, m, :grid_sizes[m]]) # noqa: E501
651 curr_x_j = x_j_large[other_pts[..., m, :]].reshape((-1, 1))
652 other_x_j = x_j_large[other_pts_inv[..., m, :]].reshape((-1, len(p_idx)))
653 curr_w_j = w_j_large[other_pts[..., m, :]].reshape((-1, 1))
654 other_w_j = w_j_large[other_pts_inv[..., m, :]].reshape((-1, len(p_idx)))
655 curr_div = w_j[m, j[m]] / np.squeeze(curr_w_j, axis=-1)
656 curr_diff = np.squeeze(curr_x_j, axis=-1) - x_j[m, j[m]]
657 d2LJ_dx2[other_j_idx] = ((-2*curr_div / curr_diff) * (np.nansum(
658 (other_w_j / curr_w_j) / (curr_x_j - other_x_j), axis=-1) + 1 / curr_diff))
660 d2LJ_dx2 = np.expand_dims(d2LJ_dx2, axis=-1) * np.prod(L_j, axis=-1, keepdims=True) # (..., 1)
661 hess[..., m, n] += d2LJ_dx2 * yi[i, :]
663 return np.atleast_1d(np.squeeze(hess)) if shape_1d else hess
665 @staticmethod
666 def get_grid_sizes(beta: tuple, k: int = 2) -> list[int]:
667 """Compute number of grid points in each input dimension.
669 :param beta: refinement level indices
670 :param k: level-to-grid-size multiplier (probably just always `k=2`)
671 :returns: list of grid sizes in each dimension
672 """
673 return [k*beta[i] + 1 for i in range(len(beta))]
675 @staticmethod
676 def leja_1d(N: int, z_bds: tuple, z_pts: np.ndarray = None, wt_fcn: callable = None) -> np.ndarray:
677 """Find the next `N` points in the Leja sequence of `z_pts`.
679 :param N: number of new points to add to the sequence
680 :param z_bds: bounds on the 1d domain
681 :param z_pts: current univariate Leja sequence `(Nz,)`, start at middle of `z_bds` if `None`
682 :param wt_fcn: weighting function, uses a constant weight if `None`, callable as `wt_fcn(z)`
683 :returns: the Leja sequence `z_pts` augmented by `N` new points
684 """
685 # if wt_fcn is None:
686 wt_fcn = lambda z: 1 # UPDATE: ignore RV weighting, unbounded pdfs like Gaussian cause problems
687 if z_pts is None:
688 z_pts = (z_bds[1] + z_bds[0]) / 2
689 N = N - 1
690 z_pts = np.atleast_1d(z_pts).astype(np.float32)
692 # Construct Leja sequence by maximizing the Leja objective sequentially
693 for i in range(N):
694 obj_fun = lambda z: -wt_fcn(np.array(z).astype(np.float32)) * np.prod(np.abs(z - z_pts))
695 res = direct(obj_fun, [z_bds]) # Use global DIRECT optimization over 1d domain
696 z_star = res.x
697 z_pts = np.concatenate((z_pts, z_star))
699 return z_pts