Coverage for src/amisc/interpolator.py: 90%

362 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-29 21:38 +0000

1"""Provides interpolator classes. Interpolators manage training data and specify how to refine/gather more data. 

2 

3Includes: 

4 

5- `BaseInterpolator`: Abstract class providing basic structure of an interpolator 

6- `LagrangeInterpolator`: Concrete implementation for tensor-product barycentric Lagrange interpolation 

7""" 

8import copy 

9import itertools 

10from abc import ABC, abstractmethod 

11 

12import numpy as np 

13from scipy.optimize import direct 

14from sklearn.linear_model import Ridge 

15from sklearn.pipeline import Pipeline 

16from sklearn.preprocessing import MaxAbsScaler 

17 

18from amisc.rv import BaseRV 

19from amisc.utils import get_logger 

20 

21 

22class BaseInterpolator(ABC): 

23 """Base interpolator abstract class. 

24 

25 !!! Info "Setting the training data" 

26 You can leave the training data `xi`, `yi` empty; they can be iteratively refined later on. 

27 

28 !!! Info "Model specification" 

29 The model is a callable function of the form `ret = model(x, *args, **kwargs)`. The return value is a dictionary 

30 of the form `ret = {'y': y, 'files': files, 'cost': cost}`. In the return dictionary, you specify the raw model 

31 output `y` as an `np.ndarray` at a _minimum_. Optionally, you can specify paths to output files and the average 

32 model cost (in units of seconds of cpu time), and anything else you want. 

33 

34 :ivar beta: specifies the refinement level of this interpolator as a set of natural number indices 

35 :ivar x_vars: list of variables that fully determines the input domain of interest for interpolation 

36 :ivar xi: `(Nx, x_dim)`, interpolation points (or knots, training samples, etc.) stored as an array 

37 :ivar yi: `(Nx, y_dim)`, function values at the interpolation points, i.e. the training data 

38 :ivar _model: stores a ref to the model or function that is to be interpolated, callable as `ret = model(x)` 

39 :ivar _model_args: additional arguments to supply to the model 

40 :ivar _model_kwargs: additional keyword arguments to supply to the model 

41 :ivar model_cost: the average total cpu time (in seconds) for a single model evaluation call of one set of inputs 

42 :ivar output_files: tracks model output files corresponding to `yi` training data (for more complex models) 

43 :ivar logger: a logging utility reference 

44 

45 :vartype beta: tuple[int, ...] 

46 :vartype x_vars: list[BaseRV] 

47 :vartype xi: np.ndarray 

48 :vartype yi: np.ndarray 

49 :vartype _model: callable[np.ndarray] -> dict 

50 :vartype _model_args: tuple 

51 :vartype _model_kwargs: dict 

52 :vartype model_cost: float 

53 :vartype output_files: list[str | Path] 

54 :vartype logger: logging.Logger 

55 """ 

56 

57 def __init__(self, beta: tuple, x_vars: BaseRV | list[BaseRV], xi=None, yi=None, 

58 model=None, model_args=(), model_kwargs=None): 

59 """Construct the interpolator. 

60 

61 :param beta: refinement level indices 

62 :param x_vars: list of variables to specify input domain of interpolation 

63 :param xi: `(Nx, xdim)`, interpolation points (optional) 

64 :param yi: `(Nx, ydim)`, the function values at the interpolation points (optional) 

65 :param model: callable as {'y': y} = model(x), with `x = (..., x_dim)`, `y = (..., y_dim)` 

66 :param model_args: optional args for the model 

67 :param model_kwargs: optional kwargs for the model 

68 """ 

69 x_vars = [x_vars] if not isinstance(x_vars, list) else x_vars 

70 self.logger = get_logger(self.__class__.__name__) 

71 self._model = model 

72 self._model_args = model_args 

73 self._model_kwargs = model_kwargs if model_kwargs is not None else {} 

74 self.output_files = [] # Save output files with same indexing as xi, yi 

75 self.xi = xi # Interpolation points 

76 self.yi = yi # Function values at interpolation points 

77 self.beta = beta # Refinement level indices 

78 self.x_vars = x_vars # BaseRV() objects for each input 

79 self.model_cost = None # Total cpu time to evaluate model once (s) 

80 

81 def update_input_bds(self, idx: int, bds: tuple): 

82 """Update the input bounds at the given index. 

83 

84 :param idx: the index of the input variable to update 

85 :param bds: the new bounds for the variable 

86 """ 

87 self.x_vars[idx].update_bounds(*bds) 

88 

89 def xdim(self): 

90 """Get the dimension of the input domain.""" 

91 return len(self.x_vars) 

92 

93 def ydim(self): 

94 """Get the dimension of the outputs.""" 

95 return self.yi.shape[-1] if self.yi is not None else None 

96 

97 def save_enabled(self): 

98 """Return whether the underlying model wants to save outputs to file. 

99 

100 !!! Note 

101 You can specify that a model wants to save outputs to file by providing an `'output_dir'` kwarg. 

102 """ 

103 return self._model_kwargs.get('output_dir') is not None 

104 

105 def _fmt_input(self, x: float | list | np.ndarray) -> tuple[bool, np.ndarray]: 

106 """Helper function to make sure input `x` is an ndarray of shape `(..., xdim)`. 

107 

108 :param x: if 1d-like as (n,), then converted to 2d as (1, n) if n==xdim or (n, 1) if xdim==1 

109 :returns: `x` as at least a 2d array `(..., xdim)`, and whether `x` was originally 1d-like 

110 """ 

111 x = np.atleast_1d(x) 

112 shape_1d = len(x.shape) == 1 

113 if shape_1d: 

114 if x.shape[0] != self.xdim() and self.xdim() > 1: 

115 raise ValueError(f'Input x shape {x.shape} is incompatible with xdim of {self.xdim()}') 

116 x = np.expand_dims(x, axis=0 if x.shape[0] == self.xdim() else 1) 

117 

118 return shape_1d, x 

119 

120 def set_yi(self, yi: np.ndarray = None, model: callable = None, 

121 x_new: tuple[list[int | tuple], np.ndarray] = ()) -> dict[str: np.ndarray] | None: 

122 """Set the training data; if `yi=None`, then compute from the model. 

123 

124 !!! Warning 

125 You would use `x_new` if you wanted to compute the model at these specific locations and store the result. 

126 This will ignore anything passed in for `yi`, and it assumes a model is already specified (or passed in). 

127 

128 !!! Info 

129 You can pass in integer indices for `x_new` or tuple indices. Integers will index into `self.xi`. Tuples 

130 provide extra flexibility for more complicated indexing, e.g. they might specify indices along different 

131 coordinate directions in an N-dimensional grid. If you pass in a list of tuple indices for `x_new`, the 

132 resulting model outputs will be returned back to you in the form `dict[str: np.ndarray]`. The keys are 

133 string casts of the tuple indices, and the values are the corresponding model outputs. 

134 

135 :param yi: `(Nx, y_dim)`, training data to set, must match dimension of `self.xi` 

136 :param model: callable function, optionally overrides `self._model` 

137 :param x_new: tuple of `(idx, x)`, where `x` is an `(N_new, x_dim)` array of new interpolation points to 

138 include and `idx` specifies the indices of these points into `self.xi` 

139 :returns: dict[str: np.ndarray] if `idx` contains tuple elements, otherwise `None` 

140 """ 

141 if model is not None: 

142 self._model = model 

143 if self._model is None: 

144 error_msg = 'Model not specified for computing QoIs at interpolation grid points.' 

145 self.logger.error(error_msg) 

146 raise Exception(error_msg) 

147 

148 # Overrides anything passed in for yi (you would only be using this if yi was set previously) 

149 if x_new: 

150 new_idx = x_new[0] 

151 new_x = x_new[1] 

152 return_y = isinstance(new_idx[0], tuple) # Return y rather than storing it if tuple indices are passed in 

153 ret = dict(y=dict(), files=dict()) 

154 model_ret = self._model(new_x, *self._model_args, **self._model_kwargs) 

155 if not isinstance(model_ret, dict): 

156 self.logger.warning( 

157 f"Function {self._model} did not return a dict of the form {{'y': y}}. Please make sure" 

158 f" you do so to avoid conflicts. Returning the value directly instead...") 

159 model_ret = dict(y=model_ret) 

160 y_new, files_new, cpu_time = model_ret['y'], model_ret.get('files', None), model_ret.get('cost', 1) 

161 

162 if self.save_enabled(): 

163 for j in range(y_new.shape[0]): 

164 if return_y: 

165 ret['y'][str(new_idx[j])] = y_new[j, :].astype(np.float32) 

166 ret['files'][str(new_idx[j])] = files_new[j] 

167 else: 

168 self.yi[new_idx[j], :] = y_new[j, :].astype(np.float32) 

169 self.output_files[new_idx[j]] = files_new[j] 

170 else: 

171 for j in range(y_new.shape[0]): 

172 if return_y: 

173 ret['y'][str(new_idx[j])] = y_new[j, :].astype(np.float32) 

174 else: 

175 self.yi[new_idx[j], :] = y_new[j, :].astype(np.float32) 

176 

177 if self.model_cost is None: 

178 self.model_cost = max(1, cpu_time) 

179 

180 return ret 

181 

182 # Set yi directly 

183 if yi is not None: 

184 self.yi = yi.astype(np.float32) 

185 return 

186 

187 # Compute yi 

188 model_ret = self._model(self.xi, *self._model_args, **self._model_kwargs) 

189 if not isinstance(model_ret, dict): 

190 self.logger.warning(f"Function {self._model} did not return a dict of the form {{'y': y}}. Please make sure" 

191 f" you do so to avoid conflicts. Returning the value directly instead...") 

192 model_ret = dict(y=model_ret) 

193 

194 self.yi, self.output_files, cpu_time = model_ret['y'], model_ret.get('files', list()), model_ret.get('cost', 1) 

195 

196 if self.model_cost is None: 

197 self.model_cost = max(1, cpu_time) 

198 

199 @abstractmethod 

200 def refine(self, beta: tuple, auto=True): 

201 """Return a new interpolator with one dimension refined by one level, as specified by `beta`. 

202 

203 !!! Info "When you want to compute the model manually" 

204 You can set `auto=False`, in which case the newly refined interpolation points `x` will be returned to you 

205 along with their indices, in the form `idx, x, interp = refine(beta, auto=False)`. You might also want to 

206 do this if you did not provide a model when constructing the Interpolator (so `auto=True` won't work). 

207 

208 :param beta: the new refinement level indices, should only refine one dimension by one level 

209 :param auto: whether to automatically compute and store model at refinement points (default is True) 

210 :returns: `idx` - indices into `xi`, `x` - the new interpolation points, and `interp` - a refined 

211 BaseInterpolator object, just returns `interp` if `auto=True` 

212 """ 

213 pass 

214 

215 @abstractmethod 

216 def __call__(self, x: np.ndarray | float) -> np.ndarray: 

217 """Evaluate the interpolation at points `x`. 

218 

219 :param x: `(..., x_dim)`, the points to be interpolated, must be within the input domain for accuracy 

220 :returns y: `(..., y_dim)`, the interpolated function values 

221 """ 

222 pass 

223 

224 @abstractmethod 

225 def grad(self, x: np.ndarray | float | list, xi: np.ndarray = None, yi: np.ndarray = None) -> np.ndarray: 

226 """Evaluate the gradient/Jacobian at points `x` using the interpolator. 

227 

228 :param x: `(..., xdim)`, the evaluation points, must be within domain of `self.xi` for accuracy 

229 :param xi: `(Ni, xdim)` optional, interpolation grid points to use (e.g. if `self.reduced=True`) 

230 :param yi: `(Ni, ydim)` optional, function values at xi to use (e.g. if `self.reduced=True`) 

231 :returns jac: `(..., ydim, xdim)`, the Jacobian at points `x` 

232 """ 

233 

234 @abstractmethod 

235 def hessian(self, x: np.ndarray | float | list, xi: np.ndarray = None, yi: np.ndarray = None) -> np.ndarray: 

236 """Evaluate the Hessian at points `x` using the interpolator. 

237 

238 :param x: `(..., xdim)`, the evaluation points, must be within domain of `self.xi` for accuracy 

239 :param xi: `(Ni, xdim)` optional, interpolation grid points to use (e.g. if `self.reduced=True`) 

240 :param yi: `(Ni, ydim)` optional, function values at xi to use (e.g. if `self.reduced=True`) 

241 :returns hess: `(..., ydim, xdim, xdim)`, the Hessian at points `x` 

242 """ 

243 

244 

245class LagrangeInterpolator(BaseInterpolator): 

246 """Tensor-product (multivariate) grid interpolator, based on barycentric Lagrange polynomials. 

247 

248 !!! Info 

249 The refinement level indices `beta` are used in this class to specify anisotropic refinement along each 

250 coordinate direction of the input domain, so `x_dim = len(beta)`. 

251 

252 :ivar x_grids: univariate Leja sequence points in each 1d dimension 

253 :ivar weights: the barycentric weights corresponding to `x_grids` 

254 :ivar reduced: whether to store `xi` and `yi` training data, can set to `False` to save memory, e.g. if an external 

255 sparse grid data structure manages this data instead 

256 

257 :vartype x_grids: list[np.ndarray] 

258 :vartype weights: list[np.ndarray] 

259 :vartype reduced: bool 

260 """ 

261 

262 def __init__(self, beta: tuple, x_vars: BaseRV | list[BaseRV], init_grids=True, reduced=False, **kwargs): 

263 """Initialize a Lagrange tensor-product grid interpolator. 

264 

265 :param beta: refinement level indices for each input dimension 

266 :param x_vars: list of variables specifying bounds/pdfs for each input x 

267 :param init_grids: whether to compute 1d Leja sequences on initialization 

268 :param reduced: whether to store xi/yi matrices, e.g. set true if storing in external sparse grid structure 

269 :param **kwargs: other optional arguments (see `BaseInterpolator`) 

270 """ 

271 self.weights = [] # Barycentric weights for each dimension 

272 self.x_grids = [] # Univariate nested leja sequences in each dimension 

273 self.reduced = reduced 

274 super().__init__(beta, x_vars, **kwargs) 

275 

276 if init_grids: 

277 # Construct 1d univariate Leja sequences in each dimension 

278 grid_sizes = self.get_grid_sizes(self.beta) 

279 self.x_grids = [self.leja_1d(grid_sizes[n], self.x_vars[n].bounds(), 

280 wt_fcn=self.x_vars[n].pdf).astype(np.float32) for n in range(self.xdim())] 

281 

282 for n in range(self.xdim()): 

283 Nx = grid_sizes[n] 

284 bds = self.x_vars[n].bounds() 

285 grid = self.x_grids[n] 

286 C = (bds[1] - bds[0]) / 4.0 # Interval capacity (see Berrut and Trefethen 2004) 

287 xj = grid.reshape((Nx, 1)) 

288 xi = grid.reshape((1, Nx)) 

289 dist = (xj - xi) / C 

290 np.fill_diagonal(dist, 1) # Ignore product when i==j 

291 self.weights.append((1.0 / np.prod(dist, axis=1)).astype(np.float32)) # (Nx,) 

292 

293 # Cartesian product of univariate grids 

294 if not self.reduced: 

295 self.xi = np.empty((np.prod(grid_sizes), self.xdim()), dtype=np.float32) 

296 for i, ele in enumerate(itertools.product(*self.x_grids)): 

297 self.xi[i, :] = ele 

298 

299 def refine(self, beta, auto=True, x_refine: np.ndarray = None): 

300 """Return a new interpolator with one dimension refined by one level, specified by `beta`. 

301 

302 !!! Note 

303 If `self.reduced=True` or `auto=False`, then this function will return tuple indices `idx` corresponding 

304 to the new interpolation points `x`. The tuple indices specify one index along each input dimension. 

305 

306 :param beta: the new refinement level indices 

307 :param auto: whether to automatically compute model at refinement points 

308 :param x_refine: `(Nx,)` use this array as the refined 1d grid if provided, otherwise compute via `leja_1d` 

309 :returns: `interp` - a `LagrangeInterpolator` with a refined grid (default), otherwise if `auto=False`, 

310 returns `idx, x, interp`, where `idx` and `x` correspond to new interpolation points. 

311 """ 

312 try: 

313 # Initialize a new interpolator with the new refinement levels 

314 interp = LagrangeInterpolator(beta, self.x_vars, model=self._model, model_args=self._model_args, 

315 model_kwargs=self._model_kwargs, init_grids=False, reduced=self.reduced) 

316 

317 # Find the dimension and number of new points to add 

318 old_grid_sizes = self.get_grid_sizes(self.beta) 

319 new_grid_sizes = interp.get_grid_sizes(beta) 

320 dim_refine = 0 

321 num_new_pts = 0 

322 for idx, grid_size in enumerate(new_grid_sizes): 

323 if grid_size != old_grid_sizes[idx]: 

324 dim_refine = idx 

325 num_new_pts = grid_size - old_grid_sizes[idx] 

326 break 

327 

328 # Add points to leja grid in this dimension 

329 interp.x_grids = copy.deepcopy(self.x_grids) 

330 xi = copy.deepcopy(x_refine) if x_refine is not None else self.leja_1d(num_new_pts, 

331 interp.x_vars[dim_refine].bounds(), 

332 z_pts=interp.x_grids[dim_refine], 

333 wt_fcn=interp.x_vars[dim_refine].pdf) 

334 interp.x_grids[dim_refine] = xi.astype(np.float32) 

335 

336 # Update barycentric weights in this dimension 

337 interp.weights = copy.deepcopy(self.weights) 

338 Nx_old = old_grid_sizes[dim_refine] 

339 Nx_new = new_grid_sizes[dim_refine] 

340 old_wts = copy.deepcopy(self.weights[dim_refine]) 

341 new_wts = np.zeros(Nx_new, dtype=np.float32) 

342 new_wts[:Nx_old] = old_wts 

343 bds = interp.x_vars[dim_refine].bounds() 

344 C = (bds[1] - bds[0]) / 4.0 # Interval capacity 

345 xi = interp.x_grids[dim_refine] 

346 for j in range(Nx_old, Nx_new): 

347 new_wts[:j] *= (C / (xi[:j] - xi[j])) 

348 new_wts[j] = np.prod(C / (xi[j] - xi[:j])) 

349 interp.weights[dim_refine] = new_wts 

350 

351 # Copy yi over at existing interpolation points 

352 x_new = np.zeros((0, interp.xdim()), dtype=np.float32) 

353 x_new_idx = [] 

354 tol = 1e-12 # Tolerance for floating point comparison 

355 j = 0 # Use this idx for iterating over existing yi 

356 if not self.reduced: 

357 interp.xi = np.zeros((np.prod(new_grid_sizes), self.xdim()), dtype=np.float32) 

358 interp.yi = np.zeros((np.prod(new_grid_sizes), self.ydim()), dtype=np.float32) 

359 if self.save_enabled(): 

360 interp.output_files = [None] * np.prod(new_grid_sizes) 

361 

362 old_indices = [list(range(old_grid_sizes[n])) for n in range(self.xdim())] 

363 old_indices = list(itertools.product(*old_indices)) 

364 new_indices = [list(range(new_grid_sizes[n])) for n in range(self.xdim())] 

365 new_indices = list(itertools.product(*new_indices)) 

366 for i in range(len(new_indices)): 

367 # Get the new grid coordinate/index and physical x location/point 

368 new_x_idx = new_indices[i] 

369 new_x_pt = np.array([float(interp.x_grids[n][new_x_idx[n]]) for n in range(self.xdim())], 

370 dtype=np.float32) 

371 

372 if not self.reduced: 

373 # Store the old xi/yi and return new x points 

374 interp.xi[i, :] = new_x_pt 

375 if j < len(old_indices) and np.all(np.abs(np.array(old_indices[j]) - 

376 np.array(new_indices[i])) < tol): 

377 # If we already have this interpolation point 

378 interp.yi[i, :] = self.yi[j, :] 

379 if self.save_enabled(): 

380 interp.output_files[i] = self.output_files[j] 

381 j += 1 

382 else: 

383 # Otherwise, save new interpolation point and its index 

384 x_new = np.concatenate((x_new, new_x_pt.reshape((1, self.xdim())))) 

385 x_new_idx.append(i) 

386 else: 

387 # Just find the new x indices and return those for the reduced case 

388 if j < len(old_indices) and np.all(np.abs(np.array(old_indices[j]) - 

389 np.array(new_indices[i])) < tol): 

390 j += 1 

391 else: 

392 x_new = np.concatenate((x_new, new_x_pt.reshape((1, self.xdim())))) 

393 x_new_idx.append(new_x_idx) # Add a tuple() multi-index if not saving xi/yi 

394 

395 # Evaluate the model at new interpolation points 

396 interp.model_cost = self.model_cost 

397 if self._model is None: 

398 self.logger.warning('No model available to evaluate new interpolation points, returning the points ' 

399 'to you instead...') 

400 return x_new_idx, x_new, interp 

401 elif not auto or self.reduced: 

402 return x_new_idx, x_new, interp 

403 else: 

404 interp.set_yi(x_new=(x_new_idx, x_new)) 

405 return interp 

406 except Exception as e: 

407 import traceback 

408 tb_str = str(traceback.format_exception(e)) 

409 self.logger.error(tb_str) 

410 raise Exception(f'Original exception in refine(): {tb_str}') 

411 

412 def __call__(self, x: np.ndarray | float, xi: np.ndarray = None, yi: np.ndarray = None) -> np.ndarray: 

413 """Evaluate the barycentric interpolation at points `x`. 

414 

415 :param x: `(..., xdim)`, the points to be interpolated, must be within domain of `self.xi` for accuracy 

416 :param xi: `(Ni, xdim)` optional, interpolation grid points to use (e.g. if `self.reduced=True`) 

417 :param yi: `(Ni, ydim)` optional, function values at xi to use (e.g. if `self.reduced=True`) 

418 :returns y: `(..., ydim)`, the interpolated function values 

419 """ 

420 shape_1d, x = self._fmt_input(x) 

421 if yi is None: 

422 yi = self.yi.copy() 

423 if xi is None: 

424 xi = self.xi.copy() 

425 xdim = xi.shape[-1] 

426 ydim = yi.shape[-1] 

427 dims = list(range(xdim)) 

428 

429 nan_idx = np.any(np.isnan(yi), axis=-1) 

430 if np.any(nan_idx): 

431 # Use a simple linear regression fit to impute missing values (may have resulted from bad model outputs) 

432 imputer = Pipeline([('scaler', MaxAbsScaler()), ('model', Ridge(alpha=1))]) 

433 imputer.fit(xi[~nan_idx, :], yi[~nan_idx, :]) 

434 yi[nan_idx, :] = imputer.predict(xi[nan_idx, :]) 

435 

436 # Create ragged edge matrix of interpolation pts and weights 

437 grid_sizes = self.get_grid_sizes(self.beta) # For example: 

438 x_j = np.empty((xdim, max(grid_sizes))) # A= [#####-- 

439 w_j = np.empty((xdim, max(grid_sizes))) ####### 

440 x_j[:] = np.nan ###----] 

441 w_j[:] = np.nan 

442 for n in range(xdim): 

443 x_j[n, :grid_sizes[n]] = self.x_grids[n] 

444 w_j[n, :grid_sizes[n]] = self.weights[n] 

445 diff = x[..., np.newaxis] - x_j 

446 div_zero_idx = np.isclose(diff, 0, rtol=1e-4, atol=1e-8) 

447 diff[div_zero_idx] = 1 

448 quotient = w_j / diff # (..., xdim, Nx) 

449 qsum = np.nansum(quotient, axis=-1) # (..., xdim) 

450 

451 # Loop over multi-indices and compute tensor-product lagrange polynomials 

452 y = np.zeros(x.shape[:-1] + (ydim,), dtype=x.dtype) # (..., ydim) 

453 indices = [np.arange(grid_sizes[n]) for n in range(xdim)] 

454 for i, j in enumerate(itertools.product(*indices)): 

455 L_j = quotient[..., dims, j] / qsum # (..., xdim) 

456 other_pts = np.copy(div_zero_idx) 

457 other_pts[div_zero_idx[..., dims, j]] = False 

458 

459 # Set L_j(x==x_j)=1 for the current j and set L_j(x==x_j)=0 for x_j = x_i, i != j 

460 L_j[div_zero_idx[..., dims, j]] = 1 

461 L_j[np.any(other_pts, axis=-1)] = 0 

462 

463 # Add multivariate basis polynomial contribution to interpolation output 

464 L_j = np.prod(L_j, axis=-1, keepdims=True) # (..., 1) 

465 y += L_j * yi[i, :] 

466 

467 return np.atleast_1d(np.squeeze(y)) if shape_1d else y 

468 

469 def grad(self, x: np.ndarray | float | list, xi: np.ndarray = None, yi: np.ndarray = None) -> np.ndarray: 

470 """Evaluate the gradient/Jacobian at points `x` using the interpolator. 

471 

472 :param x: `(..., xdim)`, the evaluation points, must be within domain of `self.xi` for accuracy 

473 :param xi: `(Ni, xdim)` optional, interpolation grid points to use (e.g. if `self.reduced=True`) 

474 :param yi: `(Ni, ydim)` optional, function values at xi to use (e.g. if `self.reduced=True`) 

475 :returns jac: `(..., ydim, xdim)`, the Jacobian at points `x` 

476 """ 

477 shape_1d, x = self._fmt_input(x) 

478 if yi is None: 

479 yi = self.yi.copy() 

480 if xi is None: 

481 xi = self.xi.copy() 

482 xdim = xi.shape[-1] 

483 ydim = yi.shape[-1] 

484 nan_idx = np.any(np.isnan(yi), axis=-1) 

485 if np.any(nan_idx): 

486 # Use a simple linear regression fit to impute missing values (may have resulted from bad model outputs) 

487 imputer = Pipeline([('scaler', MaxAbsScaler()), ('model', Ridge(alpha=1))]) 

488 imputer.fit(xi[~nan_idx, :], yi[~nan_idx, :]) 

489 yi[nan_idx, :] = imputer.predict(xi[nan_idx, :]) 

490 

491 # Create ragged edge matrix of interpolation pts and weights 

492 grid_sizes = self.get_grid_sizes(self.beta) # For example: 

493 x_j = np.empty((xdim, max(grid_sizes))) # A= [#####-- 

494 w_j = np.empty((xdim, max(grid_sizes))) ####### 

495 x_j[:] = np.nan ###----] 

496 w_j[:] = np.nan 

497 for n in range(xdim): 

498 x_j[n, :grid_sizes[n]] = self.x_grids[n] 

499 w_j[n, :grid_sizes[n]] = self.weights[n] 

500 

501 # Compute values ahead of time that will be needed for the gradient 

502 diff = x[..., np.newaxis] - x_j 

503 div_zero_idx = np.isclose(diff, 0, rtol=1e-4, atol=1e-8) 

504 diff[div_zero_idx] = 1 

505 quotient = w_j / diff # (..., xdim, Nx) 

506 qsum = np.nansum(quotient, axis=-1) # (..., xdim) 

507 sqsum = np.nansum(w_j / diff ** 2, axis=-1) # (..., xdim) 

508 

509 # Loop over multi-indices and compute derivative of tensor-product lagrange polynomials 

510 jac = np.zeros(x.shape[:-1] + (ydim, xdim), dtype=x.dtype) # (..., ydim, xdim) 

511 indices = [np.arange(grid_sizes[n]) for n in range(self.xdim())] 

512 for k in range(xdim): 

513 dims = [idx for idx in np.arange(xdim) if idx != k] 

514 for i, j in enumerate(itertools.product(*indices)): 

515 j_dims = [j[idx] for idx in dims] 

516 L_j = quotient[..., dims, j_dims] / qsum[..., dims] # (..., xdim-1) 

517 other_pts = np.copy(div_zero_idx) 

518 other_pts[div_zero_idx[..., list(np.arange(xdim)), j]] = False 

519 

520 # Set L_j(x==x_j)=1 for the current j and set L_j(x==x_j)=0 for x_j = x_i, i != j 

521 L_j[div_zero_idx[..., dims, j_dims]] = 1 

522 L_j[np.any(other_pts[..., dims, :], axis=-1)] = 0 

523 

524 # Partial derivative of L_j with respect to x_k 

525 dLJ_dx = ((w_j[k, j[k]] / (qsum[..., k] * diff[..., k, j[k]])) * 

526 (sqsum[..., k] / qsum[..., k] - 1 / diff[..., k, j[k]])) 

527 

528 # Set derivatives when x is at the interpolation points (i.e. x==x_j) 

529 p_idx = [idx for idx in np.arange(grid_sizes[k]) if idx != j[k]] 

530 w_j_large = np.broadcast_to(w_j[k, :], x.shape[:-1] + w_j.shape[-1:]).copy() 

531 curr_j_idx = div_zero_idx[..., k, j[k]] 

532 other_j_idx = np.any(other_pts[..., k, :], axis=-1) 

533 dLJ_dx[curr_j_idx] = -np.nansum((w_j[k, p_idx] / w_j[k, j[k]]) / 

534 (x[curr_j_idx, k, np.newaxis] - x_j[k, p_idx]), axis=-1) 

535 dLJ_dx[other_j_idx] = ((w_j[k, j[k]] / w_j_large[other_pts[..., k, :]]) / 

536 (x[other_j_idx, k] - x_j[k, j[k]])) 

537 

538 dLJ_dx = np.expand_dims(dLJ_dx, axis=-1) * np.prod(L_j, axis=-1, keepdims=True) # (..., 1) 

539 

540 # Add contribution to the Jacobian 

541 jac[..., k] += dLJ_dx * yi[i, :] 

542 

543 return np.atleast_1d(np.squeeze(jac)) if shape_1d else jac 

544 

545 def hessian(self, x: np.ndarray | float | list, xi: np.ndarray = None, yi: np.ndarray = None) -> np.ndarray: 

546 """Evaluate the Hessian at points `x` using the interpolator. 

547 

548 :param x: `(..., xdim)`, the evaluation points, must be within domain of `self.xi` for accuracy 

549 :param xi: `(Ni, xdim)` optional, interpolation grid points to use (e.g. if `self.reduced=True`) 

550 :param yi: `(Ni, ydim)` optional, function values at xi to use (e.g. if `self.reduced=True`) 

551 :returns hess: `(..., ydim, xdim, xdim)`, the vector Hessian at points `x` 

552 """ 

553 shape_1d, x = self._fmt_input(x) 

554 if yi is None: 

555 yi = self.yi.copy() 

556 if xi is None: 

557 xi = self.xi.copy() 

558 xdim = xi.shape[-1] 

559 ydim = yi.shape[-1] 

560 nan_idx = np.any(np.isnan(yi), axis=-1) 

561 if np.any(nan_idx): 

562 # Use a simple linear regression fit to impute missing values (may have resulted from bad model outputs) 

563 imputer = Pipeline([('scaler', MaxAbsScaler()), ('model', Ridge(alpha=1))]) 

564 imputer.fit(xi[~nan_idx, :], yi[~nan_idx, :]) 

565 yi[nan_idx, :] = imputer.predict(xi[nan_idx, :]) 

566 

567 # Create ragged edge matrix of interpolation pts and weights 

568 grid_sizes = self.get_grid_sizes(self.beta) # For example: 

569 x_j = np.empty((xdim, max(grid_sizes))) # A= [#####-- 

570 w_j = np.empty((xdim, max(grid_sizes))) ####### 

571 x_j[:] = np.nan ###----] 

572 w_j[:] = np.nan 

573 for n in range(xdim): 

574 x_j[n, :grid_sizes[n]] = self.x_grids[n] 

575 w_j[n, :grid_sizes[n]] = self.weights[n] 

576 

577 # Compute values ahead of time that will be needed for the gradient 

578 diff = x[..., np.newaxis] - x_j 

579 div_zero_idx = np.isclose(diff, 0, rtol=1e-4, atol=1e-8) 

580 diff[div_zero_idx] = 1 

581 quotient = w_j / diff # (..., xdim, Nx) 

582 qsum = np.nansum(quotient, axis=-1) # (..., xdim) 

583 qsum_p = -np.nansum(w_j / diff ** 2, axis=-1) # (..., xdim) 

584 qsum_pp = 2 * np.nansum(w_j / diff ** 3, axis=-1) # (..., xdim) 

585 

586 # Loop over multi-indices and compute 2nd derivative of tensor-product lagrange polynomials 

587 hess = np.zeros(x.shape[:-1] + (ydim, xdim, xdim), dtype=x.dtype) # (..., ydim, xdim, xdim) 

588 indices = [np.arange(grid_sizes[n]) for n in range(self.xdim())] 

589 for m in range(xdim): 

590 for n in range(m, xdim): 

591 dims = [idx for idx in np.arange(xdim) if idx not in [m, n]] 

592 for i, j in enumerate(itertools.product(*indices)): 

593 j_dims = [j[idx] for idx in dims] 

594 L_j = quotient[..., dims, j_dims] / qsum[..., dims] # (..., xdim-2) 

595 other_pts = np.copy(div_zero_idx) 

596 other_pts[div_zero_idx[..., list(np.arange(xdim)), j]] = False 

597 

598 # Set L_j(x==x_j)=1 for the current j and set L_j(x==x_j)=0 for x_j = x_i, i != j 

599 L_j[div_zero_idx[..., dims, j_dims]] = 1 

600 L_j[np.any(other_pts[..., dims, :], axis=-1)] = 0 

601 

602 # Cross-terms in Hessian 

603 if m != n: 

604 # Partial derivative of L_j with respect to x_m and x_n 

605 d2LJ_dx2 = np.ones(x.shape[:-1]) 

606 for k in [m, n]: 

607 dLJ_dx = ((w_j[k, j[k]] / (qsum[..., k] * diff[..., k, j[k]])) * 

608 (-qsum_p[..., k] / qsum[..., k] - 1 / diff[..., k, j[k]])) 

609 

610 # Set derivatives when x is at the interpolation points (i.e. x==x_j) 

611 p_idx = [idx for idx in np.arange(grid_sizes[k]) if idx != j[k]] 

612 w_j_large = np.broadcast_to(w_j[k, :], x.shape[:-1] + w_j.shape[-1:]).copy() 

613 curr_j_idx = div_zero_idx[..., k, j[k]] 

614 other_j_idx = np.any(other_pts[..., k, :], axis=-1) 

615 dLJ_dx[curr_j_idx] = -np.nansum((w_j[k, p_idx] / w_j[k, j[k]]) / 

616 (x[curr_j_idx, k, np.newaxis] - x_j[k, p_idx]), axis=-1) 

617 dLJ_dx[other_j_idx] = ((w_j[k, j[k]] / w_j_large[other_pts[..., k, :]]) / 

618 (x[other_j_idx, k] - x_j[k, j[k]])) 

619 

620 d2LJ_dx2 *= dLJ_dx 

621 

622 d2LJ_dx2 = np.expand_dims(d2LJ_dx2, axis=-1) * np.prod(L_j, axis=-1, keepdims=True) # (..., 1) 

623 hess[..., m, n] += d2LJ_dx2 * yi[i, :] 

624 hess[..., n, m] += d2LJ_dx2 * yi[i, :] 

625 

626 # Diagonal terms in Hessian: 

627 else: 

628 front_term = w_j[m, j[m]] / (qsum[..., m] * diff[..., m, j[m]]) 

629 first_term = (-qsum_pp[..., m] / qsum[..., m]) + 2*(qsum_p[..., m] / qsum[..., m]) ** 2 

630 second_term = (2*(qsum_p[..., m] / (qsum[..., m] * diff[..., m, j[m]])) 

631 + 2 / diff[..., m, j[m]] ** 2) 

632 d2LJ_dx2 = front_term * (first_term + second_term) 

633 

634 # Set derivatives when x is at the interpolation points (i.e. x==x_j) 

635 curr_j_idx = div_zero_idx[..., m, j[m]] 

636 other_j_idx = np.any(other_pts[..., m, :], axis=-1) 

637 if np.any(curr_j_idx) or np.any(other_j_idx): 

638 p_idx = [idx for idx in np.arange(grid_sizes[m]) if idx != j[m]] 

639 w_j_large = np.broadcast_to(w_j[m, :], x.shape[:-1] + w_j.shape[-1:]).copy() 

640 x_j_large = np.broadcast_to(x_j[m, :], x.shape[:-1] + x_j.shape[-1:]).copy() 

641 

642 # if these points are at the current j interpolation point 

643 d2LJ_dx2[curr_j_idx] = (2 * np.nansum((w_j[m, p_idx] / w_j[m, j[m]]) / 

644 (x[curr_j_idx, m, np.newaxis] - x_j[m, p_idx]), axis=-1) ** 2 + # noqa: E501 

645 2 * np.nansum((w_j[m, p_idx] / w_j[m, j[m]]) / 

646 (x[curr_j_idx, m, np.newaxis] - x_j[m, p_idx])**2, axis=-1)) # noqa: E501 

647 

648 # if these points are at any other interpolation point 

649 other_pts_inv = other_pts.copy() 

650 other_pts_inv[other_j_idx, m, :grid_sizes[m]] = np.invert(other_pts[other_j_idx, m, :grid_sizes[m]]) # noqa: E501 

651 curr_x_j = x_j_large[other_pts[..., m, :]].reshape((-1, 1)) 

652 other_x_j = x_j_large[other_pts_inv[..., m, :]].reshape((-1, len(p_idx))) 

653 curr_w_j = w_j_large[other_pts[..., m, :]].reshape((-1, 1)) 

654 other_w_j = w_j_large[other_pts_inv[..., m, :]].reshape((-1, len(p_idx))) 

655 curr_div = w_j[m, j[m]] / np.squeeze(curr_w_j, axis=-1) 

656 curr_diff = np.squeeze(curr_x_j, axis=-1) - x_j[m, j[m]] 

657 d2LJ_dx2[other_j_idx] = ((-2*curr_div / curr_diff) * (np.nansum( 

658 (other_w_j / curr_w_j) / (curr_x_j - other_x_j), axis=-1) + 1 / curr_diff)) 

659 

660 d2LJ_dx2 = np.expand_dims(d2LJ_dx2, axis=-1) * np.prod(L_j, axis=-1, keepdims=True) # (..., 1) 

661 hess[..., m, n] += d2LJ_dx2 * yi[i, :] 

662 

663 return np.atleast_1d(np.squeeze(hess)) if shape_1d else hess 

664 

665 @staticmethod 

666 def get_grid_sizes(beta: tuple, k: int = 2) -> list[int]: 

667 """Compute number of grid points in each input dimension. 

668 

669 :param beta: refinement level indices 

670 :param k: level-to-grid-size multiplier (probably just always `k=2`) 

671 :returns: list of grid sizes in each dimension 

672 """ 

673 return [k*beta[i] + 1 for i in range(len(beta))] 

674 

675 @staticmethod 

676 def leja_1d(N: int, z_bds: tuple, z_pts: np.ndarray = None, wt_fcn: callable = None) -> np.ndarray: 

677 """Find the next `N` points in the Leja sequence of `z_pts`. 

678 

679 :param N: number of new points to add to the sequence 

680 :param z_bds: bounds on the 1d domain 

681 :param z_pts: current univariate Leja sequence `(Nz,)`, start at middle of `z_bds` if `None` 

682 :param wt_fcn: weighting function, uses a constant weight if `None`, callable as `wt_fcn(z)` 

683 :returns: the Leja sequence `z_pts` augmented by `N` new points 

684 """ 

685 # if wt_fcn is None: 

686 wt_fcn = lambda z: 1 # UPDATE: ignore RV weighting, unbounded pdfs like Gaussian cause problems 

687 if z_pts is None: 

688 z_pts = (z_bds[1] + z_bds[0]) / 2 

689 N = N - 1 

690 z_pts = np.atleast_1d(z_pts).astype(np.float32) 

691 

692 # Construct Leja sequence by maximizing the Leja objective sequentially 

693 for i in range(N): 

694 obj_fun = lambda z: -wt_fcn(np.array(z).astype(np.float32)) * np.prod(np.abs(z - z_pts)) 

695 res = direct(obj_fun, [z_bds]) # Use global DIRECT optimization over 1d domain 

696 z_star = res.x 

697 z_pts = np.concatenate((z_pts, z_star)) 

698 

699 return z_pts