1"""A Component is an `amisc` wrapper around a single discipline model. It manages surrogate construction and optionally 

2a hierarchy of modeling fidelities that may be available. Concrete component classes all inherit from the base 

3`ComponentSurrogate` class provided here. Components manage an array of `BaseInterpolator` objects to form a 

4multifidelity hierarchy. 

5 

6Includes: 

7 

8- `ComponentSurrogate`: the base class that is fundamental to the adaptive multi-index stochastic collocation strategy 

9- `SparseGridSurrogate`: an AMISC component that manages a hierarchy of `LagrangeInterpolator` objects 

10- `AnalyticalSurrogate`: a light wrapper around a single discipline model that does not require surrogate approximation 

11""" 

12import ast 

13import copy 

14import itertools 

15import os 

16import tempfile 

17from abc import ABC, abstractmethod 

18from concurrent.futures import ALL_COMPLETED, Executor, wait 

19from pathlib import Path 

20 

21import numpy as np 

22from joblib import delayed 

23from sklearn.linear_model import Ridge 

24from sklearn.pipeline import Pipeline 

25from sklearn.preprocessing import MaxAbsScaler 

26 

27from amisc import IndexSet, InterpResults, MiscTree 

28from amisc.interpolator import BaseInterpolator, LagrangeInterpolator 

29from amisc.rv import BaseRV 

30from amisc.utils import get_logger 

31 

32 

33class ComponentSurrogate(ABC): 

34 """The base multi-index stochastic collocation (MISC) surrogate class for a single discipline component model. 

35 

36 !!! Info "Multi-indices" 

37 A multi-index is a tuple of natural numbers, each specifying a level of fidelity. You will frequently see two 

38 multi-indices: `alpha` and `beta`. The `alpha` (or $\\alpha$) indices specify physical model fidelity and get 

39 passed to the model as an additional argument (e.g. things like discretization level, time step size, etc.). 

40 The `beta` (or $\\beta$) indices specify surrogate refinement level, so typically an indication of the amount of 

41 training data used. Each fidelity index in $\\alpha$ and $\\beta$ increase in refinement from $0$ up to 

42 `max_alpha` and `max_beta`. From the surrogate's perspective, the concatenation of $(\\alpha, \\beta)$ fully 

43 specifies a single fidelity "level". The `ComponentSurrogate` forms an approximation of the model by summing 

44 up over many of these concatenated sets of $(\\alpha, \\beta)$. These lists are stored in a data structure of 

45 `list[ tuple[ tuple, tuple ], ...]`. When $\\alpha$ or $\\beta$ are used as keys in a `dict`, they are cast to 

46 a Python `str` from a `tuple`. 
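
    !!! Example
        A minimal sketch of this data structure (the particular index values are purely illustrative) for one
        model fidelity index and two surrogate refinement dimensions:

        ```python
        multi_index = [((0,), (0, 0)), ((1,), (0, 0)), ((0,), (1, 0))]  # list[tuple[tuple, tuple]]
        alpha, beta = multi_index[0]
        key = str(alpha)  # '(0,)' -- how multi-indices appear as keys in the dict-based MiscTree structures
        ```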

    :ivar index_set: the current active set of multi-indices in the MISC approximation
    :ivar candidate_set: all neighboring multi-indices that are candidates for inclusion in `index_set`
    :ivar x_vars: list of variables that define the input domain
    :ivar ydim: the number of outputs returned by the model
    :ivar _model: stores a ref to the model or function that is to be approximated, callable as `ret = model(x)`
    :ivar _model_args: additional arguments to supply to the model
    :ivar _model_kwargs: additional keyword arguments to supply to the model
    :ivar truth_alpha: the model fidelity indices to treat as the "ground truth" model
    :ivar max_refine: the maximum level of refinement for each fidelity index in $(\\alpha, \\beta)$
    :ivar surrogates: keeps track of the `BaseInterpolator` associated with each set of $(\\alpha, \\beta)$
    :ivar costs: keeps track of the total cost associated with adding a single $(\\alpha, \\beta)$ to the MISC approximation
    :ivar misc_coeff: the combination technique coefficients for the MISC approximation

    :vartype index_set: IndexSet
    :vartype candidate_set: IndexSet
    :vartype x_vars: list[BaseRV]
    :vartype ydim: int
    :vartype _model: callable[np.ndarray] -> dict
    :vartype _model_args: tuple
    :vartype _model_kwargs: dict
    :vartype truth_alpha: tuple[int, ...]
    :vartype max_refine: list[int]
    :vartype surrogates: MiscTree
    :vartype costs: MiscTree
    :vartype misc_coeff: MiscTree
    """

    def __init__(self, x_vars: list[BaseRV] | BaseRV, model: callable,
                 multi_index: IndexSet = None,
                 truth_alpha: tuple = (), max_alpha: tuple = (), max_beta: tuple = (),
                 log_file: str | Path = None, executor: Executor = None,
                 model_args: tuple = (), model_kwargs: dict = None):
        """Construct the MISC surrogate and initialize with any multi-indices passed in.

        !!! Info "Model specification"
            The model is a callable function of the form `ret = model(x, *args, **kwargs)`. The return value is a
            dictionary of the form `ret = {'y': y, 'files': files, 'cost': cost}`. In the return dictionary, you
            must specify the raw model output `y` as an `np.ndarray` at a _minimum_. Optionally, you can specify
            paths to output files, the average model cost (in units of seconds of cpu time), and anything else you
            want.

        !!! Warning
            If the model has multiple fidelities, then the function signature must be
            `model(x, alpha, *args, **kwargs)`; the first argument after `x` will always be the fidelity indices
            `alpha`. The rest of `model_args` will be passed in after (you do not need to include `alpha` in
            `model_args`; it is done automatically).
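
        !!! Example
            A sketch of a model that follows these conventions (the sine response and cost value are made-up
            placeholders, not part of `amisc`):

            ```python
            def my_model(x, alpha, *args, **kwargs):
                y = np.sin(x[..., :1]) + 0.1 * alpha[0]  # (..., x_dim) -> (..., y_dim)
                return {'y': y, 'cost': 1e-3}            # 'y' is required; 'cost' and 'files' are optional
            ```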

        :param x_vars: `[X1, X2, ...]` list of variables specifying bounds/pdfs for each input
        :param model: the function to approximate, callable as `ret = model(x, *args, **kwargs)`
        :param multi_index: `[((alpha1), (beta1)), ... ]` list of concatenated multi-indices $(\\alpha, \\beta)$
        :param truth_alpha: specifies the highest model fidelity indices necessary for a "ground truth" comparison
        :param max_alpha: the maximum model refinement indices to allow, defaults to `(2,...)` if applicable
        :param max_beta: the maximum surrogate refinement indices, defaults to `(2,...)` of length `x_dim`
        :param log_file: specifies a log file (optional)
        :param executor: parallel executor used to add candidate indices in parallel (optional)
        :param model_args: optional args to pass when calling the model
        :param model_kwargs: optional kwargs to pass when calling the model
        """
        self.logger = get_logger(self.__class__.__name__, log_file=log_file)
        self.log_file = log_file
        self.executor = executor
        self.training_flag = None  # Keep track of which MISC coeffs are active
        # (True=active set, False=active+candidate sets, None=neither/unknown)

        multi_index = list() if multi_index is None else multi_index
        assert self.is_downward_closed(multi_index), 'Must be a downward-closed set.'
        self.ydim = None
        self.index_set = []  # The active index set for the MISC approximation
        self.candidate_set = []  # Candidate indices for refinement
        self._model = model
        self._model_args = model_args
        self._model_kwargs = model_kwargs if model_kwargs is not None else {}
        self.truth_alpha = truth_alpha
        self.x_vars = x_vars if isinstance(x_vars, list) else [x_vars]
        max_alpha = truth_alpha if max_alpha == () else max_alpha
        max_beta = (2,)*len(self.x_vars) if max_beta == () else max_beta
        self.max_refine = list(max_alpha + max_beta)  # Max refinement indices

        # Initialize important tree-like structures
        self.surrogates = dict()  # Maps alphas -> betas -> surrogates
        self.costs = dict()  # Maps alphas -> betas -> wall clock run times
        self.misc_coeff = dict()  # Maps alphas -> betas -> MISC coefficients

        # Construct vectors of [0,1]^dim(alpha+beta)
        Nij = len(self.max_refine)
        self.ij = np.zeros((2 ** Nij, Nij), dtype=np.uint8)
        for i, ele in enumerate(itertools.product([0, 1], repeat=Nij)):
            self.ij[i, :] = ele

        # Initialize any indices that were passed in (guaranteed to be a list by this point)
        for alpha, beta in multi_index:
            self.activate_index(alpha, beta)

    def activate_index(self, alpha: tuple, beta: tuple):
        """Add a multi-index to the active set and all neighbors to the candidate set.

        :param alpha: A multi-index specifying model fidelity
        :param beta: A multi-index specifying surrogate fidelity
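
        !!! Example
            A sketch of the neighbors generated when activating an index (`surr` is a hypothetical instance, and
            the listed candidates assume the refinement limits permit them):

            ```python
            surr.activate_index((0,), (1, 0))
            # ((0,), (1, 0)) moves to surr.index_set; its forward neighbors ((1,), (1, 0)),
            # ((0,), (2, 0)), and ((0,), (1, 1)) join surr.candidate_set, minus any that
            # exceed max_refine or would break downward-closedness
            ```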

145 """ 

146 # User is responsible for making sure index set is downward-closed 

147 alpha, beta = tuple([int(i) for i in alpha]), tuple([int(i) for i in beta]) # Make sure these are python ints 

148 self.add_surrogate(alpha, beta) 

149 ele = (alpha, beta) 

150 if ele in self.index_set: 

151 self.logger.warning(f'Multi-index {ele} is already in the active index set. Ignoring...') 

152 return 

153 

154 # Add all possible new candidates (distance of one unit vector away) 

155 ind = list(alpha + beta) 

156 new_candidates = [] 

157 for i in range(len(ind)): 

158 ind_new = ind.copy() 

159 ind_new[i] += 1 

160 

161 # Don't add if we surpass a refinement limit 

162 if np.any(np.array(ind_new) > np.array(self.max_refine)): 

163 continue 

164 

165 # Add the new index if it maintains downward-closedness 

166 new_cand = (tuple(ind_new[:len(alpha)]), tuple(ind_new[len(alpha):])) 

167 down_closed = True 

168 for j in range(len(ind)): 

169 ind_check = ind_new.copy() 

170 ind_check[j] -= 1 

171 if ind_check[j] >= 0: 

172 tup_check = (tuple(ind_check[:len(alpha)]), tuple(ind_check[len(alpha):])) 

173 if tup_check not in self.index_set and tup_check != ele: 

174 down_closed = False 

175 break 

176 if down_closed: 

177 new_candidates.append(new_cand) 

178 

179 # Build an interpolator for each new candidate 

180 if self.executor is None: # Sequential 

181 for a, b in new_candidates: 

182 self.add_surrogate(a, b) 

183 else: # Parallel 

184 temp_exc = self.executor 

185 self.executor = None 

186 for a, b in new_candidates: 

187 if str(a) not in self.surrogates: 

188 self.surrogates[str(a)] = dict() 

189 self.costs[str(a)] = dict() 

190 self.misc_coeff[str(a)] = dict() 

191 self.parallel_add_candidates(new_candidates, temp_exc) 

192 self.executor = temp_exc 

193 

194 # Move to the active index set 

195 if ele in self.candidate_set: 

196 self.candidate_set.remove(ele) 

197 self.index_set.append(ele) 

198 new_candidates = [cand for cand in new_candidates if cand not in self.candidate_set] 

199 self.candidate_set.extend(new_candidates) 

200 self.training_flag = None # Makes sure misc coeffs get recomputed next time 

201 

202 def add_surrogate(self, alpha: tuple, beta: tuple): 

203 """Build a `BaseInterpolator` object for a given $(\\alpha, \\beta)$ 

204 

205 :param alpha: A multi-index specifying model fidelity 

206 :param beta: A multi-index specifying surrogate fidelity 

207 """ 

208 # Create a dictionary for each alpha model to store multiple surrogate fidelities (beta) 

209 if str(alpha) not in self.surrogates: 

210 self.surrogates[str(alpha)] = dict() 

211 self.costs[str(alpha)] = dict() 

212 self.misc_coeff[str(alpha)] = dict() 

213 

214 # Create a new interpolator object for this multi-index (abstract method) 

215 if self.surrogates[str(alpha)].get(str(beta), None) is None: 

216 self.logger.info(f'Building interpolator for index {(alpha, beta)} ...') 

217 x_new_idx, x_new, interp = self.build_interpolator(alpha, beta) 

218 self.surrogates[str(alpha)][str(beta)] = interp 

219 cost = self.update_interpolator(x_new_idx, x_new, interp) # Awkward, but needed to separate the model evals 

220 self.costs[str(alpha)][str(beta)] = cost 

221 if self.ydim is None: 

222 self.ydim = interp.ydim() 

223 

224 def init_coarse(self): 

225 """Initialize the coarsest interpolation and add to the active index set""" 

226 alpha = (0,) * len(self.truth_alpha) 

227 beta = (0,) * len(self.max_refine[len(self.truth_alpha):]) 

228 self.activate_index(alpha, beta) 

229 

230 def iterate_candidates(self): 

231 """Iterate candidate indices one by one into the active index set. 

232 

233 :yields alpha, beta: the multi-indices of the current candidate that has been moved to active set 

234 """ 

235 for alpha, beta in list(self.candidate_set): 

236 # Temporarily add a candidate index to active set 

237 self.index_set.append((alpha, beta)) 

238 yield alpha, beta 

239 del self.index_set[-1] 

240 

241 def predict(self, x: np.ndarray | float, use_model: str | tuple = None, model_dir: str | Path = None, 

242 training: bool = False, index_set: IndexSet = None, ppool=None) -> np.ndarray: 

243 """Evaluate the MISC approximation at new points `x`. 

244 

245 !!! Note 

246 By default this will predict the MISC surrogate approximation. However, for convenience you can also specify 

247 `use_model` to call the underlying function instead. 
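
        !!! Example
            A usage sketch, assuming `comp` is a trained `ComponentSurrogate` over two inputs (all names here are
            illustrative):

            ```python
            x = np.random.rand(100, 2)                  # (..., x_dim) points inside the input domain
            y_surr = comp.predict(x)                    # MISC surrogate approximation
            y_true = comp.predict(x, use_model='best')  # bypass the surrogate at the truth_alpha fidelity
            ```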

        :param x: `(..., x_dim)` the points to be interpolated, must be within the input domain for accuracy
        :param use_model: 'best'=high-fidelity, 'worst'=low-fidelity, tuple=a specific `alpha`, None=surrogate (default)
        :param model_dir: directory to save output files if `use_model` is specified, ignored otherwise
        :param training: if `True`, then only compute with the active index set, otherwise use all candidates as well
        :param index_set: a list of concatenated $(\\alpha, \\beta)$ to override `self.index_set` if given, else ignore
        :param ppool: a joblib `Parallel` pool to loop over multi-indices in parallel
        :returns y: `(..., y_dim)` the surrogate approximation of the function (or the function itself if `use_model`)
        """
        x = np.atleast_1d(x)
        if use_model is not None:
            return self._bypass_surrogate(x, use_model, model_dir)

        index_set, misc_coeff = self._combination(index_set, training)  # Choose the correct index set and misc_coeff

        def run_batch(alpha, beta, y):
            comb_coeff = misc_coeff[str(alpha)][str(beta)]
            if np.abs(comb_coeff) > 0:
                func = self.surrogates[str(alpha)][str(beta)]
                y += int(comb_coeff) * func(x)

        if ppool is not None:
            with tempfile.NamedTemporaryFile(suffix='.dat', mode='w+b', delete=False) as y_fd:
                pass
            y_ret = np.memmap(y_fd.name, dtype=x.dtype, mode='w+', shape=x.shape[:-1] + (self.ydim,))  # zero-filled
            ppool(delayed(run_batch)(alpha, beta, y_ret) for alpha, beta in index_set)
            y = np.empty(y_ret.shape, dtype=x.dtype)
            y[:] = y_ret[:]
            del y_ret
            os.unlink(y_fd.name)
        else:
            y = np.zeros(x.shape[:-1] + (self.ydim,), dtype=x.dtype)
            for alpha, beta in index_set:
                run_batch(alpha, beta, y)

        return y

    def grad(self, x: np.ndarray | float | list, training: bool = False, index_set: IndexSet = None) -> np.ndarray:
        """Evaluate the derivative/Jacobian of the MISC approximation at new points `x`.

        :param x: `(..., x_dim)` the evaluation points, must be within the input domain for accuracy
        :param training: if `True`, then only compute with the active index set, otherwise use all candidates as well
        :param index_set: a list of concatenated $(\\alpha, \\beta)$ to override `self.index_set` if given, else ignore
        :returns: `(..., y_dim, x_dim)` the Jacobian of the surrogate approximation
        """
        x = np.atleast_1d(x)
        index_set, misc_coeff = self._combination(index_set, training)  # Choose the correct index set and misc_coeff

        jac = np.zeros(x.shape[:-1] + (self.ydim, len(self.x_vars)), dtype=x.dtype)
        for alpha, beta in index_set:
            comb_coeff = misc_coeff[str(alpha)][str(beta)]
            if np.abs(comb_coeff) > 0:
                interp = self.surrogates[str(alpha)][str(beta)]
                jac += int(comb_coeff) * interp.grad(x)

        return jac

    def hessian(self, x: np.ndarray | float | list, training: bool = False, index_set: IndexSet = None) -> np.ndarray:
        """Evaluate the Hessian of the MISC approximation at new points `x`.

        :param x: `(..., x_dim)` the evaluation points, must be within the input domain for accuracy
        :param training: if `True`, then only compute with the active index set, otherwise use all candidates as well
        :param index_set: a list of concatenated $(\\alpha, \\beta)$ to override `self.index_set` if given, else ignore
        :returns: `(..., y_dim, x_dim, x_dim)` the Hessian of the surrogate approximation
        """
        x = np.atleast_1d(x)
        index_set, misc_coeff = self._combination(index_set, training)  # Choose the correct index set and misc_coeff

        hess = np.zeros(x.shape[:-1] + (self.ydim, len(self.x_vars), len(self.x_vars)), dtype=x.dtype)
        for alpha, beta in index_set:
            comb_coeff = misc_coeff[str(alpha)][str(beta)]
            if np.abs(comb_coeff) > 0:
                interp = self.surrogates[str(alpha)][str(beta)]
                hess += int(comb_coeff) * interp.hessian(x)

        return hess

    def __call__(self, *args, **kwargs):
        """Here for convenience so you can also do `ret = surrogate(x)`, just like the `BaseInterpolator`."""
        return self.predict(*args, **kwargs)

    def update_misc_coeffs(self, index_set: IndexSet = None) -> MiscTree:
        """Update the combination technique coefficients for MISC using the given index set.
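
        !!! Note
            For reference, the loop below implements the standard combination-technique formula, where
            $d = \\text{len}(\\alpha) + \\text{len}(\\beta)$ and $\\mathcal{I}$ is the index set under
            consideration:

            $$ c_{\\alpha, \\beta} = \\sum_{\\substack{\\mathbf{i} \\in \\{0, 1\\}^d \\\\ (\\alpha, \\beta) + \\mathbf{i} \\in \\mathcal{I}}} (-1)^{|\\mathbf{i}|_1} $$

            Sub-surrogates with a coefficient of zero are skipped during evaluation.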

        :param index_set: the index set to consider when computing the MISC coefficients, defaults to the active set
        :returns: the MISC coefficients for the given index set ($\\alpha$ -> $\\beta$ -> coeff)
        """
        if index_set is None:
            index_set = self.index_set

        # Construct a (N_indices, dim(alpha+beta)) refactor of the index_set for arrayed computations
        index_mat = np.zeros((len(index_set), len(self.max_refine)), dtype=np.uint8)
        for i, (alpha, beta) in enumerate(index_set):
            index_mat[i, :] = alpha + beta
        index_mat = np.expand_dims(index_mat, axis=0)  # (1, Ns, Nij)

        misc_coeff = dict()
        for alpha, beta in index_set:
            # Add permutations of [0, 1] to (alpha, beta)
            alpha_beta = np.array(alpha + beta, dtype=np.uint8)[np.newaxis, :]  # (1, Nij)
            new_indices = np.expand_dims(alpha_beta + self.ij, axis=1)  # (2**Nij, 1, Nij)

            # Find which indices are in the index_set (using np broadcasting comparison)
            diff = new_indices - index_mat  # (2**Nij, Ns, Nij)
            idx = np.count_nonzero(diff, axis=-1) == 0  # (2**Nij, Ns)
            idx = np.any(idx, axis=-1)  # (2**Nij,)
            ij_use = self.ij[idx, :]  # (*, Nij)
            l1_norm = np.sum(np.abs(ij_use), axis=-1)  # (*,)
            coeff = np.sum((-1.) ** l1_norm)  # float

            # Save misc coeff to a dict() tree structure
            if misc_coeff.get(str(alpha)) is None:
                misc_coeff[str(alpha)] = dict()
            misc_coeff[str(alpha)][str(beta)] = coeff
            self.misc_coeff[str(alpha)][str(beta)] = coeff

        return misc_coeff

    def get_sub_surrogate(self, alpha: tuple, beta: tuple) -> BaseInterpolator:
        """Get the specific sub-surrogate corresponding to the $(\\alpha, \\beta)$ fidelity.

        :param alpha: A multi-index specifying model fidelity
        :param beta: A multi-index specifying surrogate fidelity
        :returns: the corresponding `BaseInterpolator` object
        """
        return self.surrogates[str(alpha)][str(beta)]

    def get_cost(self, alpha: tuple, beta: tuple) -> float:
        """Return the total cost (wall time in seconds) required to add $(\\alpha, \\beta)$ to the MISC approximation.

        :param alpha: A multi-index specifying model fidelity
        :param beta: A multi-index specifying surrogate fidelity
        """
        try:
            return self.costs[str(alpha)][str(beta)]
        except Exception:
            return 0.0

    def update_input_bds(self, idx: int, bds: tuple):
        """Update the bounds of the input variable at the given index.

        :param idx: the index of the input variable to update
        :param bds: the new bounds
        """
        self.x_vars[int(idx)].update_bounds(*bds)

        # Update the bounds in all associated surrogates
        for alpha in self.surrogates:
            for beta in self.surrogates[alpha]:
                self.surrogates[alpha][beta].update_input_bds(idx, bds)

    def save_enabled(self):
        """Return whether this model wants to save outputs to file.

        !!! Note
            You can specify that a model wants to save outputs to file by providing an `'output_dir'` kwarg.
        """
        return self._model_kwargs.get('output_dir') is not None

    def _set_output_dir(self, output_dir: str | Path):
        """Update the component model output directory.

        :param output_dir: the new directory for model output files
        """
        if output_dir is not None:
            output_dir = str(Path(output_dir).resolve())
        self._model_kwargs['output_dir'] = output_dir
        for alpha in self.surrogates:
            for beta in self.surrogates[alpha]:
                self.surrogates[alpha][beta]._model_kwargs['output_dir'] = output_dir

    def __repr__(self):
        """Show all multi-indices in the current approximation and their corresponding MISC coefficients."""
        s = f'Inputs \u2014 {[str(var) for var in self.x_vars]}\n'
        if self.training_flag is None:
            self.update_misc_coeffs()
            self.training_flag = True

        if self.training_flag:
            s += '(Training mode)\n'
            for alpha, beta in self.index_set:
                s += f"[{int(self.misc_coeff[str(alpha)][str(beta)])}] \u2014 {alpha}, {beta}\n"
            for alpha, beta in self.candidate_set:
                s += f"[-] \u2014 {alpha}, {beta}\n"
        else:
            s += '(Evaluation mode)\n'
            for alpha, beta in self.index_set + self.candidate_set:
                s += f"[{int(self.misc_coeff[str(alpha)][str(beta)])}] \u2014 {alpha}, {beta}\n"
        return s

    def __str__(self):
        """Everyone will view these objects the same way."""
        return self.__repr__()

    def _bypass_surrogate(self, x, use_model, model_dir):
        """Bypass surrogate evaluation and use the specified model directly."""
        output_dir = self._model_kwargs.get('output_dir')
        if self.save_enabled():
            self._model_kwargs['output_dir'] = model_dir

        alpha_use = {'best': self.truth_alpha, 'worst': (0,) * len(self.truth_alpha)}.get(use_model, use_model)
        kwargs = copy.deepcopy(self._model_kwargs)
        if len(alpha_use) > 0:
            kwargs['alpha'] = alpha_use
        ret = self._model(x, *self._model_args, **kwargs)

        if output_dir is not None:
            self._model_kwargs['output_dir'] = output_dir

        if not isinstance(ret, dict):
            self.logger.warning(f"Function {self._model} did not return a dict of the form {{'y': y}}. Please make"
                                f" sure you do so to avoid conflicts. Returning the value directly instead...")

        return ret['y'] if isinstance(ret, dict) else ret

    def _combination(self, index_set, training):
        """Decide which index set and corresponding misc coefficients to use."""
        misc_coeff = copy.deepcopy(self.misc_coeff)
        if index_set is None:
            # Use active indices + candidate indices depending on training mode
            index_set = self.index_set if training else self.index_set + self.candidate_set

            # Decide when to update misc coefficients
            if self.training_flag is None:
                misc_coeff = self.update_misc_coeffs(index_set)  # On initialization or reset
            else:
                if (not self.training_flag and training) or (self.training_flag and not training):
                    misc_coeff = self.update_misc_coeffs(index_set)  # Logical XOR cases for training mode

            # Save an indication of what state the MISC coefficients are in (i.e. training or eval mode)
            self.training_flag = training
        else:
            # If we passed in an index set, always recompute misc coeff and toggle for reset on next call
            misc_coeff = self.update_misc_coeffs(index_set)
            self.training_flag = None

        return index_set, misc_coeff

    @staticmethod
    def is_one_level_refinement(beta_old: tuple, beta_new: tuple) -> bool:
        """Check if a new `beta` multi-index is a one-level refinement from a previous `beta`.

        !!! Example
            Refining from `(0, 1, 2)` to the new multi-index `(1, 1, 2)` is a one-level refinement. But refining to
            either `(2, 1, 2)` or `(1, 2, 2)` is not, since more than one refinement occurs at the same time.

        :param beta_old: the starting multi-index
        :param beta_new: the new refined multi-index
        :returns: whether `beta_new` is a one-level refinement from `beta_old`
        """
        level_diff = np.array(beta_new, dtype=int) - np.array(beta_old, dtype=int)
        ind = np.nonzero(level_diff)[0]
        return ind.shape[0] == 1 and level_diff[ind] == 1

    @staticmethod
    def is_downward_closed(indices: IndexSet) -> bool:
        """Return whether a list of $(\\alpha, \\beta)$ multi-indices is downward-closed.

        MISC approximations require a downward-closed set in order to use the combination-technique formula for the
        coefficients (as implemented here).

        !!! Example
            The list `[( (0,), (0,) ), ( (0,), (1,) ), ( (1,), (0,) ), ( (1,), (1,) )]` is downward-closed. You can
            visualize this as building a stack of cubes: in order to place a cube, all adjacent cubes below and
            beside it must already be present (does the logo make sense now?).
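
            As a sketch, the same check done programmatically with this method:

            ```python
            ComponentSurrogate.is_downward_closed([((0,), (0,)), ((0,), (1,)), ((1,), (0,)), ((1,), (1,))])  # True
            ComponentSurrogate.is_downward_closed([((0,), (0,)), ((1,), (1,))])  # False: two sub-indices are missing
            ```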

        :param indices: list() of (`alpha`, `beta`) multi-indices
        :returns: whether the set of indices is downward-closed
        """
        # Iterate over every multi-index
        for alpha, beta in indices:
            # Every smaller multi-index must also be included in the indices list
            sub_sets = [np.arange(tuple(alpha + beta)[i] + 1) for i in range(len(alpha) + len(beta))]
            for ele in itertools.product(*sub_sets):
                tup = (tuple(ele[:len(alpha)]), tuple(ele[len(alpha):]))
                if tup not in indices:
                    return False
        return True

    @abstractmethod
    def build_interpolator(self, alpha: tuple, beta: tuple) -> InterpResults:
        """Return a `BaseInterpolator` object and new refinement points for a given $(\\alpha, \\beta)$ multi-index.

        :param alpha: A multi-index specifying model fidelity
        :param beta: A multi-index specifying surrogate fidelity
        :returns: `idx`, `x`, `interp` - the list of new grid indices, the new grid points `(N_new, x_dim)`, and the
            `BaseInterpolator` object. Similar to `BaseInterpolator.refine()`.
        """
        pass

    @abstractmethod
    def update_interpolator(self, x_new_idx: list[int | tuple | str],
                            x_new: np.ndarray, interp: BaseInterpolator) -> float:
        """Secondary method to actually compute and save model evaluations within the interpolator.

        !!! Note
            This distinction from `build_interpolator` was necessary to separately construct the interpolator and
            evaluate the model at the new interpolation points. You can see that `parallel_add_candidates` uses this
            distinction to compute the model in parallel on MPI workers, for example.

        :param x_new_idx: list of new grid point indices
        :param x_new: `(N_new, x_dim)`, the new grid point locations
        :param interp: the `BaseInterpolator` object to compute model evaluations with
        :returns cost: the cost (in wall time seconds) required to add this `BaseInterpolator` object
        """
        pass

    @abstractmethod
    def parallel_add_candidates(self, candidates: IndexSet, executor: Executor):
        """Handle adding candidate indices in parallel.

        !!! Note
            While `build_interpolator` can make changes to `self`, these changes will not be saved in the master task
            if running in parallel over MPI workers, for example. This method is a workaround so that all required
            mutable changes to `self` are made in the master task, before distributing tasks to parallel workers
            using this method. You can `pass` if you don't plan to add candidates in parallel; see the sketch below.
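
        !!! Example
            A minimal sketch of opting in at construction time (`ProcessPoolExecutor` is just one choice; any
            `concurrent.futures.Executor` should work):

            ```python
            from concurrent.futures import ProcessPoolExecutor

            surr = SparseGridSurrogate(x_vars, model, executor=ProcessPoolExecutor(max_workers=4))
            surr.activate_index((0,), (0, 0))  # new candidate interpolators are now built in parallel
            ```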

        :param candidates: list of [(alpha, beta),...] multi-indices
        :param executor: the executor used to iterate candidates in parallel
        """
        pass


class SparseGridSurrogate(ComponentSurrogate):
    """Concrete MISC surrogate class that maintains a sparse grid composed of smaller tensor-product grids.

    !!! Note
        MISC itself can be thought of as an extension of the well-known sparse grid technique, so this class
        readily integrates with the MISC implementation in `ComponentSurrogate`. Sparse grids limit the curse
        of dimensionality up to about `dim = 10-15` for the input space (which would otherwise be infeasible with a
        normal full tensor-product grid of the same size).

    !!! Info "About points in a sparse grid"
        A sparse grid approximates a full tensor-product grid $(N_1, N_2, ..., N_d)$, where $N_i$ is the number of
        grid points along dimension $i$, for a $d$-dimensional space. Each point is uniquely identified in the
        sparse grid by a list of indices $(j_1, j_2, ..., j_d)$, where $j_i = 0, ..., N_i - 1$. We refer to this
        unique identifier as a "grid coordinate". In the `HashSG` data structure, we use a `str(tuple(coord))`
        representation to uniquely identify the coordinate in a hash DS like Python's `dict`.
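
    !!! Example
        A sketch of one entry in these hash structures (the array values are made up for illustration):

        ```python
        xi_map = {'(0,)': {'(0, 0)': np.array([0.1, 0.2]),
                           '(1, 0)': np.array([0.5, 0.2])}}  # alpha -> str(coord) -> interpolation point
        yi_map = {'(0,)': {'(0, 0)': np.array([1.3]),
                           '(1, 0)': np.array([2.7])}}       # alpha -> str(coord) -> model output
        ```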

    :cvar HashSG: a type alias for the hash storage of the sparse grid data (a tree-like DS using dicts)
    :ivar curr_max_beta: the current maximum $\\beta$ refinement indices in the sparse grid (for each $\\alpha$)
    :ivar x_grids: maps $\\alpha$ indices to a list of 1d grids corresponding to `curr_max_beta`
    :ivar xi_map: the sparse grid interpolation points
    :ivar yi_map: the function values at all sparse grid points
    :ivar yi_nan_map: imputed function values to use when `yi_map` contains `nan` data (sometimes the model fails...)
    :ivar yi_files: optional filenames corresponding to the sparse grid `yi_map` data

    :vartype HashSG: dict[str, dict[str, np.ndarray | str]]
    :vartype curr_max_beta: dict[str, list[int]]
    :vartype x_grids: dict[str, np.ndarray]
    :vartype xi_map: HashSG
    :vartype yi_map: HashSG
    :vartype yi_nan_map: HashSG
    :vartype yi_files: HashSG
    """

    HashSG = dict[str, dict[str, np.ndarray | str]]

    def __init__(self, *args, **kwargs):
        # Initialize tree-like hash structures for maintaining a sparse grid of smaller tensor-product grids
        self.curr_max_beta = dict()  # Maps alphas -> current max refinement indices
        self.x_grids = dict()  # Maps alphas -> list of ndarrays specifying 1d grids corresponding to max_beta
        self.xi_map = dict()  # Maps alphas -> grid point coords -> interpolation points
        self.yi_map = dict()  # Maps alphas -> grid point coords -> interpolation function values
        self.yi_nan_map = dict()  # Maps alphas -> grid point coords -> interpolated yi values when yi=nan
        self.yi_files = dict()  # Maps alphas -> grid point coords -> model output files (optional)
        super().__init__(*args, **kwargs)

    # Override
    def predict(self, x, use_model=None, model_dir=None, training=False, index_set=None, ppool=None):
        """Need to override `super()` to allow passing in interpolation grids `xi` and `yi`."""
        x = np.atleast_1d(x)
        if use_model is not None:
            return self._bypass_surrogate(x, use_model, model_dir)

        index_set, misc_coeff = self._combination(index_set, training)

        def run_batch(alpha, beta, y):
            comb_coeff = misc_coeff[str(alpha)][str(beta)]
            if np.abs(comb_coeff) > 0:
                # Gather the xi/yi interpolation points/qoi_ind for this sub tensor-product grid
                interp = self.surrogates[str(alpha)][str(beta)]
                xi, yi = self.get_tensor_grid(alpha, beta)

                # Add this sub tensor-product grid to the MISC approximation
                y += int(comb_coeff) * interp(x, xi=xi, yi=yi)

        if ppool is not None:
            with tempfile.NamedTemporaryFile(suffix='.dat', mode='w+b', delete=False) as y_fd:
                pass
            y_ret = np.memmap(y_fd.name, dtype=x.dtype, mode='w+', shape=x.shape[:-1] + (self.ydim,))  # zero-filled
            ppool(delayed(run_batch)(alpha, beta, y_ret) for alpha, beta in index_set)
            y = np.empty(y_ret.shape, dtype=x.dtype)
            y[:] = y_ret[:]
            del y_ret
            os.unlink(y_fd.name)
        else:
            y = np.zeros(x.shape[:-1] + (self.ydim,), dtype=x.dtype)
            for alpha, beta in index_set:
                run_batch(alpha, beta, y)

        return y

    # Override
    def grad(self, x, training=False, index_set=None):
        """Need to override `super()` to allow passing in interpolation grids `xi` and `yi`."""
        x = np.atleast_1d(x)
        index_set, misc_coeff = self._combination(index_set, training)  # Choose the correct index set and misc_coeff

        jac = np.zeros(x.shape[:-1] + (self.ydim, len(self.x_vars)), dtype=x.dtype)
        for alpha, beta in index_set:
            comb_coeff = misc_coeff[str(alpha)][str(beta)]
            if np.abs(comb_coeff) > 0:
                # Gather the xi/yi interpolation points/qoi_ind for this sub tensor-product grid
                interp = self.surrogates[str(alpha)][str(beta)]
                xi, yi = self.get_tensor_grid(alpha, beta)

                jac += int(comb_coeff) * interp.grad(x, xi=xi, yi=yi)

        return jac

    # Override
    def hessian(self, x, training=False, index_set=None):
        """Need to override `super()` to allow passing in interpolation grids `xi` and `yi`."""
        x = np.atleast_1d(x)
        index_set, misc_coeff = self._combination(index_set, training)  # Choose the correct index set and misc_coeff

        hess = np.zeros(x.shape[:-1] + (self.ydim, len(self.x_vars), len(self.x_vars)), dtype=x.dtype)
        for alpha, beta in index_set:
            comb_coeff = misc_coeff[str(alpha)][str(beta)]
            if np.abs(comb_coeff) > 0:
                # Gather the xi/yi interpolation points/qoi_ind for this sub tensor-product grid
                interp = self.surrogates[str(alpha)][str(beta)]
                xi, yi = self.get_tensor_grid(alpha, beta)

                hess += int(comb_coeff) * interp.hessian(x, xi=xi, yi=yi)

        return hess

    def get_tensor_grid(self, alpha: tuple, beta: tuple, update_nan: bool = True) -> tuple[np.ndarray, np.ndarray]:
        """Construct the `xi/yi` sub tensor-product grids for a given $(\\alpha, \\beta)$ multi-index.

        :param alpha: model fidelity multi-index
        :param beta: surrogate fidelity multi-index
        :param update_nan: try to fill `nan` with imputed values, otherwise just return the `nans` in place
        :returns: `xi, yi`, of size `(prod(grid_sizes), x_dim)` and `(prod(grid_sizes), y_dim)` respectively, the
            interpolation grid points and corresponding function values for this tensor-product grid
        """
        interp = self.surrogates[str(alpha)][str(beta)]
        grid_sizes = interp.get_grid_sizes(beta)
        coords = [list(range(grid_sizes[n])) for n in range(interp.xdim())]
        xi = np.zeros((np.prod(grid_sizes), interp.xdim()), dtype=np.float32)
        yi = np.zeros((np.prod(grid_sizes), self.ydim), dtype=np.float32)
        for i, coord in enumerate(itertools.product(*coords)):
            xi[i, :] = self.xi_map[str(alpha)][str(coord)]
            yi_curr = self.yi_map[str(alpha)][str(coord)]
            if update_nan and np.any(np.isnan(yi_curr)):
                # Try to replace NaN values if they are stored
                yi_curr = self.yi_nan_map[str(alpha)].get(str(coord), yi_curr)
            yi[i, :] = yi_curr

        return xi, yi

    def get_training_data(self) -> tuple[dict[str, np.ndarray], dict[str, np.ndarray]]:
        """Grab all `x, y` training data stored in the sparse grid for each model fidelity level $\\alpha$.

        :returns: `xi`, `yi`, each a `dict` mapping `alpha` indices to `np.ndarrays`
        """
        xi, yi = dict(), dict()
        for alpha, x_map in self.xi_map.items():
            x = np.zeros((len(x_map), len(self.x_vars)))
            y = np.zeros((len(x_map), self.ydim))
            for i, (coord, x_coord) in enumerate(x_map.items()):
                x[i, :] = x_coord
                y[i, :] = self.yi_nan_map[alpha].get(coord, self.yi_map[alpha][coord])

            xi[alpha] = x
            yi[alpha] = y

        return xi, yi

    def update_yi(self, alpha: tuple, beta: tuple, yi_dict: dict[str, np.ndarray]):
        """Helper method to update `yi` values, accounting for possible `nans` by regression imputation.

        :param alpha: the model fidelity indices
        :param beta: the surrogate fidelity indices
        :param yi_dict: a `dict` mapping `str(coord)` grid coordinates to function values
        """
        self.yi_map[str(alpha)].update(yi_dict)
        imputer, xdim = None, len(self.x_vars)
        for grid_coord, yi in yi_dict.items():
            if np.any(np.isnan(yi)):
                if imputer is None:
                    # Grab all 'good' interpolation points and train a simple linear regression fit
                    xi_mat, yi_mat = np.zeros((0, xdim)), np.zeros((0, self.ydim))
                    for coord, xi in self.xi_map[str(alpha)].items():
                        if coord not in self.yi_nan_map[str(alpha)] and coord in self.yi_map[str(alpha)]:
                            yi_add = self.yi_map[str(alpha)][str(coord)]
                            xi_mat = np.concatenate((xi_mat, xi.reshape((1, xdim))), axis=0)
                            yi_mat = np.concatenate((yi_mat, yi_add.reshape((1, self.ydim))), axis=0)
                    nan_idx = np.any(np.isnan(yi_mat), axis=-1)
                    xi_mat = xi_mat[~nan_idx, :]
                    yi_mat = yi_mat[~nan_idx, :]
                    imputer = Pipeline([('scaler', MaxAbsScaler()), ('model', Ridge(alpha=1))])
                    imputer.fit(xi_mat, yi_mat)
                x_interp = self.xi_map[str(alpha)][str(grid_coord)].reshape((1, xdim))
                y_interp = np.atleast_1d(np.squeeze(imputer.predict(x_interp)))
                nan_idx = np.isnan(yi)
                y_interp[~nan_idx] = yi[~nan_idx]  # Only keep imputed values where yi is nan
                self.yi_nan_map[str(alpha)][str(grid_coord)] = y_interp

        # Go back and try to re-interpolate old nan values as more points get added to the grid
        if imputer is not None:
            for grid_coord in list(self.yi_nan_map[str(alpha)].keys()):
                if grid_coord not in yi_dict:
                    x_interp = self.xi_map[str(alpha)][str(grid_coord)].reshape((1, xdim))
                    y_interp = imputer.predict(x_interp)
                    self.yi_nan_map[str(alpha)][str(grid_coord)] = np.atleast_1d(np.squeeze(y_interp))

    # Override
    def get_sub_surrogate(self, alpha: tuple, beta: tuple, include_grid: bool = False) -> BaseInterpolator:
        """Get the specific sub-surrogate corresponding to the $(\\alpha, \\beta)$ fidelity.

        :param alpha: A multi-index specifying model fidelity
        :param beta: A multi-index specifying surrogate fidelity
        :param include_grid: whether to add the `xi/yi` interpolation points to the returned `BaseInterpolator` object
        :returns: the `BaseInterpolator` object corresponding to $(\\alpha, \\beta)$
        """
        interp = super().get_sub_surrogate(alpha, beta)
        if include_grid:
            interp.xi, interp.yi = self.get_tensor_grid(alpha, beta)
        return interp

    def build_interpolator(self, alpha, beta):
        """Abstract method implementation for constructing the tensor-product grid interpolator."""
        # Create a new tensor-product grid interpolator for the base index (0, 0, ...)
        if np.sum(beta) == 0:
            kwargs = copy.deepcopy(self._model_kwargs)
            if len(alpha) > 0:
                kwargs['alpha'] = alpha
            interp = LagrangeInterpolator(beta, self.x_vars, model=self._model, model_args=self._model_args,
                                          model_kwargs=kwargs, init_grids=True, reduced=True)
            x_pt = np.array([float(interp.x_grids[n][beta[n]]) for n in range(interp.xdim())], dtype=np.float32)
            self.curr_max_beta[str(alpha)] = list(beta)
            self.x_grids[str(alpha)] = copy.deepcopy(interp.x_grids)
            self.xi_map[str(alpha)] = {str(beta): x_pt}
            self.yi_map[str(alpha)] = dict()
            self.yi_nan_map[str(alpha)] = dict()
            if self.save_enabled():
                self.yi_files[str(alpha)] = dict()

            return [beta], x_pt.reshape((1, len(self.x_vars))), interp
        # Otherwise, all other indices are a refinement of previous grids

        # Look for the first multi-index neighbor that is one level of refinement away
        refine_tup = None
        for beta_old_str in list(self.surrogates[str(alpha)].keys()):
            beta_old = ast.literal_eval(beta_old_str)
            if self.is_one_level_refinement(beta_old, beta):
                idx_refine = int(np.nonzero(np.array(beta, dtype=int) - np.array(beta_old, dtype=int))[0][0])
                refine_level = beta[idx_refine]
                if refine_level > self.curr_max_beta[str(alpha)][idx_refine]:
                    # Generate the next refinement grid and save (refine_tup = tuple(x_new_idx, x_new, interp))
                    refine_tup = self.surrogates[str(alpha)][beta_old_str].refine(beta, auto=False)
                    self.curr_max_beta[str(alpha)][idx_refine] = refine_level
                    self.x_grids[str(alpha)][idx_refine] = copy.deepcopy(refine_tup[2].x_grids[idx_refine])
                else:
                    # Access the refinement grid from memory (it is already computed)
                    num_pts = self.surrogates[str(alpha)][beta_old_str].get_grid_sizes(beta)[idx_refine]
                    x_refine = self.x_grids[str(alpha)][idx_refine][:num_pts]
                    refine_tup = self.surrogates[str(alpha)][beta_old_str].refine(beta, x_refine=x_refine,
                                                                                  auto=False)
                break  # Only need to grab one neighbor

        # Gather new interpolation grid points
        x_new_idx, x_new, interp = refine_tup
        xn_coord = []  # Save multi-index coordinates of points to compute model at for refinement
        xn_pts = np.zeros((0, interp.xdim()), dtype=np.float32)  # Save physical x location of new points
        for i, multi_idx in enumerate(x_new_idx):
            if str(multi_idx) not in self.yi_map[str(alpha)]:
                # We have not computed this grid coordinate yet
                xn_coord.append(multi_idx)
                xn_pts = np.concatenate((xn_pts, x_new[i, np.newaxis, :]), axis=0)  # (N_new, xdim)
                self.xi_map[str(alpha)][str(multi_idx)] = x_new[i, :]

        return xn_coord, xn_pts, interp

    def update_interpolator(self, x_new_idx, x_new, interp):
        """Awkward solution, I know, but actually compute and save the model evaluations here."""
        # Compute and store model output at new refinement points in a hash structure
        yi_ret = interp.set_yi(x_new=(x_new_idx, x_new))

        if self.ydim is None:
            for coord_str, yi in yi_ret['y'].items():
                self.ydim = yi.shape[0]
                break

        alpha = interp._model_kwargs.get('alpha', ())
        self.update_yi(alpha, interp.beta, yi_ret['y'])
        if self.save_enabled():
            self.yi_files[str(alpha)].update(yi_ret['files'])
        cost = interp.model_cost * len(x_new_idx)

        return cost

    def parallel_add_candidates(self, candidates: IndexSet, executor: Executor):
        """Work-around to make sure mutable instance variable changes are made before/after
        splitting tasks using this method over parallel (potentially MPI) workers. You can pass if you are not
        interested in such parallel ideas.

        !!! Warning
            MPI workers cannot save changes to `self`, so this method should only distribute static tasks to the
            workers.

        :param candidates: list of [(alpha, beta),...] multi-indices
        :param executor: the executor used to iterate candidates in parallel
        """
        # Do sequential tasks first (i.e. make mutable changes to self), build up parallel task args
        task_args = []
        for alpha, beta in candidates:
            x_new_idx, x_new, interp = self.build_interpolator(alpha, beta)
            task_args.append((alpha, beta, x_new_idx, x_new, interp))

        def parallel_task(alpha, beta, x_new_idx, x_new, interp):
            # Must return anything you want changed in self or interp (mutable changes aren't saved over MPI workers)
            logger = get_logger(self.__class__.__name__, log_file=self.log_file, stdout=False)
            logger.info(f'Building interpolator for index {(alpha, beta)} ...')
            yi_ret = interp.set_yi(x_new=(x_new_idx, x_new))
            model_cost = interp.model_cost if interp.model_cost is not None else 1
            return yi_ret, model_cost

        # Wait for all parallel workers to return
        fs = [executor.submit(parallel_task, *args) for args in task_args]
        wait(fs, timeout=None, return_when=ALL_COMPLETED)

        # Update self and interp with the results from all workers (and check for errors)
        for i, future in enumerate(fs):
            try:
                a = task_args[i][0]
                b = task_args[i][1]
                x_new_idx = task_args[i][2]
                interp = task_args[i][4]
                yi_ret, model_cost = future.result()
                interp.model_cost = model_cost
                self.surrogates[str(a)][str(b)] = interp
                self.update_yi(a, b, yi_ret['y'])
                if self.save_enabled():
                    self.yi_files[str(a)].update(yi_ret['files'])
                self.costs[str(a)][str(b)] = interp.model_cost * len(x_new_idx)

                if self.ydim is None:
                    for coord_str, yi in self.yi_map[str(a)].items():
                        self.ydim = yi.shape[0]
                        break
            except Exception:
                self.logger.error(f'An exception occurred in a thread handling build_interpolator {candidates[i]}')
                raise


class AnalyticalSurrogate(ComponentSurrogate):
    """Concrete "surrogate" class that just uses the analytical model (i.e. bypasses surrogate evaluation)."""

    def __init__(self, x_vars, model, *args, **kwargs):
        """Initialize a stand-in `ComponentSurrogate` with all unnecessary fields set to empty.

        !!! Warning
            This overwrites anything passed in for `truth_alpha`, `max_alpha`, `max_beta`, or `multi_index` since
            these are not used for an analytical model.
        """
        kwargs['truth_alpha'] = ()
        kwargs['max_alpha'] = ()
        kwargs['max_beta'] = ()
        kwargs['multi_index'] = []
        super().__init__(x_vars, model, *args, **kwargs)

    # Override
    def predict(self, x: np.ndarray | float, **kwargs) -> np.ndarray:
        """Evaluate the analytical model at points `x`, ignoring any extra `**kwargs` passed in.

        :param x: `(..., x_dim)` the points to be evaluated
        :returns y: `(..., y_dim)` the exact model output at the input points
        """
        ret = self._model(x, *self._model_args, **self._model_kwargs)

        if not isinstance(ret, dict):
            self.logger.warning(f"Function {self._model} did not return a dict of the form {{'y': y}}. Please make"
                                f" sure you do so to avoid conflicts. Returning the value directly instead...")

        return ret['y'] if isinstance(ret, dict) else ret

    # Override
    def grad(self, x, training=False, index_set=None):
        """Use auto-diff to compute the derivative of an analytical model. The model must be implemented with `numpy`.

        !!! Warning "Not implemented yet"
            Hypothetically, auto-diff libraries like `jax` could be used to write a generic gradient function here
            for analytical models implemented directly in Python/numpy. But there are a lot of quirks that should be
            worked out first.
        """
        raise NotImplementedError('Need to implement a generic auto-diff function here, like using jax for example.')

    def hessian(self, x, training=False, index_set=None):
        """Use auto-diff to compute the Hessian of an analytical model. The model must be implemented with `numpy`.

        !!! Warning "Not implemented yet"
            Hypothetically, auto-diff libraries like `jax` could be used to write a generic Hessian function here
            for analytical models implemented directly in Python/numpy. But there are a lot of quirks that should be
            worked out first.
        """
        raise NotImplementedError('Need to implement a generic auto-diff function here, like using jax for example.')

    # Override
    def activate_index(self, *args):
        """Do nothing."""
        pass

    # Override
    def add_surrogate(self, *args):
        """Do nothing."""
        pass

    # Override
    def init_coarse(self):
        """Do nothing."""
        pass

    # Override
    def update_misc_coeffs(self, *args, **kwargs):
        """Do nothing."""
        pass

    # Override
    def get_sub_surrogate(self, *args):
        """Nothing to return for an analytical model."""
        return None

    # Override
    def get_cost(self, *args):
        """Return no cost for an analytical model."""
        return 0

    def build_interpolator(self, *args):
        """Abstract method implementation; return `None` for an analytical model."""
        return None

    def update_interpolator(self, *args):
        """Abstract method implementation; return `cost=0` for an analytical model."""
        return 0

    def parallel_add_candidates(self, *args):
        """Abstract method implementation; do nothing."""
        pass