Source code for matcal.core.parameter_studies

"""
This module contains pure MatCal implementations of parameter studies. 
These do not invoke external algorithm libraries. 
"""
from abc import abstractmethod
from collections import OrderedDict
import numpy as np
from scipy.stats import qmc

from matcal.core.data import MaxAbsDataConditioner, DataCollectionStatistics
from matcal.core.logger import initialize_matcal_logger
from matcal.core.parameter_batch_evaluator import ParameterBatchEvaluator
from matcal.core.study_base import StudyBase
from matcal.core.utilities import (check_value_is_real_between_values, 
                                   check_value_is_positive_integer, 
                                   check_value_is_nonnegative_real, 
                                   check_value_is_array_like_of_reals, 
                                   check_value_is_bool
                                   )


logger = initialize_matcal_logger(__name__)


[docs] class ParameterStudy(StudyBase): """ Use the MatCal :class:`~matcal.core.parameter_studies.ParameterStudy` to run models and evaluate objectives for a user specified set of parameters values. This can be used for brute-force manual calibrations, sensitivity studies when the user prefers to post process the results/chose evaluation parameters and building surrogates for the models using python based surrogate algorithms not directly supported in MatCal. """ study_class = "ParameterStudy" class NoEvaluationsDefinedError(RuntimeError): """""" def __init__(self, *parameters): super().__init__(*parameters) self._parameter_sets_to_evaluate = [] self._num_evaluations = 0 self._return_residuals = True self._batch_results = None @property def _needs_residuals(self): return self._return_residuals
[docs] def add_parameter_evaluation(self, **parameters): """ Add parameter sets to be evaluated to the study. This function can be called as many times as needed to evaluate several different parameter sets. They will be evaluated in the order they are added. All parameters that were passed into the study on initialization must also have a value specified when adding a parameter set to be evaluated with this function. :param parameters: the parameters values to be added as an evaluated parameter set for the study. :type parameters: dict(str, float) :raises ValueError: If all study parameters do not have a value passed to this function when called. """ self._check_all_parameters_provided(parameters) self._num_evaluations += 1 pc = self._parameter_collection for param, value in parameters.items(): check_value_is_real_between_values(value, pc[param].get_lower_bound(), pc[param].get_upper_bound(), param, closed=True) parameters[param] = float(value) self._parameter_sets_to_evaluate.append(OrderedDict(**parameters))
def _check_all_parameters_provided(self, new_param_set): if new_param_set.keys() != self._parameter_collection.keys(): raise ValueError("The following parameters are required:\n{}\n" " The following were provided for a parameter study evaluation:\n{}\n".format( list(self._parameter_collection.keys()), list(new_param_set.keys()))) @property def parameter_sets_to_evaluate(self): return self._parameter_sets_to_evaluate def _run_study(self): self._check_parameter_sets_populated() param_sets = self._parameter_sets_to_evaluate success = True exit_status = 0 err_msg="" try: self._batch_results = self._matcal_evaluate_parameter_sets_batch(param_sets) except Exception as e: success = False exit_status = -1 err_msg = str(repr(e)) logger.error(f"Error evaluating current parameter batch.\n{err_msg}") self._results._set_exit_information(success, exit_status, f"{err_msg}") return self._results def _check_parameter_sets_populated(self): if not self._parameter_sets_to_evaluate: raise RuntimeError("The parameter study has no evaluations defined." " Please use the \"add_parameter_evaluation\" " "method to add parameter sets to evaluate") def _format_parameter_batch_eval_results(self, batch_raw_objectives, flattened_batch_results, total_objs, parameter_sets, batch_qois): return ParameterBatchEvaluator.default_results_formatter(batch_raw_objectives, total_objs, parameter_sets, batch_qois)
[docs] def make_total_objective_study(self): """ This changes the stored total objectives to be a summation of all metric function results. """ self._return_residuals = False
[docs] def make_residuals_study(self): """ This changes the stored total objectives to be the L2 norm of one long concatenated residual from all objectives added using :meth:`~matcal.core.parameter_studies.ParameterStudy.add_evaluation_set` """ self._return_residuals = True
def _format_parameters(self, params): return params
[docs] def restart(self): """ Sets the study to launch in restart mode. The study will use existing results from previous launches to populate the results instead of running the simulations again. Note that this feature requires that no changes to the study to be made in order for the study to produce correct results. Files from previous runs are read in to this study, they should not be deleted. Missing files may cause errors in the restart. If any random number generation is used in the calculation. It is important to set the same seed value as used previously """ self._restart = True
def _study_specific_postprocessing(self): """"""
[docs] class HaltonStudy(ParameterStudy): def __init__(self, *parameters): super().__init__(*parameters) self.l_bounds = [] self.u_bounds = [] for idx, key in enumerate(self._parameter_collection): self.l_bounds.append(self._parameter_collection[key].get_lower_bound()) self.u_bounds.append(self._parameter_collection[key].get_upper_bound()) self._number_parameters = len(self._parameter_collection) self.HaltonSampler = qmc.Halton(d=self._number_parameters) self._seed = None self._scramble=None self._number_of_samples=None self._skip=None self.set_Halton_scramble() self.set_number_of_samples()
[docs] def set_seed(self, seed): """ Set the random seed for the random generator that the study uses to generate the samples. The method should be called **before** :meth:`launch` to guarantee reproducibility. :param seed: Integer seed for the pseudo‑random number generator. :type seed: int """ check_value_is_positive_integer(seed, "seed") self._seed = seed
[docs] def set_Halton_scramble(self, scramble=True): """ :param scramble: set the scramble keyword for the numpy Halton object. :type scramble: bool """ check_value_is_bool(scramble, "scramble") self._scramble=scramble
[docs] def set_number_of_samples(self, nsamples=20, skip=None): """ Set the number of samples for the study. :param nsamples: Number of parameter samples to generate from Halton sequence. :type nsamples: int :param skip: When continuing an existing design, the user may optionally skip ahead in the Halton sequence by an amount determined by 'skip'. :type skip: int """ check_value_is_positive_integer(nsamples, 'nsamples') self._number_of_samples = nsamples if skip is not None: check_value_is_positive_integer(skip, 'skip') self._skip=skip
[docs] def launch(self): self._generate_samples(self._number_of_samples, self._skip) return super().launch()
def _generate_samples(self, nsamples, skip): halton_sampler = qmc.Halton(d=self._number_parameters, scramble=self._scramble, seed=self._seed) if skip is not None: halton_sampler.fast_forward(skip) logger.debug( f"Halton sampler initialized with seed = {self._seed}, scramble = " + f"{self._scramble}, and "+ f"skip = {skip}." ) unscaled_samples = halton_sampler.random(n=nsamples) scaled_samples = self._scale_samples_to_bounds(unscaled_samples) self._populate_parameter_evaluations(scaled_samples) def _populate_parameter_evaluations(self, scaled_samples): param_order = self._parameter_collection.get_item_names() self._parameter_sets_to_evaluate = [] for sample in scaled_samples: ss = { key:sample[i] for i, key in enumerate(param_order) } self._add_parameter_evaluation(**ss) self._check_parameter_sets_populated() def _scale_samples_to_bounds(self, samples): """ Scale samples to be within defined bounds. :param samples: samples to be scaled :type samples: np.ndarray of size (nsamples x nfeatures) :return: scaled samples :type return: np.ndarray of size (nsamples x nfeatures) """ return qmc.scale(samples, self.l_bounds, self.u_bounds) def _add_parameter_evaluation(self, **p): super().add_parameter_evaluation(**p) def add_parameter_evaluation(self, **parameters): """""" raise RuntimeError(f"Users cannot add parameter evaluations to a "+ f"{self.__class__.__name__}.")
class FiniteDifference: def __init__(self, center_point, relative_step_size=1.e-3, epsilon=np.sqrt(np.finfo(float).eps)): self._center_point = np.array(center_point, dtype=float) self._number_of_variables = len(self._center_point) self._relative_step_size = relative_step_size self._step_sizes = [] for x in self._center_point: dx = np.abs(x)*relative_step_size if dx < epsilon: dx = epsilon self._step_sizes.append(dx) self._finite_difference_evaluation_points = None self._gradient_coefficients = None self._gradient_indices = None self._function_values = None self._hessian_coefficients = None self._hessian_indices = None def set_function_values(self,ys): self._function_values = ys ndim = np.squeeze(ys).ndim self._function_shape = None if ndim > 1: self._function_shape = ys[0].shape def gradient(self): shape = [self._number_of_variables] if self._function_shape is not None: shape.extend(self._function_shape) G = np.zeros(shape) for i,c in enumerate(self._gradient_coefficients): for j,ii in enumerate(self._gradient_indices[i]): G[i] += c[j]*self._function_values[ii] return G def hessian(self): shape = [self._number_of_variables,self._number_of_variables] if self._function_shape is not None: shape.extend(self._function_shape) H = np.zeros(shape) k = 0 for i in range(self._number_of_variables): for l,m in enumerate(self._hessian_indices[k]): H[i,i] += self._hessian_coefficients[k][l]*self._function_values[m] k += 1 for j in range(i+1,self._number_of_variables): for l,m in enumerate(self._hessian_indices[k]): H[i,j] += self._hessian_coefficients[k][l]*self._function_values[m] k += 1 H[j,i] = H[i,j] return H def compute_gradient_evaluation_points(self, three_point_finite_diff=True): self._gradient_coefficients = [] self._gradient_indices = [] self._finite_difference_evaluation_points = [self._center_point] for i in range(self._number_of_variables): dx = self._step_sizes[i] new_coeffs = [] new_indexes = [] coef_plus, idx_plus = self._get_gradient_step_point_coefficients_indices(dx, i, three_point_finite_diff) new_coeffs.append(coef_plus) new_indexes.append(idx_plus) if three_point_finite_diff: coef_minus, idx_minus = self._get_gradient_step_point_coefficients_indices(-dx, i, three_point_finite_diff) new_coeffs.append(coef_minus) new_indexes.append(idx_minus) else: new_coeffs.append(-1/dx) new_indexes.append(0) self._gradient_coefficients.append(new_coeffs) self._gradient_indices.append(new_indexes) return self._finite_difference_evaluation_points def _get_gradient_step_point_coefficients_indices(self, dx, i, three_point_finite_diff): x= self._center_point.copy() x[i] += dx coeff = 1/dx if three_point_finite_diff: coeff *= 0.5 self._finite_difference_evaluation_points.append(x) return coeff, len(self._finite_difference_evaluation_points)-1 def compute_hessian_evaluation_points(self): self.compute_gradient_evaluation_points(three_point_finite_diff=True) self._hessian_coefficients = [] self._hessian_indices = [] for i in range(self._number_of_variables): dxi = self._step_sizes[i] self._get_hessian_diagonal_term_step_point_coefficients_indices(dxi, i) for j in range(i+1,self._number_of_variables): dxj = self._step_sizes[j] coefs, idxs = self._get_all_hessian_cross_terms(dxi, dxj, i, j) self._hessian_coefficients.append(coefs) self._hessian_indices.append(idxs) return self._finite_difference_evaluation_points def _get_all_hessian_cross_terms(self, dxi, dxj, i, j): coefs = [] idxs = [] coef, idx = self._get_hessian_cross_term_step_point_coefficients_indices(-dxi, -dxj, i, j) coefs.append(coef) idxs.append(idx) coef, idx = self._get_hessian_cross_term_step_point_coefficients_indices(dxi, -dxj, i, j) coefs.append(coef) idxs.append(idx) coef, idx = self._get_hessian_cross_term_step_point_coefficients_indices(-dxi, dxj, i, j) coefs.append(coef) idxs.append(idx) coef, idx = self._get_hessian_cross_term_step_point_coefficients_indices(dxi, dxj, i, j) coefs.append(coef) idxs.append(idx) return coefs, idxs def _get_hessian_diagonal_term_step_point_coefficients_indices(self, dxi, i): inv_eps2 = 1.0/(dxi*dxi) self._hessian_coefficients.append([-2.0*inv_eps2, inv_eps2, inv_eps2]) ii = self._gradient_indices[i] self._hessian_indices.append([0, ii[0], ii[1]]) def _get_hessian_cross_term_step_point_coefficients_indices(self, dxi, dxj, i, j): inv_eps2 = 0.25/(dxi*dxj) x = self._center_point.copy() x[i] += dxi x[j] += dxj self._finite_difference_evaluation_points.append(x) return inv_eps2, len(self._finite_difference_evaluation_points)-1 _small = 1e-12 def _estimate_parameter_covariance(residuals, sensitivities, noise_variance): has_replicas = len(residuals.shape) > 1 if has_replicas: Sigma_y = _get_residual_covariance(residuals) Sigma_guess = _solve_for_parameter_covariance(Sigma_y, sensitivities, noise_variance) elif not has_replicas: raise RuntimeError("The LaplaceStudy has no repeats. Repeat data " "are needed for the study.") return Sigma_guess def _get_residual_covariance(residuals): Sigma_y = np.cov(residuals.T) return np.atleast_2d(Sigma_y) def _solve_for_parameter_covariance( output_covariance, residual_sensitivities, noise_variance=0.0): mineval, maxeval = _check_covariance(output_covariance) if np.abs(mineval) < _small: logger.warning("Residual " "covariance is not positive definite!") n_y = output_covariance.shape[0] n_p = residual_sensitivities.shape[1] output_covariance -= np.eye(n_y)*noise_variance [U,d,V] = np.linalg.svd(output_covariance) UTA = U[:,:n_p].T@residual_sensitivities invUTA = np.linalg.inv(UTA) s = np.diag(d[:n_p]) if (d[:n_p-1] < _small).any(): raise ValueError("LaplaceStudy under determined. " "System may be under determined.") S = invUTA@s@(invUTA.T) return S def _check_covariance(Sigma): try: evals = np.linalg.eigvalsh(Sigma) min_eval = np.min(evals) max_eval = np.max(evals) except Exception as e: logger.warning("Residual covariance eigenvalues could not be computed. " "The LaplaceStudy results are likely to be unreliable.\n" f"{repr(e)}") min_eval = 0 max_eval = 0 return min_eval, max_eval class _MinimizeCallbackWithCounter: def __init__(self, num_parameters): self._iteration = 0 self._num_parameters = num_parameters def __call__(self, intermediate_result): self._iteration += 1 if self._iteration % 20 == 0: r = intermediate_result cur_sig = _process_fitted_covariance_result(r.x, self._num_parameters) cur_sig_str = "\n\t\t".join([str(repr(row)) for row in cur_sig]) logger.info(f"\tCurrent covariance:\n\t\t{cur_sig_str}") logger.info(f"\tCurrent LaplaceStudy objective:\t{r.fun}") logger.info(f"\tCurrent iteration:\t{self._iteration}\n") def _fit_posterior(residuals, residual_sensitivities, sigma_estimate, noise_variance, method='nelder-mead'): nparameters = residual_sensitivities.shape[1] init_variances, init_correlation_coefficients = _decompose_covariance(sigma_estimate) init_theta = _to_theta(init_variances, init_correlation_coefficients) init_theta = [x for x in init_theta] from scipy.optimize import minimize args = (residuals, residual_sensitivities, noise_variance) logger.info("Improving posterior covariance estimate:") callback = _MinimizeCallbackWithCounter(nparameters) try: result = minimize(_fitted_posterior_objective, init_theta, args=args, method=method, tol=1e-3, callback=callback) except np.linalg.LinAlgError as e: logger.warning("Improving covariance failed. Try a different noise estimate. " + f"Improvement optimization failed due to the:\n \'{repr(e)}\'") return None theta = result.x optimized_sigma = _process_fitted_covariance_result(theta, nparameters) return optimized_sigma def _fitted_posterior_objective(theta, residuals, residual_sensitivities, noise_estimate): obj = -_log_posterior_predictive(theta, residual_sensitivities, residuals, noise_estimate) return obj def _decompose_covariance(Sigma): n = Sigma.shape[0] variances = np.diag(Sigma) s = np.diag(1.0/np.sqrt(variances)) sigma = s@Sigma@s correlation_coefficients = [] for i in range(n): for j in range(i+1,n): correlation_coefficients.append(sigma[i,j]) correlation_coefficients = np.array(correlation_coefficients) return variances,correlation_coefficients def _process_fitted_covariance_result(theta, nparameters): variances = theta[:nparameters] correlation_coefficients = theta[nparameters:] optimized_sigma = _assemble_covariance_matrix(np.exp(variances), np.tanh(correlation_coefficients)) return optimized_sigma def _to_theta(variances, correlation_coefficients, clip=True): v = np.log(variances) if clip: tol = 1.e-8 correlation_coefficients = np.clip(correlation_coefficients,-1.0+tol,1.0-tol) else: assert np.all(np.abs(correlation_coefficients) <= 1.0) c = np.arctanh(correlation_coefficients) theta = np.concatenate([v,c]) return theta def _log_posterior_predictive(theta, residual_sensitivities, residuals, noise): noise2 = noise*noise if noise2 == 0.0: noise2 = _small variances, correlation_coefficients = _from_theta(theta, residual_sensitivities.shape[1]) Sigma_y = _pushed_forward_variances(variances,correlation_coefficients,residual_sensitivities) Sigma_y = Sigma_y + noise2*np.eye(Sigma_y.shape[0]) sign, logdet = np.linalg.slogdet(Sigma_y) logdetSigma_y = sign*logdet invSigma = np.linalg.solve(Sigma_y, np.eye(Sigma_y.shape[0])) mse = np.einsum("ki,ij,kj",residuals,invSigma,residuals) n_repeats = residuals.shape[1] logp = -0.5*( logdetSigma_y + mse/n_repeats) return logp def _from_theta(theta, nparameters, clip=True): variances = np.exp (theta[:nparameters]) correlation_coefficients = np.tanh(theta[nparameters:]) if clip: tol = 1.e-8 correlation_coefficients = np.clip(correlation_coefficients,-1.0+tol,1.0-tol) return variances,correlation_coefficients def _pushed_forward_variances(variances, correlation_coefficients, parameter_sensitivities): A = parameter_sensitivities Sigma = _assemble_covariance_matrix(variances,correlation_coefficients) ASAT = A@Sigma@A.T return ASAT def _assemble_covariance_matrix(variances, correlation_coefficients): n = len(variances); Vars = np.diag(variances); Cors = np.eye(n); indx = 0 for i in range(n): for j in range(i+1,n): Cors[i,j] = correlation_coefficients[indx] Cors[j,i] = correlation_coefficients[indx] indx += 1 Sigma = np.sqrt(Vars)@Cors@np.sqrt(Vars) return Sigma class _LaplaceStudyBase(ParameterStudy): def __init__(self, *parameters): super().__init__(*parameters) self._center = None self._finite_difference = None self._step_size = None self.set_step_size() def _check_parameter_sets_populated(self): if not self._parameter_sets_to_evaluate: raise RuntimeError("The LaplaceStudy has no parameter center defined. " "Please use the \"set_parameter_center\" method before launching the study.") def set_parameter_center(self, **parameters): """ Pass an unpacked dictionary of parameters with valid values to set the center about which to calculate the Hessian for the study objectives. These parameters must be valid for the study parameters and all study parameters must be included. The values must be determined from a calibration and must be located at an objective minimum. :param parameters: keyword/value pair of parameter names and values for the location about which to calculate the Hessian """ self._check_all_parameters_provided(parameters) param_order = self._parameter_collection.get_item_names() ordered_center = OrderedDict() for param in param_order: ordered_center[param] = parameters[param] self._center = ordered_center center = [ self._center[key] for key in param_order ] self.mean = np.array(center) self._setup_finite_difference() def _setup_finite_difference(self): self._parameter_sets_to_evaluate = [] self._finite_difference = FiniteDifference(self.mean, relative_step_size=self._step_size) finite_difference_points = self._get_finite_difference_evaluation_points() param_order = self._parameter_collection.get_item_names() for pt in finite_difference_points: p = { key:pt[i] for i,key in enumerate(param_order) } self._add_parameter_evaluation(**p) def set_step_size(self, step_size=1e-3): """ Sets the finite difference step sizes for the LaplaceStudy hessian and gradient approximations. This is a relative step size. Default step size is a relative step of 1e-3. The value must be between zero and one. :param step_size: the desired step_size :type step_size: float """ check_value_is_real_between_values(step_size, 0, 1, "step_size") self._step_size=step_size if self._finite_difference is not None: self._setup_finite_difference() def _add_parameter_evaluation(self, **p): super().add_parameter_evaluation(**p) def add_parameter_evaluation(self, **parameters): """""" raise self.StudyInputError("Users cannot add parameter evaluations to a LaplaceStudy.") def _get_center_eval_index(self): return 0 def _gradient(self): G = self._finite_difference.gradient() return G def _get_raw_residuals(self, model_name, obj_name, eval_index): batch_objectives = self._batch_results['objectives'] return batch_objectives[eval_index][model_name][obj_name].residuals def _get_normalized_weighted_conditioned_residuals(self, model_name, obj_name, eval_index, flatten=False): batch_objectives = self._batch_results['objectives'] eval_model_obj_res = batch_objectives[eval_index][model_name][obj_name] result = eval_model_obj_res.weighted_conditioned_normalized_residuals if flatten: result = eval_model_obj_res.flatten_data_collection(result) return result def _log_total_sensitivity_information(self): logger.info("\n") logger.info("Parameter center:") logger.info(str(repr(self.mean))) logger.info("\n") logger.info("Estimated parameter covariance:") init_sigma = (self._results.outcome["estimated_parameter_covariance"]) logger.info(str(repr(init_sigma))) logger.info("\n") def _get_parameter_specific_results(self, gradient_key): results = OrderedDict() results[gradient_key] = self._gradient() results["mean"] = self.mean results = _package_parameter_specific_results(self._parameter_collection, results) return results @abstractmethod def _get_finite_difference_evaluation_points(self): """""" @abstractmethod def _get_overall_results(self): """"""
[docs] class LaplaceStudy(_LaplaceStudyBase): """ Use the MatCal :class:`~matcal.core.parameter_studies.LaplaceStudy` to evaluate the gradient of the calibration residuals at an optimal point in parameter space. The residual gradient can then be used to form a modified Laplace approximation to estimate the parameter covariance matrix for use in uncertainty quantification. We perform this assuming uncertainty is due to model form error. """ study_class = "LaplaceStudy" _laplace_results_key = "laplace results" def __init__(self, *parameters): super().__init__(*parameters) self._calibrate_covariance = True self.set_noise_estimate() def _get_finite_difference_evaluation_points(self): return self._finite_difference.compute_gradient_evaluation_points()
[docs] def add_evaluation_set(self, model, objectives, data=None, states=None, data_conditioner_class=MaxAbsDataConditioner): super().add_evaluation_set(model, objectives, data, states, data_conditioner_class) for eval_set in self._evaluation_sets.values(): for obj_set in eval_set.objective_sets: for obj_name in obj_set.objectives: obj = obj_set.objectives[obj_name] more_than_one_qoi = self._check_obj_qois_for_more_than_one_qoi(obj_set, obj_name) objs_invalid = more_than_one_qoi and not obj.has_independent_field() self._raise_error_if_objs_invalid(objs_invalid)
[docs] def set_noise_estimate(self, noise_estimate=0.0): """ Set the estimate for the noise in the data. Currently only a single value is accepted for all data. This is the expected standard deviation of the noise. :param noise_estimate: value for the noise estimate, must be non-negative :type noise_estimate: float """ check_value_is_nonnegative_real(noise_estimate, "noise_estimate") self._noise_variance=noise_estimate**2
[docs] def set_calibrate_covariance(self, calibrate_covaraince=True): """ By default, the laplace study will attempt to improve the covariance through a calibraiton. Optionally, turn this off or back on. :param calibrate_covariance: flag to turn the covariance calibration process off or on :type calibrate_covariance: bool """ self._calibrate_covariance = calibrate_covaraince
[docs] def update_laplace_estimate(self, noise_estimate): """Update the laplace study covariance estimate after with an updated noise estimate.""" if self._results is None: raise RuntimeError("Study has not been run yet. Use the \'launch\' method " + "for the first study run. ") self.set_noise_estimate(noise_estimate) self._study_specific_postprocessing() return self._results
def _log_total_sensitivity_information(self): super()._log_total_sensitivity_information() if self._calibrate_covariance: logger.info("Calibrated parameter covariance:") fit_sigma = (self._results.outcome["fitted_parameter_covariance"]) logger.info(str(repr(fit_sigma))) logger.info("\n") def _check_obj_qois_for_more_than_one_qoi(self, obj_set, obj_name): more_than_one_qoi = False conditioned_exp_qois = obj_set.conditioned_experiment_qoi_collection[obj_name] for state in conditioned_exp_qois: for data in conditioned_exp_qois[state]: if data.length > 1: more_than_one_qoi = True return more_than_one_qoi def _raise_error_if_objs_invalid(self, objs_invalid): if objs_invalid: raise ValueError(f"The {LaplaceStudy.study_class}Study" " only accepts residuals/objectives of length 1 or" " objectives with independent fields variables" " so that repeat data can be compared at common" " independent variable locations.") def _study_specific_postprocessing(self): total_eval_residual_vecs = self._extract_residual_information_for_processing() center_resids = total_eval_residual_vecs[self._get_center_eval_index()].T residual_gradients = self._calculate_residual_sensitivities(total_eval_residual_vecs) covariance_estimates = self._calculate_covariance(center_resids, residual_gradients) output = self._get_parameter_specific_results("residuals_gradient") output.update(self._get_overall_results(covariance_estimates)) self._results._set_outcome(output) self._log_total_sensitivity_information() def _get_overall_results(self, covariance_estimates): results = OrderedDict() results["parameter_order"] = self._parameter_collection.get_item_names() results.update(covariance_estimates) return results def _calculate_residual_sensitivities(self, total_eval_residuals): self._finite_difference.set_function_values(total_eval_residuals) residual_sensitivities = np.atleast_3d(self._gradient()) # grab first one from repeats, this is the most populated and since this is # the derivative of the residuals where the third index is the repeat #, # the first one is good enough for derivative of the model w.r.t. the parameters. residual_sensitivities = residual_sensitivities.T[0, :, :] return residual_sensitivities def _calculate_covariance(self, center_resids, residual_sensitivities): estimated_covariance = _estimate_parameter_covariance(center_resids, residual_sensitivities, self._noise_variance) covariance_results = OrderedDict() covariance_results["estimated_parameter_covariance"] = estimated_covariance if self._calibrate_covariance: fitted_posterior = _fit_posterior(center_resids, residual_sensitivities, estimated_covariance, self._noise_variance) covariance_results["fitted_parameter_covariance"] = fitted_posterior else: logger.info("Skipping covariance calibration by user request.\n") return covariance_results def _extract_residual_information_for_processing(self): residual_matrices=[] for eval_index in range(self._results.number_of_evaluations): eval_sub_residual_matrices = [] for model, eval_set in self._evaluation_sets.items(): for obj_set in eval_set.objective_sets: for obj_name in obj_set.objectives: resids_dc = self._get_raw_residuals(model.name, obj_name, eval_index) if obj_set.objectives[obj_name].has_independent_field(): indep_field = obj_set.objectives[obj_name].independent_field exp_dc = obj_set.data_collection new_resids = self._get_interpolated_responses(resids_dc, exp_dc, indep_field) else: new_resids = self._get_single_response_set(resids_dc) eval_sub_residual_matrices += new_resids _combine_array_method = _combine_array_list_into_zero_padded_single_array total_residual_matrix = _combine_array_method(eval_sub_residual_matrices) residual_matrices.append(np.atleast_2d(total_residual_matrix)) return residual_matrices def _get_interpolated_responses(self, residuals_dc, exp_dc, indep_field): data_stats = DataCollectionStatistics() combined_interpolated_residuals = [] for state in residuals_dc: register_data_method = data_stats._interpolate_state_data_to_common_independent_variable interpolated_resids = register_data_method(indep_field, residuals_dc, state, exp_dc) interpolated_resids.pop(indep_field) for field in interpolated_resids: combined_interpolated_residuals.append(np.atleast_2d(interpolated_resids[field]).T) return combined_interpolated_residuals def _get_single_response_set(self, response_dc): combined_responses = [] for state in response_dc: for field in response_dc.state_common_field_names(state): combined_resids_current_field = [] for data in response_dc[state]: if data.length > 1: raise RuntimeError(f"Error in {LaplaceStudy.study_class}Study." " Contact MatCal support") combined_resids_current_field.append(data[field][0]) combined_responses.append(np.atleast_2d(combined_resids_current_field)) return combined_responses
[docs] def sample_multivariate_normal(nsamples, mean, covariance_matrix, seed=None, param_names=None): """ Sample the multivariate normal distributions for the study parameters using the mean and covariance matrix provided by a LaplaceStudy or other UQ method. :param nsamples: the number of samples to return from the distribution :type nsamples: int :param mean: the mean value for the parameters. This would be the calibrated value for most MatCal studies. :type mean: Array-like :param covariance_matrix: parameter covariance matrix from which to generate samples from. :type covariance_matrix: Array-like :param seed: an optional seed for the random number generator performing the sampling :type seed: int :param param_names: optionally provide a list with the parameter names in the correct order. so that the resulting samples will be returned in a dictionary format where each parameter name key will have a list of parameter values associated with it with length nsamples. :type param_names: list(str) :return: a dictionary for the generated samples where the keys are the parameter names (if provided) and the values are arrays storing the sampled values. If parameter names are not provided a name is generated of the form "parameter_#". :rtype: dict(str, list(float)) """ check_value_is_positive_integer(nsamples, "nsamples") check_value_is_array_like_of_reals(mean, "mean") check_value_is_array_like_of_reals(covariance_matrix, "covariance_matrix") if seed is not None: check_value_is_positive_integer(seed, "seed") _check_sample_covariance_mat_inputs(mean, covariance_matrix, param_names) samples = _get_multivariate_normal_samples(mean, covariance_matrix, nsamples, seed) samples_dict = _create_samples_dict_from_samples_array(samples, param_names) return samples_dict
def _check_sample_covariance_mat_inputs(mean, covariance_matrix, param_names): if (len(mean)!=covariance_matrix.shape[0] or covariance_matrix.shape[0]!=covariance_matrix.shape[1]): raise ValueError("The mean and covariance matrix passed to \"sample_covariance_matrix\" " "are invalid. Their sizes must match appropriately.") if param_names is not None and len(param_names) != covariance_matrix.shape[0]: raise ValueError("The length of the parameter names list must equal the number of " "rows and columns in the provided covariance matrix.") def _get_multivariate_normal_samples(mean, sigma, nsamples, seed): try: # modern python if seed is not None: rng = np.random.default_rng(seed=seed) samples = rng.multivariate_normal(mean, sigma, nsamples).T except: # old python e.g. 3.7 if seed is not None: np.random.seed(seed) samples = np.random.multivariate_normal(mean, sigma, nsamples).T return samples def _create_samples_dict_from_samples_array( samples, param_names=None): samples_dict = OrderedDict() for param_index, value in enumerate(samples[:, 0]): if param_names is not None: parameter_name = param_names[param_index] else: parameter_name = f"parameter_{param_index}" samples_dict[parameter_name] = samples[param_index, :] return samples_dict
[docs] class ClassicLaplaceStudy(_LaplaceStudyBase): """ Use the MatCal :class:`~matcal.core.parameter_studies.ClassicLaplaceStudy` to evaluate the Hessian (and gradient) at an optimal point in parameter space. The Hessian can then be used to form the Laplace approximation to the parameter covariance matrix for use in uncertainty quantification. We perform this assuming uncertainty is due to noise in the data alone for the classical approach the Laplace Approximation. """ study_class = "ClassicLaplaceStudy" _laplace_results_key = "laplace results" def _get_finite_difference_evaluation_points(self): return self._finite_difference.compute_hessian_evaluation_points() def _study_specific_postprocessing(self): results = self._extract_objective_information_for_processing() total_SSE_objectives, total_residual_vecs = results self._finite_difference.set_function_values(total_SSE_objectives) output = self._get_parameter_specific_results("objective_gradient") output.update(self._get_overall_results(total_residual_vecs)) self._results._set_outcome(output) self._log_total_sensitivity_information() def _get_overall_results(self, total_residual_vecs): results = OrderedDict() results["hessian"] = self._hessian() results["parameter_order"] = self._parameter_collection.get_item_names() total_noise_estimate = np.std(total_residual_vecs[self._get_center_eval_index()]) results["standard_deviation"] = total_noise_estimate param_covariance = _get_total_scaled_covariance(self._inverse_hessian(), total_noise_estimate) results["estimated_parameter_covariance"] = param_covariance return results def _extract_objective_information_for_processing(self): SSE_objectives=[] flattened_resids = [] for eval_index in range(self._results.number_of_evaluations): eval_flattened_residuals = np.array([]) for model, eval_set in self._evaluation_sets.items(): for obj_set in eval_set.objective_sets: for obj_name in obj_set.objectives: get_resid_method = self._get_normalized_weighted_conditioned_residuals this_flattened_resids = get_resid_method(model.name, obj_name, eval_index, flatten=True) eval_flattened_residuals = np.append(eval_flattened_residuals, np.atleast_1d(this_flattened_resids)) flattened_resids.append(eval_flattened_residuals) objective = self._get_sum_of_squares_objective(eval_flattened_residuals) SSE_objectives.append(objective) return SSE_objectives, flattened_resids def _get_sum_of_squares_objective(self, residuals): return np.dot(residuals, residuals) def _hessian(self): H = self._finite_difference.hessian() return H def _inverse_hessian(self): H = self._finite_difference.hessian() try: C = np.linalg.inv(H) except np.linalg.LinAlgError: logger.warning("Could not invert the hessian for this LaplaceStudy." " Error estimation due to external noise is invalid.") C = np.ones(H.shape) return C
def _get_total_scaled_covariance(inverse_hessian, std_dev_estimate): cov = inverse_hessian scale = 2*std_dev_estimate*std_dev_estimate return scale*cov def _package_parameter_specific_results(param_collect, sens_info): out = OrderedDict() for sens_key, sens_val in sens_info.items(): for param_i, param_key in enumerate(param_collect.keys()): if isinstance(sens_val, (list, np.ndarray)): out_name = f"{sens_key}:{param_key}" out[out_name] = sens_val[param_i] return out def _combine_array_list_into_zero_padded_single_array(arrays): max_shape = [0,0] num_resids = 0 for array in arrays: current_shape = array.shape max_shape[0] = np.max((max_shape[0], current_shape[0])) max_shape[1] = np.max((max_shape[1], current_shape[1])) num_resids += current_shape[0] combined_array = np.zeros((num_resids, max_shape[1])) current_eval_set_row = 0 from copy import deepcopy for array in arrays: start_row = current_eval_set_row end_row = current_eval_set_row+array.shape[0] end_col = array.shape[1] combined_array[start_row:end_row, 0:end_col] = deepcopy(array) current_eval_set_row = end_row return combined_array