
amisc.utils

Provides some basic utilities for the package.

Includes:

  • to_model_dataset — convert surrogate input/output dataset to a form usable by the true model
  • to_surrogate_dataset — convert true model input/output dataset to a form usable by the surrogate
  • constrained_lls — solve a constrained linear least squares problem
  • search_for_file — search for a file in the current working directory and additional search paths
  • format_inputs — broadcast and reshape all inputs to the same shape
  • format_outputs — reshape all outputs to a common loop shape
  • parse_function_string — convert function-like strings to arguments and keyword-arguments
  • relative_error — compute the relative L2 error between two vectors
  • get_logger — logging utility with nice formatting

constrained_lls(A, b, C, d)

Minimize \(||Ax-b||_2\), subject to \(Cx=d\), i.e. constrained linear least squares.

Note

See these lecture notes (http://www.seas.ucla.edu/~vandenbe/133A/lectures/cls.pdf) for more detail.

PARAMETER DESCRIPTION
A

(..., M, N), Vandermonde matrix

TYPE: ndarray

b

(..., M, 1), data

TYPE: ndarray

C

(..., P, N), constraint operator

TYPE: ndarray

d

(..., P, 1), constraint condition

TYPE: ndarray

RETURNS DESCRIPTION
ndarray

(..., N, 1), the solution parameter vector x

Source code in src/amisc/utils.py
def constrained_lls(A: np.ndarray, b: np.ndarray, C: np.ndarray, d: np.ndarray) -> np.ndarray:
    """Minimize $||Ax-b||_2$, subject to $Cx=d$, i.e. constrained linear least squares.

    !!! Note
        See [these lecture notes](http://www.seas.ucla.edu/~vandenbe/133A/lectures/cls.pdf) for more detail.

    :param A: `(..., M, N)`, Vandermonde matrix
    :param b: `(..., M, 1)`, data
    :param C: `(..., P, N)`, constraint operator
    :param d: `(..., P, 1)`, constraint condition
    :returns: `(..., N, 1)`, the solution parameter vector `x`
    """
    M = A.shape[-2]
    dims = len(A.shape[:-2])
    T_axes = tuple(np.arange(0, dims)) + (-1, -2)
    Q, R = np.linalg.qr(np.concatenate((A, C), axis=-2))
    Q1 = Q[..., :M, :]
    Q2 = Q[..., M:, :]
    Q1_T = np.transpose(Q1, axes=T_axes)
    Q2_T = np.transpose(Q2, axes=T_axes)
    Qtilde, Rtilde = np.linalg.qr(Q2_T)
    Qtilde_T = np.transpose(Qtilde, axes=T_axes)
    Rtilde_T_inv = np.linalg.pinv(np.transpose(Rtilde, axes=T_axes))
    w = np.linalg.pinv(Rtilde) @ (Qtilde_T @ Q1_T @ b - Rtilde_T_inv @ d)

    return np.linalg.pinv(R) @ (Q1_T @ b - Q2_T @ w)
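
For example, here is a minimal usage sketch (the data and constraint are made up for illustration) that fits a quadratic to noisy samples while forcing the curve through a known point:

```python
import numpy as np
from amisc.utils import constrained_lls

# Fit y = c0 + c1*x + c2*x^2 to noisy data, constrained so the curve
# passes exactly through (0, 1), i.e. c0 = 1
rng = np.random.default_rng(0)
x = np.linspace(0, 1, 20)
y = 1 + 2*x - 3*x**2 + 0.05 * rng.standard_normal(x.shape)

A = np.vander(x, 3, increasing=True)  # (M, N) Vandermonde matrix
b = y[:, np.newaxis]                  # (M, 1) data
C = np.array([[1.0, 0.0, 0.0]])       # (P, N) constraint operator
d = np.array([[1.0]])                 # (P, 1) constraint condition

coeff = constrained_lls(A, b, C, d)   # (N, 1) solution vector
assert np.isclose(coeff[0, 0], 1.0)   # the constraint c0 = 1 holds
```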

format_inputs(inputs, var_shape=None)

Broadcast and reshape all inputs to the same shape. Loop shape is inferred from broadcasting the leading dims of all input arrays. Input arrays are broadcast to this shape and then flattened.

Example

```python
inputs = {'x': np.random.rand(10, 1, 5), 'y': np.random.rand(1, 1), 'z': np.random.rand(1, 20, 3)}
fmt_inputs, loop_shape = format_inputs(inputs)
# Output: {'x': np.ndarray(200, 5), 'y': np.ndarray(200,), 'z': np.ndarray(200, 3)}, (10, 20)
```
PARAMETER DESCRIPTION
inputs

dict of input arrays

TYPE: Dataset

var_shape

dict of expected input variable shapes (e.g. for field quantities); if None or not specified, all inputs are assumed 1d (scalar)

TYPE: dict DEFAULT: None

RETURNS DESCRIPTION
tuple[Dataset, tuple[int, ...]]

the reshaped inputs and the common loop shape

Source code in src/amisc/utils.py
def format_inputs(inputs: Dataset, var_shape: dict = None) -> tuple[Dataset, tuple[int, ...]]:
    """Broadcast and reshape all inputs to the same shape. Loop shape is inferred from broadcasting the leading dims
    of all input arrays. Input arrays are broadcast to this shape and then flattened.

    !!! Example
        ```python
        inputs = {'x': np.random.rand(10, 1, 5), 'y': np.random.rand(1, 1), 'z': np.random.rand(1, 20, 3)}
        fmt_inputs, loop_shape = format_inputs(inputs)
        # Output: {'x': np.ndarray(200, 5), 'y': np.ndarray(200,), 'z': np.ndarray(200, 3)}, (10, 20)
        ```

    :param inputs: `dict` of input arrays
    :param var_shape: `dict` of expected input variable shapes (i.e. for field quantities); assumes all inputs are 1d
                      if None or not specified (i.e. scalar)
    :returns: the reshaped inputs and the common loop shape
    """
    var_shape = var_shape or {}

    def _common_shape(shape1, shape2):
        """Find the common leading dimensions between two shapes (with np broadcasting rules)."""
        min_len = min(len(shape1), len(shape2))
        common_shape = []
        for i in range(min_len):
            if shape1[i] == shape2[i]:
                common_shape.append(shape1[i])
            elif shape1[i] == 1:
                common_shape.append(shape2[i])
            elif shape2[i] == 1:
                common_shape.append(shape1[i])
            else:
                break
        return tuple(common_shape)

    def _shorten_shape(name, array):
        """Remove extra variable dimensions from the end of the array shape (i.e. field quantity dimensions)."""
        shape = var_shape.get(name, None)
        if shape is not None and len(shape) > 0:
            if len(shape) > len(array.shape):
                raise ValueError(f"Variable '{name}' shape {shape} is longer than input array shape {array.shape}. "
                                 f"The input array for '{name}' should have at least {len(shape)} dimensions.")
            return array.shape[:-len(shape)]
        else:
            return array.shape

    # Get the common "loop" dimensions from all input arrays
    inputs = {name: np.atleast_1d(value) for name, value in inputs.items()}
    name, array = next(iter(inputs.items()))
    loop_shape = _shorten_shape(name, array)
    for name, array in inputs.items():
        array_shape = _shorten_shape(name, array)
        loop_shape = _common_shape(loop_shape, array_shape)
        if not loop_shape:
            break
    N = np.prod(loop_shape)
    common_dim_cnt = len(loop_shape)

    # Flatten and broadcast all inputs to the common shape
    ret_inputs = {}
    for var_id, array in inputs.items():
        if common_dim_cnt > 0:
            broadcast_shape = np.broadcast_shapes(loop_shape, array.shape[:common_dim_cnt])
            broadcast_shape += array.shape[common_dim_cnt:]
            ret_inputs[var_id] = np.broadcast_to(array, broadcast_shape).reshape((N, *array.shape[common_dim_cnt:]))
        else:
            ret_inputs[var_id] = array

    return ret_inputs, loop_shape
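
A short sketch of how `var_shape` changes the inferred loop shape (the variable name and grid size are illustrative):

```python
import numpy as np
from amisc.utils import format_inputs

u = np.random.rand(3, 100)  # 3 samples of a field quantity on a 100-point grid

fmt, loop = format_inputs({'u': u})  # all dims treated as loop dims
# loop == (3, 100), fmt['u'].shape == (300,)

fmt, loop = format_inputs({'u': u}, var_shape={'u': (100,)})  # last dim is a field dim
# loop == (3,), fmt['u'].shape == (3, 100)
```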

format_outputs(outputs, loop_shape)

Reshape all outputs to the common loop shape, as obtained from a call to format_inputs. Assumes all output arrays share the same leading dimension, which gets reshaped back into the loop_shape. Singleton outputs are squeezed along the last dimension, and a singleton loop shape is squeezed along the first dimension.

Example

```python
outputs = {'x': np.random.rand(10, 1, 5), 'y': np.random.rand(10, 1), 'z': np.random.rand(10, 20, 3)}
loop_shape = (2, 5)
fmt_outputs = format_outputs(outputs, loop_shape)
# Output: {'x': np.ndarray(2, 5, 1, 5), 'y': np.ndarray(2, 5), 'z': np.ndarray(2, 5, 20, 3)}
```
PARAMETER DESCRIPTION
outputs

dict of output arrays

TYPE: Dataset

loop_shape

the common leading dimensions to reshape the output arrays to

TYPE: tuple[int, ...]

RETURNS DESCRIPTION
Dataset

the reshaped outputs

Source code in src/amisc/utils.py
def format_outputs(outputs: Dataset, loop_shape: tuple[int, ...]) -> Dataset:
    """Reshape all outputs to the common loop shape. Loop shape is as obtained from a call to `format_inputs`.
    Assumes that all outputs are the same along the first dimension. This first dimension gets reshaped back into
    the `loop_shape`. Singleton outputs are squeezed along the last dimension. A singleton loop shape is squeezed
    along the first dimension.

    !!! Example
        ```python
        outputs = {'x': np.random.rand(10, 1, 5), 'y': np.random.rand(10, 1), 'z': np.random.rand(10, 20, 3)}
        loop_shape = (2, 5)
        fmt_outputs = format_outputs(outputs, loop_shape)
        # Output: {'x': np.ndarray(2, 5, 1, 5), 'y': np.ndarray(2, 5), 'z': np.ndarray(2, 5, 20, 3)}
        ```

    :param outputs: `dict` of output arrays
    :param loop_shape: the common leading dimensions to reshape the output arrays to
    :returns: the reshaped outputs
    """
    output_dict = {}
    for key, val in outputs.items():
        val = np.atleast_1d(val)
        output_shape = val.shape[1:]  # Assumes (N, ...) output shape to start with
        val = val.reshape(loop_shape + output_shape)
        if output_shape == (1,):
            val = np.atleast_1d(np.squeeze(val, axis=-1))  # Squeeze singleton outputs
        if loop_shape == (1,):
            val = np.atleast_1d(np.squeeze(val, axis=0))  # Squeeze singleton loop dimensions
        output_dict[key] = val
    return output_dict
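
A sketch of the intended round trip with `format_inputs` (the "model" here is a made-up vectorized function):

```python
import numpy as np
from amisc.utils import format_inputs, format_outputs

inputs = {'x': np.random.rand(2, 5), 'y': np.random.rand(2, 1)}
fmt_inputs, loop_shape = format_inputs(inputs)      # flattened to N=10 samples

outputs = {'z': fmt_inputs['x'] + fmt_inputs['y']}  # evaluate a vectorized "model"
fmt_outputs = format_outputs(outputs, loop_shape)
# fmt_outputs['z'].shape == (2, 5) -- the loop shape is restored
```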

get_logger(name, stdout=True, log_file=None, level=logging.INFO)

Return a file/stdout logger with the given name.

PARAMETER DESCRIPTION
name

the name of the logger to return

TYPE: str

stdout

whether to add a stdout stream handler to the logger

TYPE: bool DEFAULT: True

log_file

add file logging to this file (optional)

TYPE: str | Path DEFAULT: None

level

the logging level to set

TYPE: int DEFAULT: INFO

RETURNS DESCRIPTION
Logger

the logger

Source code in src/amisc/utils.py
def get_logger(name: str, stdout: bool = True, log_file: str | Path = None,
               level: int = logging.INFO) -> logging.Logger:
    """Return a file/stdout logger with the given name.

    :param name: the name of the logger to return
    :param stdout: whether to add a stdout stream handler to the logger
    :param log_file: add file logging to this file (optional)
    :param level: the logging level to set
    :returns: the logger
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.handlers.clear()
    if stdout:
        std_handler = logging.StreamHandler(sys.stdout)
        std_handler.setFormatter(LOG_FORMATTER)
        logger.addHandler(std_handler)
    if log_file is not None:
        f_handler = logging.FileHandler(log_file, mode='a', encoding='utf-8')
        f_handler.setLevel(level)
        f_handler.setFormatter(LOG_FORMATTER)
        logger.addHandler(f_handler)

    return logger
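
Typical usage (the logger name and file are illustrative):

```python
from amisc.utils import get_logger

logger = get_logger('amisc.demo', stdout=True, log_file='demo.log')
logger.info('Training surrogate...')  # written to both stdout and demo.log
```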

parse_function_string(call_string)

Convert a function signature like func(a, b, key=value) to name, args, kwargs.

PARAMETER DESCRIPTION
call_string

a function-like string to parse

TYPE: str

RETURNS DESCRIPTION
tuple[str, list, dict]

the function name, positional arguments, and keyword arguments

Source code in src/amisc/utils.py
def parse_function_string(call_string: str) -> tuple[str, list, dict]:
    """Convert a function signature like `func(a, b, key=value)` to name, args, kwargs.

    :param call_string: a function-like string to parse
    :returns: the function name, positional arguments, and keyword arguments
    """
    # Regex pattern to match function name and arguments
    pattern = r"(\w+)(?:\((.*)\))?"
    match = re.match(pattern, call_string.strip())

    if not match:
        raise ValueError(f"Function string '{call_string}' is not valid.")

    # Extracting name and arguments section
    name = match.group(1)
    args_str = match.group(2)

    # Split the arguments, respecting nested parentheses, brackets, and quotes
    pieces = _tokenize(args_str)

    args = []
    kwargs = {}
    keyword_only = False

    for piece in pieces:
        if piece == '/':
            continue
        elif piece == '*':
            keyword_only = True
        elif '=' in piece and (piece.index('=') < piece.find('{') or piece.find('{') == -1):
            key, val = piece.split('=', 1)
            kwargs[key.strip()] = ast.literal_eval(val.strip())
            keyword_only = True
        else:
            if keyword_only:
                raise ValueError("Positional arguments cannot follow keyword arguments.")
            args.append(ast.literal_eval(piece))

    return name, args, kwargs
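
For example (the function name and arguments are illustrative):

```python
from amisc.utils import parse_function_string

name, args, kwargs = parse_function_string("legendre(2, 'uniform', scale=0.5)")
# name == 'legendre', args == [2, 'uniform'], kwargs == {'scale': 0.5}
```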

relative_error(pred, targ, axis=None)

Compute the relative L2 error between two vectors along the given axis.

PARAMETER DESCRIPTION
pred

the predicted values

targ

the target values

axis

the axis along which to compute the error

DEFAULT: None

RETURNS DESCRIPTION

the relative L2 error

Source code in src/amisc/utils.py
def relative_error(pred, targ, axis=None):
    """Compute the relative L2 error between two vectors along the given axis.

    :param pred: the predicted values
    :param targ: the target values
    :param axis: the axis along which to compute the error
    :returns: the relative L2 error
    """
    with np.errstate(divide='ignore', invalid='ignore'):
        err = np.sqrt(np.sum((pred - targ)**2, axis=axis) / np.sum(targ**2, axis=axis))
    return np.nan_to_num(err, nan=np.nan, posinf=np.nan, neginf=np.nan)
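
For example, computing a per-row error (the values are illustrative):

```python
import numpy as np
from amisc.utils import relative_error

targ = np.array([[1.0, 2.0], [3.0, 4.0]])
pred = 1.01 * targ                         # predictions off by 1% everywhere
err = relative_error(pred, targ, axis=-1)  # relative L2 error along each row
# err ≈ [0.01, 0.01]
```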

search_for_file(filename, search_paths=None)

Search for the given filename in the current working directory and any additional search paths provided.

PARAMETER DESCRIPTION
filename

the filename to search for

TYPE: str | Path

search_paths

paths to try and find the file in

DEFAULT: None

RETURNS DESCRIPTION

the full path to the file if found, otherwise the original filename

Source code in src/amisc/utils.py
def search_for_file(filename: str | Path, search_paths=None):
    """Search for the given filename in the current working directory and any additional search paths provided.

    :param filename: the filename to search for
    :param search_paths: paths to try and find the file in
    :returns: the full path to the file if found, otherwise the original `filename`
    """
    if not isinstance(filename, str | Path):
        return filename

    search_paths = list(search_paths) if search_paths else []  # copy to avoid mutating the caller's list
    search_paths.append('.')

    save_file = Path(filename)
    need_to_search = True
    try:
        need_to_search = ((len(save_file.parts) == 1 and len(save_file.suffix) > 0) or
                          (len(save_file.parts) > 1 and not save_file.exists()))
    except Exception:
        need_to_search = False

    # Search for the save file if it was a valid path and does not exist
    if need_to_search:
        name = save_file.name
        for path in search_paths:
            if (pth := Path(path) / name).exists():
                filename = pth.resolve().as_posix()
                break
        # If the file was not found, return the original filename and let the caller handle any errors

    return filename
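
Typical usage (the file and directory names are illustrative):

```python
from amisc.utils import search_for_file

# Checks 'models/config.yml', 'data/config.yml', and './config.yml'; returns
# the resolved path if found, otherwise 'config.yml' unchanged
path = search_for_file('config.yml', search_paths=['models', 'data'])
```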

to_model_dataset(dataset, variables, del_latent=True, **field_coords)

Convert surrogate input/output dataset to a form usable by the true model. Primarily, reconstruct field quantities and denormalize.

PARAMETER DESCRIPTION
dataset

the dataset to convert

TYPE: Dataset

variables

the VariableList containing the variable objects used in dataset -- these objects define the normalization and compression methods to use for each variable

TYPE: 'amisc.variable.VariableList'

del_latent

whether to delete the latent variables from the dataset after reconstruction

TYPE: bool DEFAULT: True

field_coords

pass in extra field quantity coords as f'{var}_coords' for reconstruction (optional)

DEFAULT: {}

RETURNS DESCRIPTION
tuple[Dataset, Dataset]

the reconstructed/denormalized dataset and any field coordinates used during reconstruction

Source code in src/amisc/utils.py
def to_model_dataset(dataset: Dataset, variables: 'amisc.variable.VariableList', del_latent: bool = True,
                     **field_coords) -> tuple[Dataset, Dataset]:
    """Convert surrogate input/output dataset to a form usable by the true model. Primarily, reconstruct
    field quantities and denormalize.

    :param dataset: the dataset to convert
    :param variables: the `VariableList` containing the variable objects used in `dataset` -- these objects define
                      the normalization and compression methods to use for each variable
    :param del_latent: whether to delete the latent variables from the dataset after reconstruction
    :param field_coords: pass in extra field qty coords as f'{var}_coords' for reconstruction (optional)
    :returns: the reconstructed/denormalized dataset and any field coordinates used during reconstruction
    """
    dataset = copy.deepcopy(dataset)
    _combine_latent_arrays(dataset)

    ret_coords = {}
    for var in variables:
        if var in dataset:
            if var.compression is not None:
                coords = field_coords.get(f'{var}_coords', None)
                field = var.reconstruct({'latent': dataset[var]}, coords=coords)
                if del_latent:
                    del dataset[var]
                coords = field.pop('coords')
                ret_coords[f'{var.name}_coords'] = copy.deepcopy(coords)
                dataset.update(field)
            else:
                dataset[var] = var.denormalize(dataset[var])

    return dataset, ret_coords
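
A hedged sketch of the scalar (no-compression) path; the `Variable` constructor call and the `norm` spec string below are assumptions about the `amisc.variable` API, not taken from this page:

```python
import numpy as np
from amisc.utils import to_model_dataset
from amisc.variable import Variable, VariableList

# Assumed: a scalar variable with a linear normalization (spec string hypothetical)
variables = VariableList([Variable('x', norm='linear(100)')])

surr_data = {'x': np.array([0.1, 0.2, 0.3])}  # normalized surrogate values
model_data, coords = to_model_dataset(surr_data, variables)
# model_data['x'] holds the denormalized values; coords is empty since 'x'
# has no field-quantity compression
```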

to_surrogate_dataset(dataset, variables, del_fields=True, **field_coords)

Convert true model input/output dataset to a form usable by the surrogate. Primarily, compress field quantities and normalize.

PARAMETER DESCRIPTION
dataset

the dataset to convert

TYPE: Dataset

variables

the VariableList containing the variable objects used in dataset -- these objects define the normalization and compression methods to use for each variable

TYPE: 'amisc.variable.VariableList'

del_fields

whether to delete the original field quantities from the dataset after compression

TYPE: bool DEFAULT: True

field_coords

pass in extra field quantity coords as f'{var}_coords' for compression (optional)

DEFAULT: {}

RETURNS DESCRIPTION
tuple[Dataset, list[str]]

the compressed/normalized dataset and a list of variable names to pass to surrogate

Source code in src/amisc/utils.py
def to_surrogate_dataset(dataset: Dataset, variables: 'amisc.variable.VariableList', del_fields: bool = True,
                         **field_coords) -> tuple[Dataset, list[str]]:
    """Convert true model input/output dataset to a form usable by the surrogate. Primarily, compress field
    quantities and normalize.

    :param dataset: the dataset to convert
    :param variables: the `VariableList` containing the variable objects used in `dataset` -- these objects define
                      the normalization and compression methods to use for each variable
    :param del_fields: whether to delete the original field quantities from the dataset after compression
    :param field_coords: pass in extra field qty coords as f'{var}_coords' for compression (optional)
    :returns: the compressed/normalized dataset and a list of variable names to pass to surrogate
    """
    surr_vars = []
    dataset = copy.deepcopy(dataset)
    for var in variables:
        # Only grab scalars in the dataset or field qtys if all fields are present
        if var in dataset or (var.compression is not None and all([f in dataset for f in var.compression.fields])):
            if var.compression is not None:
                coords = dataset.get(f'{var}_coords', field_coords.get(f'{var}_coords', None))
                latent = var.compress({field: dataset[field] for field in
                                       var.compression.fields}, coords=coords)['latent']  # all fields must be present
                for i in range(latent.shape[-1]):
                    dataset[f'{var.name}{LATENT_STR_ID}{i}'] = latent[..., i]
                    surr_vars.append(f'{var.name}{LATENT_STR_ID}{i}')
                if del_fields:
                    for field in var.compression.fields:
                        del dataset[field]
                    if dataset.get(f'{var}_coords', None) is not None:
                        del dataset[f'{var}_coords']
            else:
                dataset[var.name] = var.normalize(dataset[var.name])
                surr_vars.append(f'{var.name}')

    return dataset, surr_vars
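
A hedged round-trip sketch, under the same assumed `Variable` construction as above; for scalar variables, `to_surrogate_dataset` should invert `to_model_dataset`:

```python
import numpy as np
from amisc.utils import to_model_dataset, to_surrogate_dataset
from amisc.variable import Variable, VariableList

variables = VariableList([Variable('y', norm='linear(0.5)')])  # assumed API

model_data = {'y': np.array([10.0, 20.0])}  # physical-space model output
surr_data, surr_vars = to_surrogate_dataset(model_data, variables)
# surr_vars == ['y']; surr_data['y'] holds the normalized values

recovered, _ = to_model_dataset(surr_data, variables)
# recovered['y'] ≈ model_data['y']
```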