Coverage for src/amisc/component.py: 68% of 496 statements (coverage.py v7.6.1, created 2024-08-29 21:38 +0000)

1"""A Component is an `amisc` wrapper around a single discipline model. It manages surrogate construction and optionally
2a hierarchy of modeling fidelities that may be available. Concrete component classes all inherit from the base
3`ComponentSurrogate` class provided here. Components manage an array of `BaseInterpolator` objects to form a
4multifidelity hierarchy.
6Includes:
8- `ComponentSurrogate`: the base class that is fundamental to the adaptive multi-index stochastic collocation strategy
9- `SparseGridSurrogate`: an AMISC component that manages a hierarchy of `LagrangeInterpolator` objects
10- `AnalyticalSurrogate`: a light wrapper around a single discipline model that does not require surrogate approximation
11"""
import ast
import copy
import itertools
import os
import tempfile
from abc import ABC, abstractmethod
from concurrent.futures import ALL_COMPLETED, Executor, wait
from pathlib import Path

import numpy as np
from joblib import delayed
from sklearn.linear_model import Ridge
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MaxAbsScaler

from amisc import IndexSet, InterpResults, MiscTree
from amisc.interpolator import BaseInterpolator, LagrangeInterpolator
from amisc.rv import BaseRV
from amisc.utils import get_logger


class ComponentSurrogate(ABC):
    """The base multi-index stochastic collocation (MISC) surrogate class for a single discipline component model.

    !!! Info "Multi-indices"
        A multi-index is a tuple of natural numbers, each specifying a level of fidelity. You will frequently see two
        multi-indices: `alpha` and `beta`. The `alpha` (or $\\alpha$) indices specify physical model fidelity and get
        passed to the model as an additional argument (e.g. things like discretization level, time step size, etc.).
        The `beta` (or $\\beta$) indices specify surrogate refinement level, typically an indication of the amount of
        training data used. Each fidelity index in $\\alpha$ and $\\beta$ increases in refinement from $0$ up to
        `max_alpha` and `max_beta`, respectively. From the surrogate's perspective, the concatenation of
        $(\\alpha, \\beta)$ fully specifies a single fidelity "level". The `ComponentSurrogate` forms an approximation
        of the model by summing over many of these concatenated sets of $(\\alpha, \\beta)$, which are stored in a
        data structure of `list[ tuple[ tuple, tuple ], ...]`. When $\\alpha$ or $\\beta$ are used as keys in a
        `dict`, they are cast from a `tuple` to a Python `str`.
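
    !!! Example
        A hypothetical index set for a model with one $\\alpha$ fidelity index and two $\\beta$ refinement indices
        (the values are made up, shown only to illustrate the structure):

        ```python
        multi_index = [((0,), (0, 0)),   # coarsest model, coarsest surrogate
                       ((1,), (0, 0)),   # one level of model refinement
                       ((0,), (1, 0))]   # one level of surrogate refinement in the first input
        ```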

    :ivar index_set: the current active set of multi-indices in the MISC approximation
    :ivar candidate_set: all neighboring multi-indices that are candidates for inclusion in `index_set`
    :ivar x_vars: list of variables that define the input domain
    :ivar ydim: the number of outputs returned by the model
    :ivar _model: stores a ref to the model or function that is to be approximated, callable as `ret = model(x)`
    :ivar _model_args: additional arguments to supply to the model
    :ivar _model_kwargs: additional keyword arguments to supply to the model
    :ivar truth_alpha: the model fidelity indices to treat as the "ground truth" model
    :ivar max_refine: the maximum level of refinement for each fidelity index in $(\\alpha, \\beta)$
    :ivar surrogates: keeps track of the `BaseInterpolator` associated with each set of $(\\alpha, \\beta)$
    :ivar costs: keeps track of the total cost associated with adding a single $(\\alpha, \\beta)$ to the MISC approximation
    :ivar misc_coeff: the combination technique coefficients for the MISC approximation

    :vartype index_set: IndexSet
    :vartype candidate_set: IndexSet
    :vartype x_vars: list[BaseRV]
    :vartype ydim: int
    :vartype _model: callable[np.ndarray] -> dict
    :vartype _model_args: tuple
    :vartype _model_kwargs: dict
    :vartype truth_alpha: tuple[int, ...]
    :vartype max_refine: list[int]
    :vartype surrogates: MiscTree
    :vartype costs: MiscTree
    :vartype misc_coeff: MiscTree
    """

    def __init__(self, x_vars: list[BaseRV] | BaseRV, model: callable,
                 multi_index: IndexSet = None,
                 truth_alpha: tuple = (), max_alpha: tuple = (), max_beta: tuple = (),
                 log_file: str | Path = None, executor: Executor = None,
                 model_args: tuple = (), model_kwargs: dict = None):
        """Construct the MISC surrogate and initialize with any multi-indices passed in.

        !!! Info "Model specification"
            The model is a callable function of the form `ret = model(x, *args, **kwargs)`. The return value is a
            dictionary of the form `ret = {'y': y, 'files': files, 'cost': cost}`. At a _minimum_, you must specify
            the raw model output `y` as an `np.ndarray`. Optionally, you can also specify paths to output files, the
            average model cost (in units of seconds of cpu time), and anything else you want.

        !!! Warning
            If the model has multiple fidelities, then the function signature must be
            `model(x, alpha, *args, **kwargs)`; the first argument after `x` will always be the fidelity indices
            `alpha`. The rest of `model_args` will be passed in after (you do not need to include `alpha` in
            `model_args`; it is added automatically).
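
        !!! Example
            A minimal sketch of a compatible model (the quadratic computation and argument names here are
            hypothetical, shown only to illustrate the expected signature and return value):

            ```python
            def my_model(x, alpha=(0,), output_dir=None):
                y = x ** 2  # stand-in for the real model computation at fidelity `alpha`
                return {'y': y, 'cost': 1.0}
            ```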

        :param x_vars: `[X1, X2, ...]` list of variables specifying bounds/pdfs for each input
        :param model: the function to approximate, callable as `ret = model(x, *args, **kwargs)`
        :param multi_index: `[((alpha1), (beta1)), ... ]` list of concatenated multi-indices $(\\alpha, \\beta)$
        :param truth_alpha: specifies the highest model fidelity indices necessary for a "ground truth" comparison
        :param max_alpha: the maximum model refinement indices to allow, defaults to `truth_alpha` if not given
        :param max_beta: the maximum surrogate refinement indices, defaults to `(2,...)` of length `x_dim`
        :param log_file: specifies a log file (optional)
        :param executor: parallel executor used to add candidate indices in parallel (optional)
        :param model_args: optional args to pass when calling the model
        :param model_kwargs: optional kwargs to pass when calling the model
        """
        self.logger = get_logger(self.__class__.__name__, log_file=log_file)
        self.log_file = log_file
        self.executor = executor
        self.training_flag = None   # Keep track of which MISC coeffs are active
        # (True = active set only, False = active + candidate sets, None = neither/unknown)

        multi_index = list() if multi_index is None else multi_index
        assert self.is_downward_closed(multi_index), 'Must be a downward closed set.'
        self.ydim = None
        self.index_set = []         # The active index set for the MISC approximation
        self.candidate_set = []     # Candidate indices for refinement
        self._model = model
        self._model_args = model_args
        self._model_kwargs = model_kwargs if model_kwargs is not None else {}
        self.truth_alpha = truth_alpha
        self.x_vars = x_vars if isinstance(x_vars, list) else [x_vars]
        max_alpha = truth_alpha if max_alpha == () else max_alpha
        max_beta = (2,)*len(self.x_vars) if max_beta == () else max_beta
        self.max_refine = list(max_alpha + max_beta)    # Max refinement indices

        # Initialize important tree-like structures
        self.surrogates = dict()    # Maps alphas -> betas -> surrogates
        self.costs = dict()         # Maps alphas -> betas -> wall clock run times
        self.misc_coeff = dict()    # Maps alphas -> betas -> MISC coefficients

        # Construct vectors of [0,1]^dim(alpha+beta)
        Nij = len(self.max_refine)
        self.ij = np.zeros((2 ** Nij, Nij), dtype=np.uint8)
        for i, ele in enumerate(itertools.product([0, 1], repeat=Nij)):
            self.ij[i, :] = ele

        # Initialize any multi-indices that were passed in (the None check above already applies)
        for alpha, beta in multi_index:
            self.activate_index(alpha, beta)

    def activate_index(self, alpha: tuple, beta: tuple):
        """Add a multi-index to the active set and all neighbors to the candidate set.

        :param alpha: A multi-index specifying model fidelity
        :param beta: A multi-index specifying surrogate fidelity
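
        !!! Example
            A sketch of the bookkeeping, assuming a surrogate `surr` with one `alpha` index and two `beta` indices
            (the index values are hypothetical):

            ```python
            surr.activate_index((0,), (0, 0))
            # ((0,), (0, 0)) moves to surr.index_set; forward neighbors such as
            # ((1,), (0, 0)), ((0,), (1, 0)), and ((0,), (0, 1)) are built and
            # placed in surr.candidate_set (refinement limits permitting)
            ```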
145 """
146 # User is responsible for making sure index set is downward-closed
147 alpha, beta = tuple([int(i) for i in alpha]), tuple([int(i) for i in beta]) # Make sure these are python ints
148 self.add_surrogate(alpha, beta)
149 ele = (alpha, beta)
150 if ele in self.index_set:
151 self.logger.warning(f'Multi-index {ele} is already in the active index set. Ignoring...')
152 return
154 # Add all possible new candidates (distance of one unit vector away)
155 ind = list(alpha + beta)
156 new_candidates = []
157 for i in range(len(ind)):
158 ind_new = ind.copy()
159 ind_new[i] += 1
161 # Don't add if we surpass a refinement limit
162 if np.any(np.array(ind_new) > np.array(self.max_refine)):
163 continue
165 # Add the new index if it maintains downward-closedness
166 new_cand = (tuple(ind_new[:len(alpha)]), tuple(ind_new[len(alpha):]))
167 down_closed = True
168 for j in range(len(ind)):
169 ind_check = ind_new.copy()
170 ind_check[j] -= 1
171 if ind_check[j] >= 0:
172 tup_check = (tuple(ind_check[:len(alpha)]), tuple(ind_check[len(alpha):]))
173 if tup_check not in self.index_set and tup_check != ele:
174 down_closed = False
175 break
176 if down_closed:
177 new_candidates.append(new_cand)
179 # Build an interpolator for each new candidate
180 if self.executor is None: # Sequential
181 for a, b in new_candidates:
182 self.add_surrogate(a, b)
183 else: # Parallel
184 temp_exc = self.executor
185 self.executor = None
186 for a, b in new_candidates:
187 if str(a) not in self.surrogates:
188 self.surrogates[str(a)] = dict()
189 self.costs[str(a)] = dict()
190 self.misc_coeff[str(a)] = dict()
191 self.parallel_add_candidates(new_candidates, temp_exc)
192 self.executor = temp_exc
194 # Move to the active index set
195 if ele in self.candidate_set:
196 self.candidate_set.remove(ele)
197 self.index_set.append(ele)
198 new_candidates = [cand for cand in new_candidates if cand not in self.candidate_set]
199 self.candidate_set.extend(new_candidates)
200 self.training_flag = None # Makes sure misc coeffs get recomputed next time

    def add_surrogate(self, alpha: tuple, beta: tuple):
        """Build a `BaseInterpolator` object for a given $(\\alpha, \\beta)$.

        :param alpha: A multi-index specifying model fidelity
        :param beta: A multi-index specifying surrogate fidelity
        """
        # Create a dictionary for each alpha model to store multiple surrogate fidelities (beta)
        if str(alpha) not in self.surrogates:
            self.surrogates[str(alpha)] = dict()
            self.costs[str(alpha)] = dict()
            self.misc_coeff[str(alpha)] = dict()

        # Create a new interpolator object for this multi-index (abstract method)
        if self.surrogates[str(alpha)].get(str(beta), None) is None:
            self.logger.info(f'Building interpolator for index {(alpha, beta)} ...')
            x_new_idx, x_new, interp = self.build_interpolator(alpha, beta)
            self.surrogates[str(alpha)][str(beta)] = interp
            cost = self.update_interpolator(x_new_idx, x_new, interp)  # Awkward, but needed to separate model evals
            self.costs[str(alpha)][str(beta)] = cost
            if self.ydim is None:
                self.ydim = interp.ydim()

    def init_coarse(self):
        """Initialize the coarsest interpolation and add it to the active index set."""
        alpha = (0,) * len(self.truth_alpha)
        beta = (0,) * len(self.max_refine[len(self.truth_alpha):])
        self.activate_index(alpha, beta)

    def iterate_candidates(self):
        """Iterate candidate indices one-by-one into the active index set.

        :yields alpha, beta: the multi-indices of the current candidate that has been moved to the active set
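
        !!! Example
            A sketch of the intended usage for adaptive refinement (`error_metric` is a hypothetical stand-in for
            whatever criterion you use to score a candidate):

            ```python
            for alpha, beta in surr.iterate_candidates():
                # the candidate (alpha, beta) is temporarily part of surr.index_set here
                score = error_metric(surr)
            # after the loop, the active set is unchanged
            ```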
234 """
235 for alpha, beta in list(self.candidate_set):
236 # Temporarily add a candidate index to active set
237 self.index_set.append((alpha, beta))
238 yield alpha, beta
239 del self.index_set[-1]

    def predict(self, x: np.ndarray | float, use_model: str | tuple = None, model_dir: str | Path = None,
                training: bool = False, index_set: IndexSet = None, ppool=None) -> np.ndarray:
        """Evaluate the MISC approximation at new points `x`.

        !!! Note
            By default, this will predict with the MISC surrogate approximation. For convenience, you can also
            specify `use_model` to call the underlying function instead.

        :param x: `(..., x_dim)` the points to be interpolated, must be within the input domain for accuracy
        :param use_model: 'best'=high-fidelity, 'worst'=low-fidelity, tuple=a specific `alpha`, None=surrogate (default)
        :param model_dir: directory to save output files if `use_model` is specified, ignored otherwise
        :param training: if `True`, then only compute with the active index set, otherwise use all candidates as well
        :param index_set: a list of concatenated $(\\alpha, \\beta)$ to override `self.index_set` if given, else ignored
        :param ppool: a joblib `Parallel` pool to loop over multi-indices in parallel
        :returns y: `(..., y_dim)` the surrogate approximation of the function (or the function itself if `use_model`)
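
        !!! Note "Combination technique"
            The prediction below implements the standard MISC combination sum
            $$\\hat{f}(x) = \\sum_{(\\alpha, \\beta)} c_{\\alpha \\beta} \\, f_{\\alpha \\beta}(x),$$
            where $c_{\\alpha \\beta}$ are the `misc_coeff` values and $f_{\\alpha \\beta}$ are the stored
            interpolators; terms with a zero coefficient are skipped.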
256 """
257 x = np.atleast_1d(x)
258 if use_model is not None:
259 return self._bypass_surrogate(x, use_model, model_dir)
261 index_set, misc_coeff = self._combination(index_set, training) # Choose the correct index set and misc_coeff
263 def run_batch(alpha, beta, y):
264 comb_coeff = misc_coeff[str(alpha)][str(beta)]
265 if np.abs(comb_coeff) > 0:
266 func = self.surrogates[str(alpha)][str(beta)]
267 y += int(comb_coeff) * func(x)
269 if ppool is not None:
270 with tempfile.NamedTemporaryFile(suffix='.dat', mode='w+b', delete=False) as y_fd:
271 pass
272 y_ret = np.memmap(y_fd.name, dtype=x.dtype, mode='r+', shape=x.shape[:-1] + (self.ydim,))
273 ppool(delayed(run_batch)(alpha, beta, y_ret) for alpha, beta in index_set)
274 y = np.empty(y_ret.shape, dtype=x.dtype)
275 y[:] = y_ret[:]
276 del y_ret
277 os.unlink(y_fd.name)
278 else:
279 y = np.zeros(x.shape[:-1] + (self.ydim,), dtype=x.dtype)
280 for alpha, beta in index_set:
281 run_batch(alpha, beta, y)
283 return y

    def grad(self, x: np.ndarray | float | list, training: bool = False, index_set: IndexSet = None) -> np.ndarray:
        """Evaluate the derivative/Jacobian of the MISC approximation at new points `x`.

        :param x: `(..., x_dim)` the evaluation points, must be within the input domain for accuracy
        :param training: if `True`, then only compute with the active index set, otherwise use all candidates as well
        :param index_set: a list of concatenated $(\\alpha, \\beta)$ to override `self.index_set` if given, else ignored
        :returns: `(..., y_dim, x_dim)` the Jacobian of the surrogate approximation
        """
        x = np.atleast_1d(x)
        index_set, misc_coeff = self._combination(index_set, training)  # Choose the correct index set and misc_coeff

        jac = np.zeros(x.shape[:-1] + (self.ydim, len(self.x_vars)), dtype=x.dtype)
        for alpha, beta in index_set:
            comb_coeff = misc_coeff[str(alpha)][str(beta)]
            if np.abs(comb_coeff) > 0:
                interp = self.surrogates[str(alpha)][str(beta)]
                jac += int(comb_coeff) * interp.grad(x)

        return jac

    def hessian(self, x: np.ndarray | float | list, training: bool = False, index_set: IndexSet = None) -> np.ndarray:
        """Evaluate the Hessian of the MISC approximation at new points `x`.

        :param x: `(..., x_dim)` the evaluation points, must be within the input domain for accuracy
        :param training: if `True`, then only compute with the active index set, otherwise use all candidates as well
        :param index_set: a list of concatenated $(\\alpha, \\beta)$ to override `self.index_set` if given, else ignored
        :returns: `(..., y_dim, x_dim, x_dim)` the Hessian of the surrogate approximation
        """
        x = np.atleast_1d(x)
        index_set, misc_coeff = self._combination(index_set, training)  # Choose the correct index set and misc_coeff

        hess = np.zeros(x.shape[:-1] + (self.ydim, len(self.x_vars), len(self.x_vars)), dtype=x.dtype)
        for alpha, beta in index_set:
            comb_coeff = misc_coeff[str(alpha)][str(beta)]
            if np.abs(comb_coeff) > 0:
                interp = self.surrogates[str(alpha)][str(beta)]
                hess += int(comb_coeff) * interp.hessian(x)

        return hess

    def __call__(self, *args, **kwargs):
        """Here for convenience so you can also do `ret = surrogate(x)`, just like the `BaseInterpolator`."""
        return self.predict(*args, **kwargs)

    def update_misc_coeffs(self, index_set: IndexSet = None) -> MiscTree:
        """Update the combination technique coeffs for MISC using the given index set.

        :param index_set: the index set to consider when computing the MISC coefficients, defaults to the active set
        :returns: the MISC coefficients for the given index set ($\\alpha$ -> $\\beta$ -> coeff)
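
        !!! Note
            For a downward-closed index set $\\mathcal{I}$, the coefficients follow the standard combination
            technique formula, which is what the array computation below implements:
            $$c_{\\alpha, \\beta} = \\sum_{e \\in \\{0, 1\\}^{\\bar{d}}} (-1)^{|e|_1} \\,
            \\mathbb{1}\\left[(\\alpha, \\beta) + e \\in \\mathcal{I}\\right],$$
            where $\\bar{d}$ is the total number of fidelity indices in $(\\alpha, \\beta)$.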
334 """
335 if index_set is None:
336 index_set = self.index_set
338 # Construct a (N_indices, dim(alpha+beta)) refactor of the index_set for arrayed computations
339 index_mat = np.zeros((len(index_set), len(self.max_refine)), dtype=np.uint8)
340 for i, (alpha, beta) in enumerate(index_set):
341 index_mat[i, :] = alpha + beta
342 index_mat = np.expand_dims(index_mat, axis=0) # (1, Ns, Nij)
344 misc_coeff = dict()
345 for alpha, beta in index_set:
346 # Add permutations of [0, 1] to (alpha, beta)
347 alpha_beta = np.array(alpha+beta, dtype=np.uint8)[np.newaxis, :] # (1, Nij)
348 new_indices = np.expand_dims(alpha_beta + self.ij, axis=1) # (2**Nij, 1, Nij)
350 # Find which indices are in the index_set (using np broadcasting comparison)
351 diff = new_indices - index_mat # (2**Nij, Ns, Nij)
352 idx = np.count_nonzero(diff, axis=-1) == 0 # (2**Nij, Ns)
353 idx = np.any(idx, axis=-1) # (2**Nij,)
354 ij_use = self.ij[idx, :] # (*, Nij)
355 l1_norm = np.sum(np.abs(ij_use), axis=-1) # (*,)
356 coeff = np.sum((-1.) ** l1_norm) # float
358 # Save misc coeff to a dict() tree structure
359 if misc_coeff.get(str(alpha)) is None:
360 misc_coeff[str(alpha)] = dict()
361 misc_coeff[str(alpha)][str(beta)] = coeff
362 self.misc_coeff[str(alpha)][str(beta)] = coeff
364 return misc_coeff

    def get_sub_surrogate(self, alpha: tuple, beta: tuple) -> BaseInterpolator:
        """Get the specific sub-surrogate corresponding to the $(\\alpha, \\beta)$ fidelity.

        :param alpha: A multi-index specifying model fidelity
        :param beta: A multi-index specifying surrogate fidelity
        :returns: the corresponding `BaseInterpolator` object
        """
        return self.surrogates[str(alpha)][str(beta)]

    def get_cost(self, alpha: tuple, beta: tuple) -> float:
        """Return the total cost (wall time in seconds) required to add $(\\alpha, \\beta)$ to the MISC approximation.

        :param alpha: A multi-index specifying model fidelity
        :param beta: A multi-index specifying surrogate fidelity
        """
        try:
            return self.costs[str(alpha)][str(beta)]
        except KeyError:
            return 0.0

    def update_input_bds(self, idx: int, bds: tuple):
        """Update the bounds of the input variable at the given index.

        :param idx: the index of the input variable to update
        :param bds: the new bounds
        """
        self.x_vars[int(idx)].update_bounds(*bds)

        # Update the bounds in all associated surrogates
        for alpha in self.surrogates:
            for beta in self.surrogates[alpha]:
                self.surrogates[alpha][beta].update_input_bds(idx, bds)

    def save_enabled(self):
        """Return whether this model wants to save outputs to file.

        !!! Note
            You can specify that a model wants to save outputs to file by providing an `'output_dir'` kwarg.
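
        !!! Example
            A sketch of opting in at construction time (the directory name here is arbitrary):

            ```python
            surr = SparseGridSurrogate(x_vars, model, model_kwargs={'output_dir': 'amisc_output'})
            surr.save_enabled()  # True
            ```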
404 """
405 return self._model_kwargs.get('output_dir') is not None

    def _set_output_dir(self, output_dir: str | Path):
        """Update the component model output directory.

        :param output_dir: the new directory for model output files
        """
        if output_dir is not None:
            output_dir = str(Path(output_dir).resolve())
        self._model_kwargs['output_dir'] = output_dir
        for alpha in self.surrogates:
            for beta in self.surrogates[alpha]:
                self.surrogates[alpha][beta]._model_kwargs['output_dir'] = output_dir

    def __repr__(self):
        """Shows all multi-indices in the current approximation and their corresponding MISC coefficients."""
        s = f'Inputs \u2014 {[str(var) for var in self.x_vars]}\n'
        if self.training_flag is None:
            self.update_misc_coeffs()
            self.training_flag = True

        if self.training_flag:
            s += '(Training mode)\n'
            for alpha, beta in self.index_set:
                s += f"[{int(self.misc_coeff[str(alpha)][str(beta)])}] \u2014 {alpha}, {beta}\n"
            for alpha, beta in self.candidate_set:
                s += f"[-] \u2014 {alpha}, {beta}\n"
        else:
            s += '(Evaluation mode)\n'
            for alpha, beta in self.index_set + self.candidate_set:
                s += f"[{int(self.misc_coeff[str(alpha)][str(beta)])}] \u2014 {alpha}, {beta}\n"
        return s

    def __str__(self):
        """Everyone will view these objects the same way."""
        return self.__repr__()

    def _bypass_surrogate(self, x, use_model, model_dir):
        """Bypass surrogate evaluation and use the specified model directly."""
        output_dir = self._model_kwargs.get('output_dir')
        if self.save_enabled():
            self._model_kwargs['output_dir'] = model_dir

        alpha_use = {'best': self.truth_alpha, 'worst': (0,) * len(self.truth_alpha)}.get(use_model, use_model)
        kwargs = copy.deepcopy(self._model_kwargs)
        if len(alpha_use) > 0:
            kwargs['alpha'] = alpha_use
        ret = self._model(x, *self._model_args, **kwargs)

        if output_dir is not None:
            self._model_kwargs['output_dir'] = output_dir

        if not isinstance(ret, dict):
            self.logger.warning(f"Function {self._model} did not return a dict of the form {{'y': y}}. Please make "
                                f"sure you do so to avoid conflicts. Returning the value directly instead...")
        return ret['y'] if isinstance(ret, dict) else ret

    def _combination(self, index_set, training):
        """Decide which index set and corresponding misc coefficients to use."""
        misc_coeff = copy.deepcopy(self.misc_coeff)
        if index_set is None:
            # Use active indices + candidate indices depending on training mode
            index_set = self.index_set if training else self.index_set + self.candidate_set

            # Decide when to update misc coefficients
            if self.training_flag is None:
                misc_coeff = self.update_misc_coeffs(index_set)  # On initialization or reset
            else:
                if (not self.training_flag and training) or (self.training_flag and not training):
                    misc_coeff = self.update_misc_coeffs(index_set)  # Logical XOR cases for training mode

            # Save an indication of what state the MISC coefficients are in (i.e. training or eval mode)
            self.training_flag = training
        else:
            # If we passed in an index set, always recompute misc coeff and toggle for reset on next call
            misc_coeff = self.update_misc_coeffs(index_set)
            self.training_flag = None

        return index_set, misc_coeff

    @staticmethod
    def is_one_level_refinement(beta_old: tuple, beta_new: tuple) -> bool:
        """Check if a new `beta` multi-index is a one-level refinement from a previous `beta`.

        !!! Example
            Refining from `(0, 1, 2)` to the new multi-index `(1, 1, 2)` is a one-level refinement. But refining to
            either `(2, 1, 2)` or `(1, 2, 2)` is not, since more than one level of refinement occurs at the same time.
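            The same check in code:

            ```python
            ComponentSurrogate.is_one_level_refinement((0, 1, 2), (1, 1, 2))  # True
            ComponentSurrogate.is_one_level_refinement((0, 1, 2), (2, 1, 2))  # False
            ```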

        :param beta_old: the starting multi-index
        :param beta_new: the new refined multi-index
        :returns: whether `beta_new` is a one-level refinement from `beta_old`
        """
        level_diff = np.array(beta_new, dtype=int) - np.array(beta_old, dtype=int)
        ind = np.nonzero(level_diff)[0]
        return ind.shape[0] == 1 and level_diff[ind] == 1

    @staticmethod
    def is_downward_closed(indices: IndexSet) -> bool:
        """Return whether a list of $(\\alpha, \\beta)$ multi-indices is downward-closed.

        MISC approximations require a downward-closed set in order to use the combination-technique formula for the
        coefficients (as implemented here).

        !!! Example
            The list `[( (0,), (0,) ), ( (0,), (1,) ), ( (1,), (0,) ), ( (1,), (1,) )]` is downward-closed, since
            every multi-index that is componentwise smaller than a member is also in the list. You can visualize this
            as building a stack of cubes: in order to place a cube, all adjacent cubes must be present (does the logo
            make sense now?).
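            The same check in code (the appended index is hypothetical):

            ```python
            indices = [((0,), (0,)), ((0,), (1,)), ((1,), (0,)), ((1,), (1,))]
            ComponentSurrogate.is_downward_closed(indices)  # True
            indices.append(((2,), (1,)))  # the smaller ((2,), (0,)) is missing
            ComponentSurrogate.is_downward_closed(indices)  # False
            ```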

        :param indices: list() of (`alpha`, `beta`) multi-indices
        :returns: whether the set of indices is downward-closed
        """
        # Iterate over every multi-index
        for alpha, beta in indices:
            # Every smaller multi-index must also be included in the indices list
            sub_sets = [np.arange(tuple(alpha + beta)[i] + 1) for i in range(len(alpha) + len(beta))]
            for ele in itertools.product(*sub_sets):
                tup = (tuple(ele[:len(alpha)]), tuple(ele[len(alpha):]))
                if tup not in indices:
                    return False
        return True

    @abstractmethod
    def build_interpolator(self, alpha: tuple, beta: tuple) -> InterpResults:
        """Return a `BaseInterpolator` object and new refinement points for a given $(\\alpha, \\beta)$ multi-index.

        :param alpha: A multi-index specifying model fidelity
        :param beta: A multi-index specifying surrogate fidelity
        :returns: `idx`, `x`, `interp` - list of new grid indices, the new grid points `(N_new, x_dim)`, and the
            `BaseInterpolator` object. Similar to `BaseInterpolator.refine()`.
        """
        pass

    @abstractmethod
    def update_interpolator(self, x_new_idx: list[int | tuple | str],
                            x_new: np.ndarray, interp: BaseInterpolator) -> float:
        """Secondary method to actually compute and save model evaluations within the interpolator.

        !!! Note
            This distinction with `build_interpolator` was necessary to separately construct the interpolator and be
            able to evaluate the model at the new interpolation points. You can see that `parallel_add_candidates`
            uses this distinction to compute the model in parallel on MPI workers, for example.

        :param x_new_idx: list of new grid point indices
        :param x_new: `(N_new, x_dim)`, the new grid point locations
        :param interp: the `BaseInterpolator` object to compute model evaluations with
        :returns cost: the cost (in wall time seconds) required to add this `BaseInterpolator` object
        """
        pass

    @abstractmethod
    def parallel_add_candidates(self, candidates: IndexSet, executor: Executor):
        """Defines a function to handle adding candidate indices in parallel.

        !!! Note
            While `build_interpolator` can make changes to 'self', these changes will not be saved in the master task
            if running in parallel over MPI workers, for example. This method is a workaround so that all required
            mutable changes to 'self' are made in the master task, before distributing tasks to parallel workers
            using this method. You can pass if you don't plan to add candidates in parallel.

        :param candidates: list of [(alpha, beta),...] multi-indices
        :param executor: the executor used to iterate candidates in parallel
        """
        pass


class SparseGridSurrogate(ComponentSurrogate):
    """Concrete MISC surrogate class that maintains a sparse grid composed of smaller tensor-product grids.

    !!! Note
        MISC itself can be thought of as an extension of the well-known sparse grid technique, so this class
        readily integrates with the MISC implementation in `ComponentSurrogate`. Sparse grids limit the curse
        of dimensionality up to about `dim = 10-15` for the input space (which would otherwise be infeasible with a
        normal full tensor-product grid of the same size).

    !!! Info "About points in a sparse grid"
        A sparse grid approximates a full tensor-product grid $(N_1, N_2, ..., N_d)$, where $N_i$ is the number of
        grid points along dimension $i$, for a $d$-dimensional space. Each point is uniquely identified in the sparse
        grid by a list of indices $(j_1, j_2, ..., j_d)$, where $j_i = 0 ... N_i$. We refer to this unique identifier
        as a "grid coordinate". In the `HashSG` data structure, we use a `str(tuple(coord))` representation to
        uniquely identify the coordinate in a hash DS like Python's `dict`.

    :cvar HashSG: a type alias for the hash storage of the sparse grid data (a tree-like DS using dicts)
    :ivar curr_max_beta: the current maximum $\\beta$ refinement indices in the sparse grid (for each $\\alpha$)
    :ivar x_grids: maps $\\alpha$ indices to a list of 1d grids corresponding to `curr_max_beta`
    :ivar xi_map: the sparse grid interpolation points
    :ivar yi_map: the function values at all sparse grid points
    :ivar yi_nan_map: imputed function values to use when `yi_map` contains `nan` data (sometimes the model fails...)
    :ivar yi_files: optional filenames corresponding to the sparse grid `yi_map` data

    :vartype HashSG: dict[str, dict[str, np.ndarray | str]]
    :vartype curr_max_beta: dict[str, list[int]]
    :vartype x_grids: dict[str, np.ndarray]
    :vartype xi_map: HashSG
    :vartype yi_map: HashSG
    :vartype yi_nan_map: HashSG
    :vartype yi_files: HashSG
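
    !!! Example
        A sketch of the nested hash layout (the alpha index and coordinate values here are hypothetical):

        ```python
        # xi_map[str(alpha)][str(coord)] -> interpolation point of shape (x_dim,)
        # yi_map[str(alpha)][str(coord)] -> model output of shape (y_dim,)
        xi_map = {'(0,)': {'(0, 0)': np.array([0.5, 1.0])}}
        yi_map = {'(0,)': {'(0, 0)': np.array([2.3])}}
        ```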
602 """
604 HashSG = dict[str: dict[str: np.ndarray | str]]

    def __init__(self, *args, **kwargs):
        # Initialize tree-like hash structures for maintaining a sparse grid of smaller tensor-product grids
        self.curr_max_beta = dict()     # Maps alphas -> current max refinement indices
        self.x_grids = dict()           # Maps alphas -> list of ndarrays specifying 1d grids corresponding to max_beta
        self.xi_map = dict()            # Maps alphas -> grid point coords -> interpolation points
        self.yi_map = dict()            # Maps alphas -> grid point coords -> interpolation function values
        self.yi_nan_map = dict()        # Maps alphas -> grid point coords -> interpolated yi values when yi=nan
        self.yi_files = dict()          # Maps alphas -> grid point coords -> model output files (optional)
        super().__init__(*args, **kwargs)

    # Override
    def predict(self, x, use_model=None, model_dir=None, training=False, index_set=None, ppool=None):
        """Need to override `super()` to allow passing in interpolation grids `xi` and `yi`."""
        x = np.atleast_1d(x)
        if use_model is not None:
            return self._bypass_surrogate(x, use_model, model_dir)

        index_set, misc_coeff = self._combination(index_set, training)

        def run_batch(alpha, beta, y):
            comb_coeff = misc_coeff[str(alpha)][str(beta)]
            if np.abs(comb_coeff) > 0:
                # Gather the xi/yi interpolation points/qoi_ind for this sub tensor-product grid
                interp = self.surrogates[str(alpha)][str(beta)]
                xi, yi = self.get_tensor_grid(alpha, beta)

                # Add this sub tensor-product grid to the MISC approximation
                y += int(comb_coeff) * interp(x, xi=xi, yi=yi)

        if ppool is not None:
            with tempfile.NamedTemporaryFile(suffix='.dat', mode='w+b', delete=False) as y_fd:
                pass
            y_ret = np.memmap(y_fd.name, dtype=x.dtype, mode='r+', shape=x.shape[:-1] + (self.ydim,))
            ppool(delayed(run_batch)(alpha, beta, y_ret) for alpha, beta in index_set)
            y = np.empty(y_ret.shape, dtype=x.dtype)
            y[:] = y_ret[:]
            del y_ret
            os.unlink(y_fd.name)
        else:
            y = np.zeros(x.shape[:-1] + (self.ydim,), dtype=x.dtype)
            for alpha, beta in index_set:
                run_batch(alpha, beta, y)

        return y

    # Override
    def grad(self, x, training=False, index_set=None):
        """Need to override `super()` to allow passing in interpolation grids `xi` and `yi`."""
        x = np.atleast_1d(x)
        index_set, misc_coeff = self._combination(index_set, training)  # Choose the correct index set and misc_coeff

        jac = np.zeros(x.shape[:-1] + (self.ydim, len(self.x_vars)), dtype=x.dtype)
        for alpha, beta in index_set:
            comb_coeff = misc_coeff[str(alpha)][str(beta)]
            if np.abs(comb_coeff) > 0:
                # Gather the xi/yi interpolation points/qoi_ind for this sub tensor-product grid
                interp = self.surrogates[str(alpha)][str(beta)]
                xi, yi = self.get_tensor_grid(alpha, beta)

                jac += int(comb_coeff) * interp.grad(x, xi=xi, yi=yi)

        return jac

    # Override
    def hessian(self, x, training=False, index_set=None):
        """Need to override `super()` to allow passing in interpolation grids `xi` and `yi`."""
        x = np.atleast_1d(x)
        index_set, misc_coeff = self._combination(index_set, training)  # Choose the correct index set and misc_coeff

        hess = np.zeros(x.shape[:-1] + (self.ydim, len(self.x_vars), len(self.x_vars)), dtype=x.dtype)
        for alpha, beta in index_set:
            comb_coeff = misc_coeff[str(alpha)][str(beta)]
            if np.abs(comb_coeff) > 0:
                # Gather the xi/yi interpolation points/qoi_ind for this sub tensor-product grid
                interp = self.surrogates[str(alpha)][str(beta)]
                xi, yi = self.get_tensor_grid(alpha, beta)

                hess += int(comb_coeff) * interp.hessian(x, xi=xi, yi=yi)

        return hess

    def get_tensor_grid(self, alpha: tuple, beta: tuple, update_nan: bool = True) -> tuple[np.ndarray, np.ndarray]:
        """Construct the `xi/yi` sub tensor-product grids for a given $(\\alpha, \\beta)$ multi-index.

        :param alpha: model fidelity multi-index
        :param beta: surrogate fidelity multi-index
        :param update_nan: try to fill `nan` with imputed values, otherwise just return the `nans` in place
        :returns: `xi, yi`, of size `(prod(grid_sizes), x_dim)` and `(prod(grid_sizes), y_dim)` respectively, the
            interpolation grid points and corresponding function values for this tensor-product grid
        """
        interp = self.surrogates[str(alpha)][str(beta)]
        grid_sizes = interp.get_grid_sizes(beta)
        coords = [list(range(grid_sizes[n])) for n in range(interp.xdim())]
        xi = np.zeros((np.prod(grid_sizes), interp.xdim()), dtype=np.float32)
        yi = np.zeros((np.prod(grid_sizes), self.ydim), dtype=np.float32)
        for i, coord in enumerate(itertools.product(*coords)):
            xi[i, :] = self.xi_map[str(alpha)][str(coord)]
            yi_curr = self.yi_map[str(alpha)][str(coord)]
            if update_nan and np.any(np.isnan(yi_curr)):
                # Try to replace NaN values if they are stored
                yi_curr = self.yi_nan_map[str(alpha)].get(str(coord), yi_curr)
            yi[i, :] = yi_curr

        return xi, yi

    def get_training_data(self) -> tuple[dict[str, np.ndarray], dict[str, np.ndarray]]:
        """Grab all `x, y` training data stored in the sparse grid for each model fidelity level $\\alpha$.

        :returns: `xi`, `yi`, each a `dict` mapping `alpha` indices to `np.ndarrays`
        """
        xi, yi = dict(), dict()
        for alpha, x_map in self.xi_map.items():
            x = np.zeros((len(x_map), len(self.x_vars)))
            y = np.zeros((len(x_map), self.ydim))
            for i, (coord, x_coord) in enumerate(x_map.items()):
                x[i, :] = x_coord
                y[i, :] = self.yi_nan_map[alpha].get(coord, self.yi_map[alpha][coord])

            xi[alpha] = x
            yi[alpha] = y

        return xi, yi

    def update_yi(self, alpha: tuple, beta: tuple, yi_dict: dict[str, np.ndarray]):
        """Helper method to update `yi` values, accounting for possible `nans` by regression imputation.

        :param alpha: the model fidelity indices
        :param beta: the surrogate fidelity indices
        :param yi_dict: a `dict` mapping `str(coord)` grid coordinates to function values
        """
        self.yi_map[str(alpha)].update(yi_dict)
        imputer, xdim = None, len(self.x_vars)
        for grid_coord, yi in yi_dict.items():
            if np.any(np.isnan(yi)):
                if imputer is None:
                    # Grab all 'good' interpolation points and train a simple linear regression fit
                    xi_mat, yi_mat = np.zeros((0, xdim)), np.zeros((0, self.ydim))
                    for coord, xi in self.xi_map[str(alpha)].items():
                        if coord not in self.yi_nan_map[str(alpha)] and coord in self.yi_map[str(alpha)]:
                            yi_add = self.yi_map[str(alpha)][str(coord)]
                            xi_mat = np.concatenate((xi_mat, xi.reshape((1, xdim))), axis=0)
                            yi_mat = np.concatenate((yi_mat, yi_add.reshape((1, self.ydim))), axis=0)
                    nan_idx = np.any(np.isnan(yi_mat), axis=-1)
                    xi_mat = xi_mat[~nan_idx, :]
                    yi_mat = yi_mat[~nan_idx, :]
                    imputer = Pipeline([('scaler', MaxAbsScaler()), ('model', Ridge(alpha=1))])
                    imputer.fit(xi_mat, yi_mat)
                x_interp = self.xi_map[str(alpha)][str(grid_coord)].reshape((1, xdim))
                y_interp = np.atleast_1d(np.squeeze(imputer.predict(x_interp)))
                nan_idx = np.isnan(yi)
                y_interp[~nan_idx] = yi[~nan_idx]  # Only keep imputed values where yi is nan
                self.yi_nan_map[str(alpha)][str(grid_coord)] = y_interp

        # Go back and try to re-interpolate old nan values as more points get added to the grid
        if imputer is not None:
            for grid_coord in list(self.yi_nan_map[str(alpha)].keys()):
                if grid_coord not in yi_dict:
                    x_interp = self.xi_map[str(alpha)][str(grid_coord)].reshape((1, xdim))
                    y_interp = imputer.predict(x_interp)
                    self.yi_nan_map[str(alpha)][str(grid_coord)] = np.atleast_1d(np.squeeze(y_interp))

    # Override
    def get_sub_surrogate(self, alpha: tuple, beta: tuple, include_grid: bool = False) -> BaseInterpolator:
        """Get the specific sub-surrogate corresponding to the $(\\alpha, \\beta)$ fidelity.

        :param alpha: A multi-index specifying model fidelity
        :param beta: A multi-index specifying surrogate fidelity
        :param include_grid: whether to add the `xi/yi` interpolation points to the returned `BaseInterpolator` object
        :returns: the `BaseInterpolator` object corresponding to $(\\alpha, \\beta)$
        """
        interp = super().get_sub_surrogate(alpha, beta)
        if include_grid:
            interp.xi, interp.yi = self.get_tensor_grid(alpha, beta)
        return interp

    def build_interpolator(self, alpha, beta):
        """Abstract method implementation for constructing the tensor-product grid interpolator."""
        # Create a new tensor-product grid interpolator for the base index (0, 0, ...)
        if np.sum(beta) == 0:
            kwargs = copy.deepcopy(self._model_kwargs)
            if len(alpha) > 0:
                kwargs['alpha'] = alpha
            interp = LagrangeInterpolator(beta, self.x_vars, model=self._model, model_args=self._model_args,
                                          model_kwargs=kwargs, init_grids=True, reduced=True)
            x_pt = np.array([float(interp.x_grids[n][beta[n]]) for n in range(interp.xdim())], dtype=np.float32)
            self.curr_max_beta[str(alpha)] = list(beta)
            self.x_grids[str(alpha)] = copy.deepcopy(interp.x_grids)
            self.xi_map[str(alpha)] = {str(beta): x_pt}
            self.yi_map[str(alpha)] = dict()
            self.yi_nan_map[str(alpha)] = dict()
            if self.save_enabled():
                self.yi_files[str(alpha)] = dict()

            return [beta], x_pt.reshape((1, len(self.x_vars))), interp
        # Otherwise, all other indices are a refinement of previous grids

        # Look for the first multi-index neighbor that is one level of refinement away
        refine_tup = None
        for beta_old_str in list(self.surrogates[str(alpha)].keys()):
            beta_old = ast.literal_eval(beta_old_str)
            if self.is_one_level_refinement(beta_old, beta):
                idx_refine = int(np.nonzero(np.array(beta, dtype=int) - np.array(beta_old, dtype=int))[0][0])
                refine_level = beta[idx_refine]
                if refine_level > self.curr_max_beta[str(alpha)][idx_refine]:
                    # Generate the next refinement grid and save it (refine_tup = (x_new_idx, x_new, interp))
                    refine_tup = self.surrogates[str(alpha)][beta_old_str].refine(beta, auto=False)
                    self.curr_max_beta[str(alpha)][idx_refine] = refine_level
                    self.x_grids[str(alpha)][idx_refine] = copy.deepcopy(refine_tup[2].x_grids[idx_refine])
                else:
                    # Access the refinement grid from memory (it is already computed)
                    num_pts = self.surrogates[str(alpha)][beta_old_str].get_grid_sizes(beta)[idx_refine]
                    x_refine = self.x_grids[str(alpha)][idx_refine][:num_pts]
                    refine_tup = self.surrogates[str(alpha)][beta_old_str].refine(beta, x_refine=x_refine, auto=False)
                break  # Only need to grab one neighbor

        # Gather new interpolation grid points
        x_new_idx, x_new, interp = refine_tup
        xn_coord = []   # Save multi-index coordinates of points to compute the model at for refinement
        xn_pts = np.zeros((0, interp.xdim()), dtype=np.float32)     # Save physical x locations of new points
        for i, multi_idx in enumerate(x_new_idx):
            if str(multi_idx) not in self.yi_map[str(alpha)]:
                # We have not computed this grid coordinate yet
                xn_coord.append(multi_idx)
                xn_pts = np.concatenate((xn_pts, x_new[i, np.newaxis, :]), axis=0)  # (N_new, xdim)
                self.xi_map[str(alpha)][str(multi_idx)] = x_new[i, :]

        return xn_coord, xn_pts, interp

    def update_interpolator(self, x_new_idx, x_new, interp):
        """Awkward solution, I know, but actually compute and save the model evaluations here."""
        # Compute and store model output at new refinement points in a hash structure
        yi_ret = interp.set_yi(x_new=(x_new_idx, x_new))

        if self.ydim is None:
            for coord_str, yi in yi_ret['y'].items():
                self.ydim = yi.shape[0]
                break

        alpha = interp._model_kwargs.get('alpha', ())
        self.update_yi(alpha, interp.beta, yi_ret['y'])
        if self.save_enabled():
            self.yi_files[str(alpha)].update(yi_ret['files'])
        cost = interp.model_cost * len(x_new_idx)

        return cost

    def parallel_add_candidates(self, candidates: IndexSet, executor: Executor):
        """Work-around to make sure mutable instance variable changes are made before/after
        splitting tasks using this method over parallel (potentially MPI) workers. You can pass if you are not
        interested in such parallel ideas.

        !!! Warning
            MPI workers cannot save changes to `self`, so this method should only distribute static tasks to the
            workers.

        :param candidates: list of [(alpha, beta),...] multi-indices
        :param executor: the executor used to iterate candidates in parallel
        """
        # Do sequential tasks first (i.e. make mutable changes to self), build up parallel task args
        task_args = []
        for alpha, beta in candidates:
            x_new_idx, x_new, interp = self.build_interpolator(alpha, beta)
            task_args.append((alpha, beta, x_new_idx, x_new, interp))

        def parallel_task(alpha, beta, x_new_idx, x_new, interp):
            # Must return anything you want changed in self or interp (mutable changes aren't saved over MPI workers)
            logger = get_logger(self.__class__.__name__, log_file=self.log_file, stdout=False)
            logger.info(f'Building interpolator for index {(alpha, beta)} ...')
            yi_ret = interp.set_yi(x_new=(x_new_idx, x_new))
            model_cost = interp.model_cost if interp.model_cost is not None else 1
            return yi_ret, model_cost

        # Wait for all parallel workers to return
        fs = [executor.submit(parallel_task, *args) for args in task_args]
        wait(fs, timeout=None, return_when=ALL_COMPLETED)

        # Update self and interp with the results from all workers (and check for errors)
        for i, future in enumerate(fs):
            try:
                a = task_args[i][0]
                b = task_args[i][1]
                x_new_idx = task_args[i][2]
                interp = task_args[i][4]
                yi_ret, model_cost = future.result()
                interp.model_cost = model_cost
                self.surrogates[str(a)][str(b)] = interp
                self.update_yi(a, b, yi_ret['y'])
                if self.save_enabled():
                    self.yi_files[str(a)].update(yi_ret['files'])
                self.costs[str(a)][str(b)] = interp.model_cost * len(x_new_idx)

                if self.ydim is None:
                    for coord_str, yi in self.yi_map[str(a)].items():
                        self.ydim = yi.shape[0]
                        break
            except Exception:
                self.logger.error(f'An exception occurred in a thread handling build_interpolator for {candidates[i]}')
                raise


class AnalyticalSurrogate(ComponentSurrogate):
    """Concrete "surrogate" class that just uses the analytical model (i.e. bypasses surrogate evaluation)."""

    def __init__(self, x_vars, model, *args, **kwargs):
        """Initializes a stand-in `ComponentSurrogate` with all unnecessary fields set to empty.

        !!! Warning
            This overwrites anything passed in for `truth_alpha`, `max_alpha`, `max_beta`, or `multi_index` since
            these are not used for an analytical model.
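
        !!! Example
            A minimal sketch, assuming a simple analytical function and a pre-defined list of `BaseRV` inputs
            `x_vars` (both hypothetical):

            ```python
            def paraboloid(x):
                return {'y': np.sum(x ** 2, axis=-1, keepdims=True)}

            surr = AnalyticalSurrogate(x_vars, paraboloid)
            y = surr.predict(np.array([0.5, -0.5]))  # calls the model directly
            ```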
915 """
916 kwargs['truth_alpha'] = ()
917 kwargs['max_alpha'] = ()
918 kwargs['max_beta'] = ()
919 kwargs['multi_index'] = []
920 super().__init__(x_vars, model, *args, **kwargs)

    # Override
    def predict(self, x: np.ndarray | float, **kwargs) -> np.ndarray:
        """Evaluate the analytical model at points `x`, ignoring any extra `**kwargs` passed in.

        :param x: `(..., x_dim)` the points to be evaluated
        :returns y: `(..., y_dim)` the exact model output at the input points
        """
        ret = self._model(x, *self._model_args, **self._model_kwargs)

        if not isinstance(ret, dict):
            self.logger.warning(f"Function {self._model} did not return a dict of the form {{'y': y}}. Please make "
                                f"sure you do so to avoid conflicts. Returning the value directly instead...")
        return ret['y'] if isinstance(ret, dict) else ret

    # Override
    def grad(self, x, training=False, index_set=None):
        """Use auto-diff to compute the derivative of an analytical model. The model must be implemented with `numpy`.

        !!! Warning "Not implemented yet"
            Hypothetically, auto-diff libraries like `jax` could be used to write a generic gradient function here
            for analytical models implemented directly in Python/numpy. But there are a lot of quirks that should be
            worked out first.
        """
        raise NotImplementedError('Need to implement a generic auto-diff function here, like using jax for example.')

    def hessian(self, x, training=False, index_set=None):
        """Use auto-diff to compute the Hessian of an analytical model. The model must be implemented with `numpy`.

        !!! Warning "Not implemented yet"
            Hypothetically, auto-diff libraries like `jax` could be used to write a generic Hessian function here
            for analytical models implemented directly in Python/numpy. But there are a lot of quirks that should be
            worked out first.
        """
        raise NotImplementedError('Need to implement a generic auto-diff function here, like using jax for example.')

    # Override
    def activate_index(self, *args):
        """Do nothing."""
        pass

    # Override
    def add_surrogate(self, *args):
        """Do nothing."""
        pass

    # Override
    def init_coarse(self):
        """Do nothing."""
        pass

    # Override
    def update_misc_coeffs(self, **kwargs):
        """Do nothing."""
        pass

    # Override
    def get_sub_surrogate(self, *args):
        """Nothing to return for an analytical model."""
        return None

    # Override
    def get_cost(self, *args):
        """Return no cost for an analytical model."""
        return 0

    def build_interpolator(self, *args):
        """Abstract method implementation; return `None` for an analytical model."""
        return None

    def update_interpolator(self, *args):
        """Abstract method implementation; return `cost=0` for an analytical model."""
        return 0

    def parallel_add_candidates(self, *args):
        """Abstract method implementation; do nothing."""
        pass