Coverage for src/uqtils/mcmc.py: 82% (153 statements)
1"""Module for Markov-Chain Monte Carlo routines.
3Includes:
5- `normal_pdf` - vectorized Gaussian pdf evaluation
6- `normal_sample` - vectorized Gaussian sampling
7- `is_positive_definite` - whether a matrix is positive semi-definite
8- `nearest_positive_definite` - finds the nearest PSD matrix
9- `dram` - Delayed rejection adaptive Metropolis-Hastings MCMC
10- `autocorrelation` - computes the autocorrelation of a set of samples
11"""
import warnings

import h5py
import numpy as np
import tqdm

from .uq_types import Array

__all__ = ['normal_pdf', 'normal_sample', 'is_positive_definite', 'nearest_positive_definite', 'dram',
           'autocorrelation']
def normal_sample(mean: Array, cov: Array, size: tuple | int = (), sqrt=False) -> np.ndarray:
    """Sample batches of multivariate normal distributions.

    !!! Note
        The provided `mean` and `cov` should match along the last dimension, which is the dimension of the random
        variable to sample. To sample a 1d Gaussian, specify both the mean and covariance as scalars. More
        generally, as long as the mean and covariance are broadcastable in shape, this function can sample many
        multivariate distributions at once, each with its own mean and covariance.

    :param mean: `(..., dim)`, expected values, where `dim` is the random variable dimension
    :param cov: `(..., dim, dim)`, covariance matrices (or the `sqrt(cov)` if `sqrt=True`)
    :param size: shape of additional samples
    :param sqrt: whether `cov` was passed in already as the `sqrt(cov)` via Cholesky decomposition
    :returns samples: `(*size, ..., dim)`, samples from the multivariate distributions
    """
    mean = np.atleast_1d(mean)
    cov = np.atleast_2d(cov)
    sqrt_cov = cov if sqrt else np.linalg.cholesky(cov)

    if isinstance(size, int):
        size = (size, )
    shape = size + np.broadcast_shapes(mean.shape, cov.shape[:-1])
    x_normal = np.random.standard_normal((*shape, 1)).astype(mean.dtype)
    samples = np.squeeze(sqrt_cov @ x_normal, axis=-1) + mean
    return samples
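
# A minimal usage sketch of `normal_sample` (illustrative only, not part of the module):
# draw 1000 samples from each of 5 different 2d Gaussians in one vectorized call.
#
#   means = np.random.rand(5, 2)                      # (5, 2) mean vectors
#   covs = np.broadcast_to(np.eye(2), (5, 2, 2))      # (5, 2, 2) identity covariances
#   samples = normal_sample(means, covs, size=1000)   # -> (1000, 5, 2)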
def normal_pdf(x: Array, mean: Array, cov: Array, logpdf: bool = False) -> np.ndarray:
    """Compute the Gaussian pdf at each `x` location, with the same broadcasting rules as `normal_sample`.

    :param x: `(..., dim)`, the locations to evaluate the pdf at
    :param mean: `(..., dim)`, expected values, where `dim` is the random variable dimension
    :param cov: `(..., dim, dim)`, covariance matrices
    :param logpdf: whether to return the logpdf instead
    :returns: `(...,)` the pdf values at `x`
    """
    x = np.atleast_1d(x)
    mean = np.atleast_1d(mean)
    cov = np.atleast_2d(cov)
    dim = cov.shape[-1]

    preexp = 1 / ((2*np.pi)**(dim/2) * np.linalg.det(cov)**(1/2))  # normalization constant
    diff = x - mean
    diff_col = np.expand_dims(diff, axis=-1)  # (..., dim, 1)
    diff_row = np.expand_dims(diff, axis=-2)  # (..., 1, dim)
    inexp = np.squeeze(diff_row @ np.linalg.inv(cov) @ diff_col, axis=(-1, -2))  # squared Mahalanobis distance

    pdf = np.log(preexp) - 0.5 * inexp if logpdf else preexp * np.exp(-0.5 * inexp)

    return pdf
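
# A minimal sketch of `normal_pdf` (illustrative only): evaluate a 2d standard
# Gaussian logpdf at a batch of locations.
#
#   x = np.random.rand(100, 2)                                   # (100, 2) evaluation points
#   logp = normal_pdf(x, np.zeros(2), np.eye(2), logpdf=True)    # -> (100,)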
def is_positive_definite(A):
    """Return true when the input is positive-definite, via Cholesky."""
    try:
        _ = np.linalg.cholesky(A)
        return True
    except np.linalg.LinAlgError:
        return False
def nearest_positive_definite(A):
    """Find the nearest positive-definite matrix to the input.

    A Python port of John D'Errico's `nearestSPD` MATLAB code [1], which credits [2].

    [1] https://www.mathworks.com/matlabcentral/fileexchange/42885-nearestspd
    [2] N.J. Higham, "Computing a nearest symmetric positive semidefinite matrix", 1988
    """
    B = (A + A.T) / 2
    _, s, V = np.linalg.svd(B)
    H = np.dot(V.T, np.dot(np.diag(s), V))
    A2 = (B + H) / 2
    A3 = (A2 + A2.T) / 2
    if is_positive_definite(A3):
        return A3

    spacing = np.spacing(np.linalg.norm(A))
    # The above is different from [1]. It appears that MATLAB's `chol` Cholesky
    # decomposition will accept matrices with an exactly-zero eigenvalue, whereas
    # Numpy's will not. So where [1] uses `eps(mineig)` (where `eps` is MATLAB
    # for `np.spacing`), we use the above definition. CAVEAT: our `spacing`
    # will be much larger than [1]'s `eps(mineig)`, since `mineig` is usually on
    # the order of 1e-16, and `eps(1e-16)` is on the order of 1e-34, whereas
    # `spacing` will, for Gaussian random matrices of small dimension, be on
    # the order of 1e-16. In practice, both approaches converge, as unit testing
    # suggests.
    eye = np.eye(A.shape[0])
    k = 1
    while not is_positive_definite(A3):
        mineig = np.min(np.real(np.linalg.eigvals(A3)))
        A3 += eye * (-mineig * k ** 2 + spacing)
        k += 1

    return A3
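
# A quick sanity sketch (illustrative only): repair a symmetric matrix that is
# not quite positive-definite, e.g. a sample covariance with a small negative eigenvalue.
#
#   A = np.random.rand(4, 4)
#   A = (A + A.T) / 2 - np.eye(4)           # symmetric, but likely indefinite
#   if not is_positive_definite(A):
#       A = nearest_positive_definite(A)    # now safe for np.linalg.cholesky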
def dram(logpdf, x0, niter, cov0=None, gamma=0.5, eps=1e-6, adapt_after=100, adapt_interval=10,
         delayed=True, progress=True, filename=None):
    """Delayed rejection adaptive Metropolis-Hastings (DRAM) MCMC with a Gaussian proposal.

    :param logpdf: log PDF function of the target distribution
    :param x0: `(nwalkers, ndim)` initial parameter samples, ignored if samples exist in `filename`
    :param niter: number of iterations
    :param cov0: `(ndim, ndim)` the initial proposal covariance, defaults to identity or the `cov` value in `filename`
    :param gamma: scale factor for the covariance matrix in the delayed rejection step
    :param eps: small constant for making sure the covariance stays well-conditioned
    :param adapt_after: the number of iterations before covariance adaptation begins (ignored if `<=0`)
    :param adapt_interval: the number of iterations between each covariance adaptation (ignored if `adapt_after<=0`)
    :param delayed: whether to try a second proposal after the first rejection
    :param progress: whether to display progress of the sampler
    :param filename: if specified, an hdf5 file to save results to. If the file already has dram results, the new
                     samples will be appended. Follows the same format as the `emcee` library
    :returns: `samples, log_pdf, acceptance` - `(niter, nwalkers, ndim)` samples of the target distribution, the
              logpdf values at these locations, and the cumulative number of accepted samples per walker
    """
    # Override x0, cov0 if filename already has samples
    try:
        if filename is not None:
            with h5py.File(filename, 'a') as fd:
                group = fd.get('mcmc', None)
                if group is not None:
                    x0 = group['chain'][-1, ...]
                    if cov0 is None:
                        cov0 = np.array(group['cov'])  # only override if cov0 is not passed in
                    niter += 1  # samples[0] repeats the last saved sample, so run one extra iteration

    except Exception as e:
        warnings.warn(str(e))

    # Initialize
    x0 = np.atleast_2d(x0)
    nwalk, ndim = x0.shape
    cov0 = np.eye(ndim) if cov0 is None else cov0
    sd = (2.4**2/ndim)  # standard adaptive-Metropolis scale factor s_d = 2.4^2/d (Haario et al.)
    curr_cov = np.broadcast_to(cov0, (nwalk, ndim, ndim)).copy().astype(x0.dtype)
    curr_chol = np.linalg.cholesky(curr_cov)
    adapt_cov = curr_cov.copy()  # adaptive covariance
    curr_mean = x0
    curr_loc_logpdf = logpdf(x0)
    samples = np.empty((niter, nwalk, ndim), dtype=x0.dtype)
    log_pdf = np.empty((niter, nwalk), dtype=x0.dtype)
    accepted = np.zeros((nwalk,), dtype=x0.dtype)
    samples[0, ...] = x0
    log_pdf[0, ...] = curr_loc_logpdf
    def accept_first(curr_log, prop_log):
        with np.errstate(over='ignore'):
            # Overflowing exp() values go to +infty and get clipped to 1, so they are always accepted
            ret = np.minimum(1.0, np.exp(prop_log - curr_log))
        return ret
    # Main sample loop
    iterable = tqdm.tqdm(range(niter-1)) if progress else range(niter-1)
    # --8<-- [start:dram]
    for i in iterable:
        # Propose sample
        x1 = samples[i, ...]
        y1 = normal_sample(x1, curr_chol, sqrt=True)  # (nwalkers, ndim)
        x1_log = curr_loc_logpdf
        y1_log = logpdf(y1)

        # Compute first acceptance
        with np.errstate(invalid='ignore'):
            a1 = y1_log - x1_log  # (nwalkers,)
            a1_idx = a1 > 0
            a1_idx |= np.log(np.random.rand(nwalk)) < a1
        samples[i + 1, a1_idx, :] = y1[a1_idx, :]
        samples[i + 1, ~a1_idx, :] = x1[~a1_idx, :]
        curr_loc_logpdf[a1_idx] = y1_log[a1_idx]
        accepted[a1_idx] += 1
        # Second level proposal
        if delayed and np.any(~a1_idx):
            y2 = normal_sample(x1[~a1_idx, :], curr_chol[~a1_idx, ...] * np.sqrt(gamma), sqrt=True)
            y2_log = logpdf(y2)
            with np.errstate(divide='ignore', invalid='ignore'):
                # Delayed-rejection acceptance in log form:
                #   log a2 = [log pi(y2) - log pi(x1)]                     (frac_1)
                #          + [log q(y2 -> y1) - log q(x1 -> y1)]           (frac_2)
                #          + [log(1 - a(y2, y1)) - log(1 - a(x1, y1))]     (frac_3)
                # If a(y2, y1)=1, then log(1-a(y2,y1)) -> -infty and a2 -> 0
                frac_1 = y2_log - x1_log[~a1_idx]
                frac_2 = (normal_pdf(y1[~a1_idx, :], y2, curr_cov[~a1_idx, ...], logpdf=True) -
                          normal_pdf(y1[~a1_idx, :], x1[~a1_idx, :], curr_cov[~a1_idx, ...], logpdf=True))
                frac_3 = (np.log(1 - accept_first(y2_log, y1_log[~a1_idx])) -
                          np.log(1 - np.minimum(1.0, np.exp(a1[~a1_idx]))))
                a2 = frac_1 + frac_2 + frac_3
                a2_idx = a2 > 0
                a2_idx |= np.log(np.random.rand(a2.shape[0])) < a2

            sample_a2_idx = np.where(~a1_idx)[0][a2_idx]  # Indices that were False the 1st time, then True the 2nd
            samples[i + 1, sample_a2_idx, :] = y2[a2_idx, :]
            curr_loc_logpdf[sample_a2_idx] = y2_log[a2_idx]
            accepted[sample_a2_idx] += 1
        log_pdf[i+1, ...] = curr_loc_logpdf

        # Update the sample mean and cov every iteration (recursive adaptive-Metropolis update):
        #   C_k = ((k-1)/k) * C_{k-1} + (sd/k) * (k*m_{k-1}m_{k-1}^T - (k+1)*m_k m_k^T + x_k x_k^T + eps*I)
        if adapt_after > 0:
            k = i + 1
            last_mean = curr_mean.copy()
            curr_mean = (1/(k+1)) * samples[k, ...] + (k/(k+1))*last_mean
            mult = (np.eye(ndim) * eps + k * last_mean[..., np.newaxis] @ last_mean[..., np.newaxis, :] -
                    (k + 1) * curr_mean[..., np.newaxis] @ curr_mean[..., np.newaxis, :] +
                    samples[k, ..., np.newaxis] @ samples[k, ..., np.newaxis, :])
            adapt_cov = ((k - 1) / k) * adapt_cov + (sd / k) * mult

            if k > adapt_after and k % adapt_interval == 0:
                try:
                    curr_chol[:] = np.linalg.cholesky(adapt_cov)
                    curr_cov[:] = adapt_cov[:]
                except np.linalg.LinAlgError:
                    warnings.warn(f"Non-positive-definite covariance at k={k}. Skipping adaptation...")
    # --8<-- [end:dram]
    try:
        if filename is not None:
            with h5py.File(filename, 'a') as fd:
                group = fd.get('mcmc', None)
                if group is not None:
                    samples = np.concatenate((group['chain'], samples[1:, ...]), axis=0)
                    log_pdf = np.concatenate((group['log_pdf'], log_pdf[1:, ...]), axis=0)
                    accepted += group['accepted']
                    del group['chain']
                    del group['log_pdf']
                    del group['accepted']
                    del group['cov']
                fd.create_dataset('mcmc/chain', data=samples)
                fd.create_dataset('mcmc/log_pdf', data=log_pdf)
                fd.create_dataset('mcmc/accepted', data=accepted)
                fd.create_dataset('mcmc/cov', data=curr_cov)
    except Exception as e:
        warnings.warn(str(e))

    return samples, log_pdf, accepted
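
# A minimal usage sketch of `dram` (illustrative only): sample a 2d Gaussian target
# with 4 parallel walkers. `target_logpdf` is a hypothetical stand-in for a real
# log-posterior.
#
#   def target_logpdf(x):
#       return normal_pdf(x, np.zeros(2), np.eye(2), logpdf=True)
#
#   x0 = np.random.randn(4, 2)                                  # (nwalkers, ndim) start points
#   samples, log_pdf, accepted = dram(target_logpdf, x0, 5000)
#   flat = samples[1000:, ...].reshape((-1, 2))                 # discard burn-in and flatten walkers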
def autocorrelation(samples, maxlag=100, step=1):
    """Compute the auto-correlation of a set of samples.

    :param samples: `(niter, nwalk, ndim)` samples returned from `dram` or a similar MCMC routine
    :param maxlag: maximum lag distance to compute the correlation for
    :param step: step between lags from 0 to `maxlag` for which to compute the correlations
    :returns: `lags, autos, tau, ess` - the lag times, auto-correlations, integrated auto-correlation,
              and effective sample sizes
    """
    niter, nwalk, ndim = samples.shape
    mean = np.mean(samples, axis=0)
    var = np.sum((samples - mean[np.newaxis, ...]) ** 2, axis=0)

    lags = np.arange(0, maxlag, step)
    autos = np.zeros((len(lags), nwalk, ndim))
    for zz, lag in enumerate(lags):
        # compute the covariance between all samples *lag apart*
        for ii in range(niter - lag):
            autos[zz, ...] += (samples[ii, ...] - mean) * (samples[ii + lag, ...] - mean)
        autos[zz, ...] /= var
    tau = 1 + 2 * np.sum(autos[1:, ...], axis=0)  # Integrated auto-correlation (lag-0 term is identically 1)
    ess = niter / tau  # Effective sample size
    return lags, autos, tau, ess
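
# A minimal sketch (illustrative only): diagnose chain mixing with `autocorrelation`,
# reusing the `samples` array from the `dram` sketch above.
#
#   lags, autos, tau, ess = autocorrelation(samples, maxlag=200)
#   print(f'Integrated autocorrelation per walker/dim:\n{tau}')
#   print(f'Effective sample size per walker/dim:\n{ess}')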