"""
This module contains adaptive surrogates.
"""
from collections import OrderedDict
import copy
import numpy as np
import os
from scipy import stats
from sklearn.metrics import r2_score
from matcal.core.logger import initialize_matcal_logger
from matcal.core.objective import SimulationResultsSynchronizer
from matcal.core.parameter_studies import HaltonStudy
from matcal.core.qoi_extractor import UserDefinedExtractor
from matcal.core.state import State
from matcal.core.study_base import StudyResults
from matcal.core.utilities import (check_value_is_positive_integer,
check_value_is_array_like_of_reals,
check_value_is_nonempty_str,
check_value_is_array_like_of_reals,
check_value_is_positive_real,
check_value_is_bool,
check_value_is_nonnegative_integer,
check_item_is_correct_type)
from matcal.core.serializer_wrapper import matcal_load, matcal_save
from matcal.core.surrogates import (_average_l2_error_norm,
_max_error_inf_norm,
_process_surrogate_args_call,
_check_params_in_range,
_convert_param_array_to_dict)
logger = initialize_matcal_logger(__name__)
def _get_parameter_bounds(parameters):
param_bounds = []
for name, parameter in parameters.items():
param_bounds.append(np.atleast_2d([parameter.get_lower_bound(),
parameter.get_upper_bound()]))
bounds = np.r_[*param_bounds]
return bounds
def _get_variable_from_bounds(bounds):
from pyapprox.variables import IndependentMarginalsVariable
marginals = [
stats.uniform(bound[0], bound[1] - bound[0]) for bound in bounds
]
return IndependentMarginalsVariable(marginals)
def _get_pyapprox_variable_transformer(study, bounds):
from pyapprox.variables.transforms import AffineTransform
variable = _get_variable_from_bounds(bounds)
return AffineTransform(variable)
def _setup_sparse_grid_surrogate(n_parameters, n_qois):
from pyapprox.surrogates.univariate.base import ClenshawCurtisQuadratureRule
from pyapprox.surrogates.affine.multiindex import DoublePlusOneIndexGrowthRule
from pyapprox.surrogates.univariate.lagrange import UnivariateLagrangeBasis
from pyapprox.surrogates.sparsegrids.combination import (
AdaptiveCombinationSparseGrid,
MaxNSamplesSparseGridSubspaceAdmissibilityCriteria,
VarianceRefinementCriteria
)
quad_rule = ClenshawCurtisQuadratureRule(store=True, bounds=[-1., 1.])
bases_1d = [UnivariateLagrangeBasis(quad_rule, 3) for dim_id in range(n_parameters)]
growth_rule = DoublePlusOneIndexGrowthRule()
sg = AdaptiveCombinationSparseGrid(n_qois, n_parameters)
sg.setup(
MaxNSamplesSparseGridSubspaceAdmissibilityCriteria(np.inf),
VarianceRefinementCriteria(),
bases_1d,
growth_rule
)
return sg
[docs]
class AdaptiveSurrogate:
"""
Stores the surrogate and training and test information regarding the surrogate
and the progress of training for the surrogate.
Can also be used to call the surrogate objects for predictions
using the surrogate models. Since all iterations of the surrogate are
stored, any version of the surrogate can be called.
"""
def __init__(self, target_field_name, indep_variable_name,
indep_variable_values, variable_transformer,
test_params, test_responses, param_names,
bounds):
"""
Create an :class:`AdaptiveSurrogate` instance.
:param str target_field_name: Name of the model field that the surrogate
will approximate (e.g., ``"temperature"``).
:param str indep_variable_name: Name of the auxiliary independent variable
(e.g., ``"time"`` or ``"x_position"``) that will be attached to the
surrogate output.
:param indep_variable_values: The values of the independent variable at
which the surrogate should be evaluated.
:type indep_variable_values: array‑like of real numbers
:param variable_transformer: Object that maps model parameters to the
canonical space required by the surrogate library.
:type variable_transformer: object with ``map_to_canonical`` and
``map_from_canonical`` methods
:param test_params: Parameter samples used for testing the surrogate.
:type test_params: :class:`numpy.ndarray` of shape ``(n_parameters, n_test)``
:param test_responses: Corresponding model responses for the test
parameter samples.
:type test_responses: :class:`numpy.ndarray` of shape
``(n_test, n_qois)``
:param param_names: Ordered list of parameter names that define the
mapping between positional arguments and model parameters.
:type param_names: list[str]
The constructor stores the supplied information and prepares internal
containers that will hold the surrogate objects, error histories and
sample counts as the adaptive training proceeds.
"""
self._surrogates: list = []
self._average_errors: list[float] = []
self._max_errors: list[float] = []
self._r2_scores: list[float] = []
self._sample_counts: list[int] = []
self._target_field_name: str = target_field_name
self._indep_variable_name = indep_variable_name
self._indep_variable_values = np.asarray(indep_variable_values)
self._variable_transformer = variable_transformer
self._test_params = test_params
self._test_responses = test_responses
self._param_names = param_names
self._bounds = bounds
self._enforce_training_data_parameter_range = True
[docs]
def enforce_training_data_parameter_range(self, enforce_training_data_parameter_range=True):
"""
By default the surrogate will error if called with a parameter set outside of the
parameter ranges used in the training data set. To call the surrogate for parameters
outside of the training data range, call this method with the argument set to False.
Adherence to the training data range can be reactivated by calling this method
with the argument set to True.
:param ignore_training_range: bool flag to ignore training data range.
:type ignore_training_range:
"""
check_value_is_bool(enforce_training_data_parameter_range,
"enforce_training_data_parameter_range")
self._enforce_training_data_parameter_range = enforce_training_data_parameter_range
def _add_iteration(
self,
surrogate,
nsamples
) -> None:
self._surrogates.append(copy.deepcopy(surrogate))
surrogate_values = self(self._test_params, batch_evaluate=True)[self._target_field_name]
average_l2_error = _average_l2_error_norm(self._test_responses,
surrogate_values)
max_abs_error = _max_error_inf_norm( self._test_responses, surrogate_values)
self._average_errors.append(average_l2_error)
self._max_errors.append(max_abs_error)
if self._test_responses.shape[1] > 1:
score = r2_score(self._test_responses, surrogate_values)
self._r2_scores.append(score)
else:
self._r2_scores.append(np.nan)
self._sample_counts.append(nsamples)
@property
def current_surrogate(self):
"""Return the most recent surrogate (or ``None`` if no iteration yet)."""
return self._surrogates[-1] if self._surrogates else None
@property
def average_error_history(self):
"""Returns the list of errors for the average error history. The average
error is calculated using
.. math::
E_{avg}
= \\frac{\\lVert \\mathbf{R}_{\\text{test}} - \\hat{\\mathbf{R}} \\rVert_{2}}
{N}
where :math:`N` is the number of QoIs in the response, :math:`{R}_{\\text{test}}` is the
test responses and :math:`{\\hat{R}}` is the surrogate responses.
"""
return self._average_errors
@property
def max_error_history(self):
"""Returns the list of errors for the max error history. The max
error is calculated using
.. math::
E_{max}
= \\lVert \\mathbf{R}_{\\text{test}} - \\hat{\\mathbf{R}} \\rVert_{\\infty}
where :math:`{R}_{\\text{test}}` is the
test responses and :math:`{\\hat{R}}` is the surrogate responses.
"""
return self._max_errors
[docs]
def score(self, surrogate_index=-1):
"""Returns the :math:`R^2` test score for the surrogate.
:param surrogate_index: optionally pick which surrogate to return the score for.
:type surrogate_index: int
"""
return self._r2_scores[surrogate_index]
@property
def sample_count_history(self):
"""Returns a list containing the number of samples used by each surrogate
training step."""
return self._sample_counts
def __call__(self, *args, surrogate_index=-1, batch_evaluate=False, transpose=False,
**kwargs):
"""
Evaluate a stored surrogate model. This is represented in mathematical notation by
.. math::
\\hat{\\mathbf{R}} = S_i\\bigl(\\mathbf{p}\\bigr),
where :math:`\\mathbf{R}` is the vector (or matrix) of output responses,
:math:`\\mathbf{p}` is the vector (or matrix) of input
parameters and :math:`S_i` denotes the selected surrogate model.
The surrogate objects includes all the
models generated during the adaptive training process. This method
provides an interface for retrieving predictions from any
version of the surrogate during training using the ``surrogate_index``
keyword argument.
:param *args: Positional arguments representing the model parameters.
The accepted calling patterns are:
* **Single‑sample evaluation** (``batch_evaluate=False``) – a
tuple whose length equals the number of model parameters.
The values are interpreted in the order defined by the
:class:`matcal.core.parameters.ParameterCollection` or order of parameters
passed to the adaptive surrogate training study.
* **Batch evaluation** (``batch_evaluate=True``) – a single argument
that must be a two‑dimensional ``np.ndarray`` of shape
``(n_samples, n_parameters)``. The array is forwarded unchanged
to the surrogate.
:type *args: tuple or np.ndarray
:param surrogate_index: Index of the surrogate to use. ``-1`` selects the
most recent surrogate. Any valid list index is accepted.
:type surrogate_index: int, optional
:param batch_evaluate: When ``True`` the call is interpreted as a *batch*
evaluation; otherwise it is a *single‑sample* evaluation.
:type batch_evaluate: bool, optional
:param **kwargs: Keyword arguments that map each parameter name to the desired
evaluation value. This calling style is mutually exclusive
with the positional ``*args`` form.
:type **kwargs: dict
:return: The surrogate prediction ``\\hat{\\mathbf{R}}``. For a single
sample, this is a dictionary
containing two one‑dimensional arrays of length ``n_qois``
with the independent variable and the corresponding target variable
response. For a batch evaluation, it is a two‑dimensional array of shape
``(n_samples, n_qois)``.
:rtype: np.ndarray or dict(str, np.ndarray)
:raises RuntimeError: If the supplied arguments do not match any of the
supported calling conventions (wrong number of positional arguments,
missing or extra keyword arguments, etc.).
"""
surrogate = self._surrogates[surrogate_index]
response = surrogate(*args, batch_evaluate=batch_evaluate, transpose=transpose,
**kwargs)
return response
[docs]
class SparseGridAdaptiveSurrogate(AdaptiveSurrogate):
def __call__(self, *args, surrogate_index=-1, batch_evaluate=False,
transpose=True, **kwargs):
surrogate = self._surrogates[surrogate_index]
params_array = _process_surrogate_args_call(self._param_names, *args,
batch_evaluate=batch_evaluate, transpose=transpose, **kwargs)
if not batch_evaluate:
params_array = np.atleast_2d(params_array).T
params_dict = _convert_param_array_to_dict(params_array.T, self._param_names)
_check_params_in_range(params_dict, self._bounds.T,
self._enforce_training_data_parameter_range)
params_array = self._variable_transformer.map_to_canonical(params_array)
response = surrogate(params_array)
if not batch_evaluate:
response = response.flatten()
results = {self._target_field_name:response}
results[self._indep_variable_name] = self._indep_variable_values
return results
[docs]
class AdaptiveSurrogateStudyBase(HaltonStudy):
def __init__(self, *parameters):
super().__init__(*parameters)
self._bounds = _get_parameter_bounds(self._parameter_collection)
self._target_field_name = None
self._independent_variable=None
self._independent_variable_values=None
self._evaluation_set_added = False
self._results_synchronizer = None
self._surrogate = None
self._variable_transformer = None
self._user_test_data = None
self._max_training_samples=None
self._training_samples_user_set = False
self._number_of_test_samples=None
self._test_samples_user_set = False
self._training_batch_number = 1
self.set_max_training_samples()
self._average_l2_error_goal = None
self._max_abs_error_goal = None
self.set_error_stopping_criteria()
self._surrogate_save_filename = None
self._test_group_random_seed = None
[docs]
def set_error_stopping_criteria(self,
average_l2_error_goal: float=1e-2,
max_abs_error_goal: float=1e-1):
"""
Set the error thresholds that determine when the adaptive surrogate
training stops.
When
the *average L2* error falls below ``average_l2_error_goal`` **or** the
*maximum absolute* error falls below ``max_abs_error_goal`` the training
loop terminates (provided at least two batches have been evaluated).
:param average_l2_error_goal: Desired upper bound for the average L2
error. Must be a positive number.
:type average_l2_error_goal: float, optional
:param max_abs_error_goal: Desired upper bound for the maximum absolute
error. Must be a positive number.
:type max_abs_error_goal: float, optional
"""
check_value_is_positive_real(average_l2_error_goal, "average_l2_error_goal")
self._average_l2_error_goal = float(average_l2_error_goal)
check_value_is_positive_real(max_abs_error_goal, "max_abs_error_goal")
self._max_abs_error_goal = float(max_abs_error_goal)
[docs]
def set_independent_variable(self, independent_variable,
independent_variable_values):
"""
Specify an independent (auxiliary) variable and the values at which the surrogate
will be evaluated.
This variable is **not** a model input; it is a field that will be used later
(for example, a spatial coordinate, a time step, or any other scalar quantity
that the surrogate should be conditioned on). The surrogate will be trained
on the parameter samples generated by the study and then provide a response at each
value supplied in ``independent_variable_values``.
:param independent_variable: Name of the independent variable (e.g. ``"time"``,
``"x_position"``, …) that will be attached to the surrogate output.
:type independent_variable: str
:param independent_variable_values: A 1‑D array‑like collection of real numbers
indicating the points at which the surrogate should be queried.
:type independent_variable_values: array‑like of real numbers
"""
check_value_is_nonempty_str(independent_variable, "independent_variable")
self._independent_variable = independent_variable
check_value_is_array_like_of_reals(independent_variable_values,
"independent_variable_values")
self._independent_variable_values = independent_variable_values
logger.debug(f"Independent variable field set to {self._independent_variable}")
logger.debug(f"Independent variable values set to {self._independent_variable_values}")
[docs]
def set_number_of_test_samples(self, number_of_test_samples):
"""
Set the number of samples that will be used for testing.
By default we test against ``max_training_samples``/20 or
the number of parameters*10, whichever is greater.
:param max_training_samples: desired number of test samples
:type max_training_samples: int
"""
self._test_samples_user_set = True
check_value_is_positive_integer(number_of_test_samples, "number_of_test_samples")
self._number_of_test_samples = number_of_test_samples
super().set_number_of_samples(self._number_of_test_samples)
logger.debug(f"number_of_test_samples set to {self._number_of_test_samples}")
[docs]
def set_max_training_samples(self, max_training_samples=1000):
"""
Set the maximum number of training samples you want to be run for
Sparse Grid surrogate generation. If the convergence criteria is not reached,
the training for the surrogate will stop after max_training_samples has been
reached.
:param max_training_samples: desired maximum number of samples
:type max_training_samples: int
"""
check_value_is_positive_integer(max_training_samples, "max_training_samples")
self._max_training_samples = max_training_samples
logger.debug(f"max_training_samples set to {self._max_training_samples}")
if not self._test_samples_user_set:
self.set_number_of_test_samples(self._set_default_number_of_test_samples())
self._test_samples_user_set = False
[docs]
def set_test_data(self, study_results):
"""
Provide an external test‑data set for the adaptive surrogate study. This must contain
the model name and field names necessary for the surrogate. This should only be used
when re-running surrogate generation with a previously existing test set from a
previous run where surrogate training was attempted. The independent variable data
must also match what is specified for surrogate training.
If this method is **not** called, the adaptive study will automatically
generate a test data set using a Halton sampling design. The number of
test samples is taken from the value set via
:meth:`~matcal.core.adaptive_surrogates.AdaptiveSurrogateStudyBase.set_number_of_test_samples`
(default is ``max_training_samples // 20`` or ``n_parameters * 10``,
whichever is larger). Supplying an explicit test data set overrides that
behavior.
:param study_results: The test data to be used for surrogate evaluation.
* **StudyResults** – a :class:`~matcal.core.study_base.StudyResults`
instance containing the desired parameter history and simulation
results.
* **str** – a path to a serialized ``.joblib`` file that, when loaded
returns a ``StudyResults`` object.
:type study_results: :class:`~matcal.core.study_base.StudyResults` or ``str``
:raises TypeError: If ``study_results`` is neither a ``StudyResults`` instance
nor a string.
:raises FileNotFoundError: If ``study_results`` is a string but the file
cannot be located or loaded.
:raises RuntimeError: If the loaded object is not a ``StudyResults`` instance.
:notes:
* The supplied test set is **only** used for validation of the surrogate;
it is never incorporated into the training data.
* Calling this method multiple times replaces any previously stored test
data with the most recent value.
* The test data must be compatible with the study’s parameter space
(same parameter names and bounds as the training data).
"""
check_item_is_correct_type(study_results, (StudyResults, str), "study_results")
if isinstance(study_results, str):
self._user_test_data = matcal_load(study_results)
if not isinstance(self._user_test_data, StudyResults):
raise RuntimeError(f"The data loaded by loading {study_results} is not " +
"a study results object and cannot be used for surrogate testing.")
elif isinstance(study_results, StudyResults):
self._user_test_data = study_results
else:
raise RuntimeError("Improper study results passed for the " +
"adaptive surrogate test data.")
def _set_default_number_of_test_samples(self) -> int:
"""
Compute the default number of test samples.
The rule follows:
* ``max_training_samples // 20`` (integer division) **or**
* ``n_parameters * 10``
Whichever of the two values is larger becomes the default.
:returns: Default number of test samples for the current study.
:rtype: int
"""
candidate_a = int(self._max_training_samples // 20) # floor division → int
candidate_b = self._number_parameters * 10
return max(candidate_a, candidate_b)
[docs]
def set_target_field_name(self, target_field_name):
"""Specify the field name for the response that the surrogate model
will seek to replicate. This is generally a model response such as temperature,
load, etc.
:param target_field_name: the name of the field that the surrogate will
replicate
:type target_field_name: str
"""
check_value_is_nonempty_str(target_field_name, "target_field_name")
self._target_field_name = target_field_name
def _get_test_data(self):
results = None
if self._user_test_data is not None:
results = self._user_test_data
else:
results = self._run_test_sampling()
test_params = self._format_params(results)
test_responses = self._format_output(results)
return test_params, test_responses
def _run_test_sampling(self):
"""
Execute the initial test‑sampling phase in a dedicated sub‑directory.
This method:
1. Sets up a temporary working directory via
:meth:`_setup_test_sampling_working_directory`.
2. Calls the parent ``HaltonStudy`` ``launch`` method to generate the
test samples.
3. Restores the original working directory.
4. Stores the formatted test parameters and responses for later use.
:raises RuntimeError: If the test‑sampling launch fails.
"""
orig_remove = self._remove_existing_working_directory
orig_working_directory = self._update_work_dir_for_test_sampling()
seed = self._seed
if self._test_group_random_seed is not None:
self.set_seed(self._test_group_random_seed)
super().launch()
test_results = copy.deepcopy(self._results)
self._reset_study_after_test_sampling_generation(orig_working_directory, orig_remove)
if seed is not None:
self.set_seed(seed)
return test_results
[docs]
def set_test_group_random_seed(self, seed):
"""
Set the random seed for the random generator that the study uses
to generate the test samples only.
The method should be called **before**
:meth:`launch` to
guarantee reproducibility.
:param seed: Integer seed for the pseudo‑random number generator.
:type seed: int
"""
check_value_is_positive_integer(seed, "seed")
self._test_group_random_seed = seed
def _update_work_dir_for_test_sampling(self):
"""
Prepare a temporary working directory for the initial test‑sampling phase.
The method follows the same logic that was previously in
:meth:`launch`:
* If the user has already set a working directory (``self._working_directory``)
it is suffixed with ``"_test_samples"``.
* If no working directory was set, a new directory named ``"test_samples"``
is used.
The original directory (or ``None`` if none was set) is returned so the
caller can restore it after the test run has completed.
:return: The original working‑directory path before it was modified,
or ``None`` if the study had no working directory set.
:rtype: str | None
"""
original_dir = None
if self._working_directory is not None:
original_dir = self._working_directory
test_samples_directory = self._working_directory + "_test_samples"
else:
test_samples_directory = os.path.abspath("test_samples")
self.set_working_directory(test_samples_directory,
remove_existing=self._remove_existing_working_directory)
return original_dir
def _reset_study_after_test_sampling_generation(self, orig_working_directory,
remove_existing):
self._working_directory = orig_working_directory
self._remove_existing_working_directory = remove_existing
self._results = None
self._next_evaluation_id_number = 1
[docs]
def add_evaluation_set(self, model, state=None, qoi_extractor=None):
"""
Add an evaluation set that uses a
:class:`~matcal.core.objective.SimulationResultsSynchronizer`
generated from the study’s independent variable, independent‑variable values,
and target field name.
.. warning::
For adaptive surrogates, this can only be called **once** as the
training points are adaptively chosen based on the response of
interest.
This method is a thin wrapper around :meth:`StudyBase.add_evaluation_set`.
It accepts only the *model* (required) and an optional *state* argument.
``state`` must be a single :class:`~matcal.core.state.State` instance; a
collection of states is **not** supported. If ``state`` is ``None`` MatCal's
default state will be used.
The synchronizer is built automatically from the attributes that
were defined via :meth:`set_independent_variable` and
:meth:`set_target_field_name`.
:param model: The model that will generate the simulation data.
:type model: :class:`~matcal.core.models.ModelBase`
:param state: The single state to which the evaluation set should be applied.
If ``None`` the model’s default state is used.
:type state: :class:`~matcal.core.state.State`, optional
:param qoi_extractor: Provide a
:class:`~matcal.core.qoi_extractor.UserDefinedExtractor` that will act on the
simulation results to provide a quantity of interest for the surrogate.
It must return target field values of the same length of the
independent variable values.
:type qoi_extractor: :class:`~matcal.core.qoi_extractor.UserDefinedExtractor`
:raises RuntimeError: If the required attributes for the synchronizer
(independent variable, its values, or target field name) have not been set.
"""
if self._evaluation_set_added:
raise RuntimeError(
"add_evaluation_set can only be called once for a "
f"{self.__class__.__name__} instance because adaptivity "
"is only supported for a single model and single response of interest."
)
if state is not None and not isinstance(state, State):
raise TypeError(
f"{self.__class__.__name__}.add_evaluation_set expects ``state`` "
"to be a single `State` instance (or None)."
)
self._results_synchronizer = self._make_simulation_results_synchronizer(qoi_extractor)
super().add_evaluation_set(
model,
objectives=self._results_synchronizer,
data=None,
states=state,
)
self._evaluation_set_added = True
def _make_simulation_results_synchronizer(self, qoi_extractor):
"""
Build a :class:`~matcal.core.objective.SimulationResultsSynchronizer`
that will be used by the surrogate study.
The synchronizer evaluates the *simulation* at the user‑provided
independent‑variable locations and extracts the target field as the
dependent quantity.
:return: Configured ``SimulationResultsSynchronizer`` instance.
:rtype: SimulationResultsSynchronizer
:raises RuntimeError: If any of the required attributes have not been set.
"""
if self._independent_variable is None:
raise RuntimeError(
"Independent variable name has not been set. Call "
"`set_independent_variable` before creating the synchronizer."
)
if self._target_field_name is None:
raise RuntimeError(
"Target field name has not been set. Call "
"`set_target_field_name` before creating the synchronizer."
)
sim_res_synch = SimulationResultsSynchronizer(
self._independent_variable, self._independent_variable_values,
self._target_field_name
)
if qoi_extractor is not None:
if not isinstance(qoi_extractor, UserDefinedExtractor):
raise TypeError(f"The qoi extractor passed to {self.__class__.__name__} "+
f"must be a UserDefinedExtractor. Received "+
f"argument of type '{type(qoi_extractor)}'. Check input.")
sim_res_synch.set_simulation_qoi_extractor(qoi_extractor)
return sim_res_synch
[docs]
def launch(self):
"""
Run the initial test‑sampling study in a dedicated sub‑directory,
then continue with the adaptive Sparse‑Grid workflow.
The test‑sampling phase is performed by a standard **HaltonStudy**
to generate the required test points. If the user called
:meth:`StudyBase.set_working_directory` before launching the study,
the test‑sampling directory is created by appending the suffix
``\"_test_samples\"`` to the user‑provided path. Otherwise, the test
samples are run in a local directory named ``\"test_samples\"``.
After the test sample study finishes, the original working directory is restored
and the surrogate‑building routine is started.
"""
test_params, test_responses = self._get_test_data()
param_names = self._parameter_collection.get_item_names()
self._variable_transformer = self._variable_transformer_factory(self._bounds)
self._surrogate = self._adaptive_surrogate_class(self._target_field_name,
self._independent_variable,
self._independent_variable_values,
self._variable_transformer,
test_params, test_responses, param_names,
self._bounds)
self._run_study = self._perform_adaptive_surrogate_batch_sampling
return super().launch()
def _stopping_criterion_met(self, training_batch_number, stop=False):
if training_batch_number > 0:
if np.abs(self._surrogate.average_error_history[-1]) <= self._average_l2_error_goal:
logger.info(f"Average L2 norm score converged! "+
f"\nFinal L2 norm error: {self._surrogate.average_error_history[-1]}")
stop=True
elif np.abs(self._surrogate.max_error_history[-1]) <=self._max_abs_error_goal:
logger.info(f"Max absolute error score converged! "+
f"\nFinal max error: {self._surrogate.max_error_history[-1]}")
stop=True
if self._results.number_of_evaluations > self._max_training_samples and not stop:
logger.info("Surrogate not converged yet, but maximum training "+
"samples reached. Exiting.")
stop=True
if stop:
logger.info(f"Surrogate trained on {self._results.number_of_evaluations} samples.")
else:
logger.info("Surrogate not converged yet.")
logger.info(f"Average error score: {self._surrogate.average_error_history[-1]}")
logger.info(f"Max error score: {self._surrogate.max_error_history[-1]}")
logger.info(f"R2 score: {self._surrogate.score()}\n")
return stop
def _matcal_evaluate_parameter_sets_batch_adaptive_training(self, parameter_sets):
self._populate_parameter_evaluations_adaptive(parameter_sets)
current_batch = len(self._surrogate.sample_count_history)
logger.info(f"Active learning batch {current_batch+1}. ")
if current_batch > 0:
logger.info(f"Currently the surrogate is trained on "+
f"{self._surrogate.sample_count_history[-1]} samples.")
logger.info("................................................................")
eval_meth = super()._matcal_evaluate_parameter_sets_batch
batch_results = eval_meth(self._parameter_sets_to_evaluate)
return self._format_batch_results(batch_results, parameter_sets)
def _add_parameter_evaluation(self, **p):
super()._add_parameter_evaluation(**p)
@property
def surrogate(self):
"""
Return the :class:`~matcal.core.adaptive_surrogates.AdaptiveSurrogate`
instance that holds the
surrogate models and their training history.
:return: The surrogate object, or ``None`` if the study has not yet
created one (i.e., before :meth:`launch` is called).
:rtype: :class:`~matcal.core.adaptive_surrogates.AdaptiveSurrogate` | None
"""
return self._surrogate
[docs]
def set_surrogate_save_filename(self, filename):
"""
Set the path used to save the surrogate object after each training batch.
The surrogate (an :class:`~matcal.core.adaptive_surrogates.AdaptiveSurrogate`
instance) is periodically saved to
disk with :func:`matcal.core.serializer_wrapper.matcal_save`. The filename
must be a non‑empty string that ends with the ``.joblib`` extension.
The directory
component of the path is not created automatically; it must already exist
or be created by the user prior to calling this method.
:param str filename: Full path (absolute or relative) to the file that will
store the surrogate. The filename **must** end with ``.joblib``.
Example: ``"my_model_sparse_grid_surrogate.joblib"`` or
``"/tmp/surrogate.joblib"``.
:raises ValueError: If *filename* does not contain the required ``.joblib``
suffix or is empty.
:raises TypeError: If *filename* is not a string.
"""
check_value_is_nonempty_str(filename, "filename")
if ".joblib" not in filename:
raise ValueError("The save filename for the Adaptive Surrogate Study " +
f"must end with \".joblib\". Passed filename is \"{filename}\".")
self._surrogate_save_filename = filename
@property
def surrogate_save_filename(self):
"""
Retrieve the filename (including the ``.joblib`` extension) that will be
used to save the surrogate object after each training batch.
:return: The absolute or relative path supplied via
:meth:`set_surrogate_save_filename`, or ``None`` if no filename has been set.
:rtype: str | None
"""
return self._surrogate_save_filename
@property
def results_synchronizer(self):
"""
Return the :class:`~matcal.core.objective.SimulationResultsSynchronizer` that
was created for this adaptive surrogate study.
The synchronizer is responsible for evaluating the model at the user‑provided
independent‑variable locations and extracting the target field (the quantity of
interest) from the simulation output. It is constructed the first time
:meth:`add_evaluation_set` is called and stored internally as
``self._results_synchronizer``. As a result, this should be called after
an evaluation set is added to the study.
:return: The ``SimulationResultsSynchronizer`` instance associated with the
study, or ``None`` if the synchronizer has not yet been created (i.e.
``add_evaluation_set`` has not been called).
:rtype: :class:`~matcal.core.objective.SimulationResultsSynchronizer` | None
"""
return self._results_synchronizer
def _format_params(self, results):
params_formatted = []
for param in results.parameter_history:
params_formatted.append(results.parameter_history[param])
return np.array(params_formatted).T
def _format_output(self, results):
model_name = self._get_model_names()[0]
objective = self._results_synchronizer
state_name = results.simulation_history[model_name].state_names[0]
qois = results.qoi_history[f"{model_name}:{objective.name}"]
sim_qois = qois.simulation_qois
nsamples = results.number_of_evaluations
nqois = len(self._independent_variable_values)
data = np.zeros((nsamples, nqois))
for idx, sim_qoi in enumerate(sim_qois):
data[idx,:] = sim_qoi[state_name][0][self._target_field_name]
return data
[docs]
class SparseGridAdaptiveSurrogateStudy(AdaptiveSurrogateStudyBase):
"""
The SparseGridAdaptiveSurrogateStudy builds a Sparse Grid adaptive surrogate
using the PyApprox library. They generally behave well for larger parameter spaces
and problems with discontinuities in the response of interest.
Some downsides for these surrogates
is that one must be trained independently for each response of interest.
As a result, this surrogate requires only a single model and state be passed to it.
It also requires that a target field name be specified for building the surrogate that
signifies the response of interest for the surrogate.
"""
_variable_transformer_factory= _get_pyapprox_variable_transformer
_adaptive_surrogate_class = SparseGridAdaptiveSurrogate
def _perform_adaptive_surrogate_batch_sampling(self):
from pyapprox.interface.model import ModelFromVectorizedCallable
n_qois = len(self._independent_variable_values)
canonical_model = ModelFromVectorizedCallable(n_qois, self._number_parameters,
self._matcal_evaluate_parameter_sets_batch_adaptive_training)
if self._surrogate_save_filename is None:
self.set_surrogate_save_filename(f"{self._get_model_names()[0]}_sparse_grid_surrogate.joblib")
sg = _setup_sparse_grid_surrogate(self._number_parameters, n_qois)
sg.step(canonical_model)
self._surrogate._add_iteration(sg, self._results.number_of_evaluations)
training_batch_number = len(self._surrogate.sample_count_history)
matcal_save(self._surrogate_save_filename, self._surrogate)
while not self._stopping_criterion_met(training_batch_number):
sg.step(canonical_model)
self._surrogate._add_iteration(sg, self._results.number_of_evaluations)
training_batch_number = len(self._surrogate.sample_count_history)
matcal_save(self._surrogate_save_filename, self._surrogate)
return self._results
def _populate_parameter_evaluations_adaptive(self, samples):
samples = self._variable_transformer.map_from_canonical(samples)
super()._populate_parameter_evaluations(samples.T)
def _format_batch_results(self, batch_results, parameter_sets_from_pyapprox):
model_name = self._get_model_names()[0]
objective_name = self._results_synchronizer.name
state_name = self._results.simulation_history[model_name].state_names[0]
formatted_results = np.zeros((parameter_sets_from_pyapprox.shape[1],
len(self._independent_variable_values)))
for idx, qoi in enumerate(batch_results["qois"]):
qoi = qoi[model_name][objective_name]
formatted_results[idx, :] = qoi.simulation_qois[state_name][0][self._target_field_name]
return formatted_results
def _fit_surrogate_model(eval_info, interpolation_field, interpolation_locations,
test_eval_info, target_field, save_filename='voronoi_surrogate',
logger_on=True, **kwargs):
from matcal.core.surrogates import SurrogateGenerator
decomp_var=0.99
if "decomp_var" in kwargs:
decomp_var = kwargs.pop("decomp_var")
surrogate_generator = SurrogateGenerator(eval_info, training_fraction=1.0,
interpolation_field=interpolation_field,
interpolation_locations=interpolation_locations,
test_eval_info=test_eval_info, **kwargs)
surrogate_generator.set_fields_of_interest(target_field)
surrogate_generator.set_PCA_details(decomp_var=decomp_var)
surrogate_generator._logger_on=logger_on
save_filename = save_filename.split(".joblib")[0]
return surrogate_generator.generate(save_filename)
[docs]
class VoronoiAdaptiveSurrogateStudy(AdaptiveSurrogateStudyBase):
_adaptive_surrogate_class = AdaptiveSurrogate
def _return_none(*args, **kwargs):
return None
_variable_transformer_factory = _return_none
def __init__(self, *parameters):
"""Initialize the VoronoiAdaptiveSurrogateStudy
"""
super().__init__(*parameters)
self._num_initial_samples = None
self._test_eval_info = None
self.set_number_of_initial_samples()
self._voronoi_type = None
self._finite_only = None
self._iterative_updates = None
self._thin = None
self._random_selection = None
self.set_voronoi_sampling_options()
self._nsplits = None
self._nmax_folds = None
self._nmax_loo = None
self._cv_scale = None
self._cv_metric = None
self._group_kfold = None
self._loo_errors = None
self.set_cross_validation_options()
self._nbatch_samples = []
self.test_eval_info = None
self._convergence_metric = None
self._eps = None
self.set_convergence_criteria()
self._current_surrogate_score = {}
self._current_surrogate_score['score'] = []
self._current_surrogate_score['nlpd'] = []
self._current_surrogate_score['rmse'] = []
self._max_fold_error_indices = None
self._surrogate_options = {}
self._seed = None
def _update_surrogate_score(self):
latent_score = self._surrogate.current_surrogate._latent_scores['test']
self._current_surrogate_score['score'].append(_get_surrogate_metric(latent_score, 'score'))
self._current_surrogate_score['nlpd'].append(_get_surrogate_metric(latent_score, 'nlpd'))
self._current_surrogate_score['rmse'].append(_get_surrogate_metric(latent_score, 'rmse'))
def _build_boundary_hull(self):
from scipy.spatial import ConvexHull, Delaunay
self._boundary_points = self._make_nd_grid(2)
self._boundary_hull = ConvexHull(self._boundary_points)
self._boundary_hull_eq = self._boundary_hull.equations # (nfacet, ndim + 1)
self._boundary_hull_V, self._boundary_hull_b = self._boundary_hull_eq[:, :-1],\
self._boundary_hull_eq[:, -1] # normal, offset
self._bhullD = Delaunay(self._boundary_points)
def _make_nd_grid(self, npts_along_dim):
grid_pts = []
for param_index in np.arange(self._number_parameters):
grid_pts.append(np.linspace(self._bounds[param_index][0], self._bounds[param_index][1],
npts_along_dim))
coords = np.meshgrid(*grid_pts)
coords_ravel = [np.asarray(coords[i]).ravel() for i in np.arange(self._number_parameters)]
return np.vstack(tuple(coords_ravel)).T
[docs]
def set_number_of_initial_samples(self, num_initial_samples=None):
"""
:param num_initial_samples: The number of samples to initiate the algorithm with.
The initial samples are used to train the initial surrogate and built the
initial voronoi tessellation. Default 10*ndim.
:type initial_training_length: None or int
"""
if num_initial_samples is None:
self._num_initial_samples = 10*self._number_parameters
else:
check_value_is_positive_integer(num_initial_samples, "num_initial_samples")
self._num_initial_samples = num_initial_samples
[docs]
def set_voronoi_sampling_options(self, voronoi_type='full',
finite_only=False,
iterative_updates = True,
thin=None,
random_selection=None):
"""Set options pertaining to the voronoi sampling algorithm. Properties
that can be altered are listed below.
:param vornoi_type: Defines which Vornoi-based sampling strategy to use.
Supported options are:
* 'full': Constructs the full Voronoi tessellation over all points (Default)
* 'local': Constructs a local Voronoi tessellation using only nearby
points determined by k-nearest neighbors. This can reduce computational
cost in high dimensions.
:type voronoi_type: str
:param finite_only: If True, only Vornoi vertices that lie inside the
convex hull defined by the boundary points are consided as candidate sample
locations. If False, all vertices are considered, and those lying outside
the parameter bounds are clipped back to the convex hull. This is more flexible but
can be more computationally expensive, especially in high dimensions.
:type finite_onlye: bool
:param iterative_updates: If True, the Voronoi tessellation is recomputed
after each new sample is added, promoting a more space-filling design.
If False, the tessellation is updated once per batch after all samples
in the batch are selected. This can be faster but may result in sample
clustering.
:type iterative_updates: bool
:param thin: If specified, every nth candidate sample location is selected as a new
sample location. This can significantly reduce computational
cost in high-dimensional spaces.
:type thin: int or None
:param random_selection: If sepecified, this defines the number of candidate sample
locations that are randomly selected as new samples. This provides an
alternative way to reduce computational cost in high-dimensional problems.
:type random_selection: int or None
"""
check_value_is_nonempty_str(voronoi_type, "voronoi_type")
if voronoi_type.lower() not in ['full', 'local']:
raise ValueError(f"Voronoi type must be either 'full' or 'local', recieved '{voronoi_type}'.")
else:
self._voronoi_type = voronoi_type.lower()
check_value_is_bool(finite_only, "finite_only")
self._finite_only = finite_only
check_value_is_bool(iterative_updates, "iterative_updates")
self._iterative_updates = iterative_updates
if thin is not None:
check_value_is_positive_integer(thin, "thin")
self._thin = thin
if random_selection is not None:
check_value_is_positive_integer(random_selection, "random_selection")
self._random_selection = random_selection
if self._random_selection is not None and self._thin is not None:
raise ValueError("Only one of 'thin' and 'random_selection' can be activated. Not both.")
[docs]
def set_surrogate_options(self, **kwargs):
"""
:param regressor_kwargs: A keyword selection of parameters to pass to the predictor used.
Please refer to the sklearn documentation for more information for what can be passed to
the predictors.
"""
self._surrogate_options = kwargs
[docs]
def set_convergence_criteria(self, eps=1e-12, convergence_metric='nlpd'):
"""
Convergence is determined by comparing RMSE or NLPD of
surrogate between two successive batches.
:param convergence_metric: Choose from root mean squared error ('rmse')
or negative log posterior density ('nlpd') to track surrogate performance
at each batch iteration. This metric is used to determine if the surrogate
has converged according to eps.
:type convergence metric: str
:param eps: Tolerance for surrogate convergence.
:type eps: float
"""
self._eps = eps
self._convergence_metric = convergence_metric
[docs]
def set_cross_validation_options(self, nsplits=5, nmax_folds=3, nmax_loo=10, cv_scale=1.0,
cv_metric='rmse', group_kfold=False):
"""Set options for cross validation. Properties that can be altered are listed below.
:param nsplits: The number of folds to use in k-fold cross validation.
If nsplits = 0, k-fold cross-validation is skipped entirely and new samples
are instead selected from every region of the Voronoi tessellation defined
by the current set of training samples.
:type nsplits: int
:param nmax_folds: Points in the folds with the highest k-fold error
(the top nmax_folds) define the Voronoi regions from which new samples
will be drawn.
:type nmax_folds: int
:param nmax_loo: Points with the largest leave-one-out cross-validation (LOOCV)
errors (the top nmax_loo). These define the Voronoi regions from which new
samples will be drawn. If nmax_loo = 'all', then new samples are drawn from
all Voronoi regions defined by nmax_folds, and leave-one-out cross-validation
is not performed.
:type nmax_loo: int or 'all'
:param cv_scale: Optional scaling applied to output before calculating errors in
cross-validation and leave-one-out cross-validation. This can be used to
balance error magnitude across dimensions or outputs.
:type scale: float
:param cv_metric: Determines which metric is used when computing errors during
cross-validation. Supported options are:
* rmse -- root mean squared error (Default)
* nlpd -- negative log posterior density
:type cv_metric: str
:param group_kfold: If True, samples are grouped using k-means clustering
prior to k-fold cross-validation so that nearby points are allways assigned
to the same fold. This prevents spatially correllated points from being split
across training and validation sets. If False, folds are assigned randomly
by the standard KFold algorithm.
:type group_kfold: bool
"""
check_value_is_nonnegative_integer(nsplits, "nsplits")
self._nsplits = nsplits
check_value_is_positive_integer(nmax_folds, "nmax_folds")
self._nmax_folds = nmax_folds
if isinstance(nmax_loo, str):
if nmax_loo != 'all':
raise ValueError(f"If the {__class__} 'nmax_loo' parameter is a string, "+
"it must be 'all'.")
else:
try:
check_value_is_positive_integer(nmax_loo, "nmax_loo")
except TypeError:
raise TypeError(f"The {__class__} 'nmax_loo' parameter must be a positive integer "+
f"or the string 'all'. Recieved value {nmax_loo}.")
except ValueError:
raise ValueError(f"The {__class__} 'nmax_loo' parameter must be a positive integer "+
f"recieved value {nmax_loo}.")
self._nmax_loo = nmax_loo
check_value_is_positive_real(cv_scale, "cv_scale")
self._cv_scale = cv_scale
check_value_is_nonempty_str(cv_metric, "cv_metric")
self._cv_metric = cv_metric
valid_cv_metrics = ['rmse', 'nlpd']
if self._cv_metric not in valid_cv_metrics:
raise ValueError("cv_metric not implemented. 'cv_metric' must one of"
" 'rmse', 'nlpd'")
check_value_is_bool(group_kfold, "group_kfold")
self._group_kfold = group_kfold
def _format_output_for_surrogate_gen(self, results):
from matcal.core.data import convert_data_to_dictionary
model_name = self._get_model_names()[0]
state_name = results.simulation_history[model_name].state_names[0]
sim_history = self._results.simulation_history[model_name][state_name]
nsamples = results.number_of_evaluations
data = []
for nn in np.arange(nsamples):
data.append(convert_data_to_dictionary(sim_history[nn]))
return data
def _reset_study_after_test_sampling_generation(self, orig_working_directory, remove_existing):
self._test_eval_info = copy.deepcopy(self._results)
super()._reset_study_after_test_sampling_generation(orig_working_directory, remove_existing)
def _perform_adaptive_surrogate_batch_sampling(self):
if self._surrogate_save_filename is None:
self.set_surrogate_save_filename(f"{self._get_model_names()[0]}_voronoi_adaptive_surrogate.joblib")
self._build_boundary_hull()
self.param_names = self._parameter_collection.get_item_names()
training_params, training_data = self._run_initial_training_samples()
batch_number = 0
while not self._stopping_criterion_met(batch_number):
logger.info(f"Active learning batch {batch_number+1}."
f"\nCurrently the surrogate is trained on "+
f"{self._nbatch_samples[-1]} samples.")
logger.info("................................................................")
new_points = self._create_voronoi_tess_and_choose_new_samples(batch_number,
training_params,
training_data)
self._populate_parameter_evaluations(new_points)
self._matcal_evaluate_parameter_sets_batch(self._parameter_sets_to_evaluate)
training_params, training_data = self._train_surrogate_with_current_results()
batch_number += 1
return self._results
def _stopping_criterion_met(self, training_batch_number, stop=False):
scores = self._current_surrogate_score
if training_batch_number > 1:
this_score = scores[self._convergence_metric][training_batch_number]
last_score = scores[self._convergence_metric][training_batch_number-1]
if np.abs(this_score - last_score) <= self._eps:
logger.info(f"Surrogate Converged!\n"+
f"Convergence from surrogate '{self._convergence_metric}' score:")
logger.info(f"Final score: {this_score}")
logger.info(f"Score delta: {np.abs(this_score - last_score)}")
logger.info(f"Score delta convergence criteria: {self._eps}\n")
stop = True
return super()._stopping_criterion_met(training_batch_number, stop)
def _run_initial_training_samples(self):
super().set_number_of_samples(self._num_initial_samples)
super()._generate_samples(self._num_initial_samples, self._skip)
self._matcal_evaluate_parameter_sets_batch(self._parameter_sets_to_evaluate)
return self._train_surrogate_with_current_results()
def _train_surrogate_with_current_results(self):
training_params = self._format_params(self._results)
training_data = self._format_output_for_surrogate_gen(self._results)
current_surrogate = _fit_surrogate_model(self, interpolation_field=self._independent_variable,
interpolation_locations=self._independent_variable_values,
test_eval_info=self._test_eval_info,
target_field=self._target_field_name,
save_filename=self._surrogate_save_filename,
**self._surrogate_options)
self._surrogate._add_iteration(current_surrogate, self._results.number_of_evaluations)
self._update_surrogate_score()
self._nbatch_samples.append(self.results.number_of_evaluations)
return training_params, training_data
def _create_voronoi_tess_and_choose_new_samples(self, iteration, training_params,
training_data):
"""Perform Voronoi batch sampling based on the specified algorithm.
:return: Returns a list of the new samples selected.
"""
if self._nsplits > 0:
# Step 1: Randomly sort existing samples into K-folds and perform KFold Cross Validation
self._perform_kfold_cross_validation(training_params, training_data)
# Step 2: Select the fold(s) with the n largest K-fold CV error(s)
self._find_kfold_max_errors()
if self._nmax_loo == 'all':
worst_sample_locations = training_params[self._max_fold_error_indices]
else:
# Step 3: Use LOOCV to evaluate each sample within the selected fold(s)
self._perform_loo_cross_validation(training_params, training_data)
# Step 4: Identify the n sample(s) with the highest LOOCV error(s)
worst_sample_locations = self._find_loo_max_errors(training_params)
else:
# Do not perform kfold or loo CV. New samples drawn for all Voroni regions.
worst_sample_locations = training_params
if self._thin is not None:
# thin the new samples locations according to "thin"
worst_sample_locations = worst_sample_locations[::self._thin, ...]
elif self._random_selection is not None:
# randomly select the new sample locations from the candidates in worst_sample_locations
draw_n = np.min((int(0.5 * worst_sample_locations.shape[0]), self._random_selection))
random_rows = np.random.choice(worst_sample_locations.shape[0],
size=draw_n, replace=False)
worst_sample_locations = worst_sample_locations[random_rows, ...]
self._worst_sample_locations = worst_sample_locations
logger.info(f"Initializing voronoi/tree for batch {iteration}")
self._create_voronoi_tess(training_params)
return self._find_new_sample_locations()
def _create_voronoi_tess(self, training_params):
if self._voronoi_type == 'full':
# Initialize Voronoi tessellation
self._voronoi_tessellation = VoronoiTessellation(training_params, self._bounds,
self._finite_only)
self._voronoi_tessellation.build()
elif self._voronoi_type == 'local':
# Make a local voronoi tesselation for each new sample by using knearest neighbors
# to determine the closest points
from scipy.spatial import KDTree
self._all_tree_points = training_params.copy()
self._tree = KDTree(self._all_tree_points)
def _find_new_sample_locations(self):
new_points = []
logger.info("Finding new sample locations")
for loc_idx, location in enumerate(self._worst_sample_locations):
if np.mod(loc_idx, 100) == 0:
logger.info(f"Drawing new sample from region index {loc_idx}"
f" of {len(self._worst_sample_locations)}.")
if self._voronoi_type == 'full':
# Identify corresponding voronoi cell
region_index = self._voronoi_tessellation.get_voronoi_region(location)[0][0]
# Step 5: Select the point within this sample’s Voronoi cell that
# is furthest from existing samples
region_vertices, furthest_vertex_index = \
self._voronoi_tessellation.find_furthest_vertex(region_index)
if region_vertices is None:
continue
furthest_vertex = region_vertices[furthest_vertex_index]
# Step 6: Add the new point and update Voronoi tessellation
if self._iterative_updates:
self._voronoi_tessellation.add_points(furthest_vertex)
# Step 7: Update X
new_points.append(furthest_vertex)
elif self._voronoi_type == 'local':
nneighbors = int(self._all_tree_points.shape[0] * 0.25)
nearest_neighbors = self._tree.query(location, k=nneighbors)
nn_points = self._all_tree_points[nearest_neighbors[1].squeeze()]
nn_vor = VoronoiTessellation(nn_points, self._bounds, self._finite_only)
nn_vor.build()
nn_region = nn_vor.get_voronoi_region(location)[0][0]
try:
nn_vert, nn_fvi = nn_vor.find_furthest_vertex(nn_region)
except:
continue
if nn_vert is None:
continue
furthest_vertex = nn_vert[nn_fvi]
new_points.append(furthest_vertex)
if self._iterative_updates:
from scipy.spatial import KDTree
self._all_tree_points = np.vstack((self._all_tree_points, furthest_vertex))
self._tree = KDTree(self._all_tree_points)
new_points = np.asarray(new_points)
# Make sure all new points are unique
unique_points = set(tuple(row) for row in new_points)
new_points = np.asarray([list(row) for row in unique_points])
new_points = self._check_points_within_bounds(new_points)
return new_points
def _check_points_within_bounds(self, points):
# verify that all samples are within bounds
lb = self._bounds[:,0]
ub = self._bounds[:,1]
mask = ((points >= lb) & (points <= ub)).all(axis=1)
return points[mask]
def _perform_kfold_cross_validation(self, training_params, training_data):
self._kf = None
logger.info("Performing kfold cross-validation")
kfcv = KFoldCrossValidation(self._nsplits, self._group_kfold, self._independent_variable,
self._independent_variable_values,
self._cv_scale, self._cv_metric, self._target_field_name,
self.param_names, self._surrogate_options)
groups = None
if self._group_kfold:
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=self._nsplits, random_state=42)
groups = kmeans.fit_predict(training_params)
self._kf = kfcv.perform_kfold_cv(training_params, training_data, groups)
def _find_kfold_max_errors(self):
max_folds = self._find_indices_of_n_largest_kf_errors()
self._max_fold_error_indices = np.concatenate(list(max_folds.values()))
logger.info(f"\n\tWorst kfold errors associated with the following sample indices:\n"+
f"\t{self._max_fold_error_indices}\n")
def _perform_loo_cross_validation(self, training_params, training_data):
self._loo_errors = None
logger.info("Finding worst sample locations using leave one out validation...")
loocv = LeaveOneOutCrossValidation(self._cv_scale, self._cv_metric,
self._independent_variable,
self._independent_variable_values,
self._target_field_name,
self.param_names, self._surrogate_options)
self._loo_errors = loocv.perform_loocv(training_params, training_data,
self._max_fold_error_indices)
def _find_loo_max_errors(self, training_params):
self._worst_sample_locations = None
max_loo_indices = self._find_indices_of_n_largest_errors()
logger.info(f"\n\tWorst errors when the following sample indices " +
"are left out of training:\n"+
f"\t{max_loo_indices}\n")
return training_params[max_loo_indices]
def _find_indices_of_n_largest_kf_errors(self):
# Create a list of (key, error, sample_index) tuples
items = [(key, value[0], value[1]) for key, value in self._kf.items()]
# Sort the items based on the error in descending order
sorted_items = sorted(items, key=lambda x: x[1], reverse=True)
# Get the top n items
top_n_items = sorted_items[:self._nmax_folds]
# Extract the arrays associated with the top n largest floats
result_arrays = {key: array for key, _, array in top_n_items}
return result_arrays
def _find_indices_of_n_largest_errors(self):
"""Find the indices of the n largest values in an array of errors.
:return: Returns an array of indices corresponding to the n largest errors.
"""
nkeep = int(self._nmax_loo)
# Create a list of (key, error, sample_index) tuples
items = [(key, value[0], value[1]) for key, value in self._loo_errors.items()]
# Sort the items based on the error in descending order
sorted_items = sorted(items, key=lambda x: x[1], reverse=True)
# Get the top n items
top_n_items = sorted_items[:nkeep]
# Extract the indices associated with the top n largest floats
indices = [item[2] for item in top_n_items]
return np.array(indices)
def _add_parameter_evaluation(self, **p):
super()._add_parameter_evaluation(**p)
def add_parameter_evaluation(self, **parameters):
""""""
raise self.StudyInputError("Users cannot add parameter evaluations to"
" a VoronoiAdaptiveSurrogateStudy.")
class VoronoiTessellation:
def __init__(self, points, bounds, finite_only):
"""Initialize the VoronoiBatchSamplingStudy
:param points: Array of points that are the seeds of the Voronoi tessellation
:type points: nd_array
:param bounds: Bounds for the parameter space,
e.g., [(xmin, xmax), (ymin, ymax)] for a 2D space.
:type bounds: list of tuples
"""
self.points = np.array(points)
self.ndim = self.points.shape[1]
self.bounds = bounds
self.finite_only = finite_only
self.incremental = False
def build(self):
"""Initialize the Voronoi tessellation with given points and bounds.
"""
from scipy.spatial import Voronoi, Delaunay, ConvexHull
self.boundary_points = self._make_nd_grid(npts_along_dim=2)
if not self.finite_only:
self.boundary_hull = ConvexHull(self.boundary_points)
self.boundary_hull_eq = self.boundary_hull.equations # (nfacet, ndim + 1)
self.boundary_hull_V, self.boundary_hull_b = \
self.boundary_hull_eq[:, :-1], self.boundary_hull_eq[:, -1] # normal, offset
self.bhullD = Delaunay(self.boundary_points)
else:
self.boundary_hull = None
self.bhullD = None
self.create_ghost_points()
self._all_points = np.vstack([self.points, self._ghost_points])
self.vor = Voronoi(self._all_points, incremental=self.incremental)
self.ghost_busters()
self.boundary_regions = self.get_voronoi_region(self.boundary_points) # may need to update
def _make_nd_grid(self, npts_along_dim):
grid_pts = []
for dim in np.arange(self.ndim):
grid_pts.append(np.linspace(self.bounds[dim,0], self.bounds[dim,1], npts_along_dim))
coords = np.meshgrid(*grid_pts)
coords_ravel = [np.asarray(coords[i]).ravel() for i in np.arange(self.ndim)]
return np.vstack(tuple(coords_ravel)).T
def create_ghost_points(self, stretchCoef=1.75, centCoef=1.5):
"""Reflect points nearest to the boundary hull across the nearest
face of the boundary hull """
boundary_points_stretched = self.boundary_points * stretchCoef
self._ghost_points = boundary_points_stretched
boundary_centroid = np.mean(self.boundary_points, axis=0)
max_dist = np.max(np.linalg.norm(self.boundary_points - boundary_centroid, axis=1))
self._ghost_points = \
np.vstack([self._ghost_points, \
boundary_centroid + centCoef * max_dist * np.eye(self.points.shape[1])])
self._ghost_points =\
np.vstack([self._ghost_points, \
boundary_centroid - centCoef * max_dist * np.eye(self.points.shape[1])])
def ghost_busters(self):
""" Identify which points in self._all_points are ghost points"""
self._boo = []
for point in self._all_points:
if point in self._ghost_points:
self._boo.append(True)
else:
self._boo.append(False)
def get_region_vertices(self, region_index, identify_outside_vertices=True):
"""Return the vertices of the Voronoi region."""
region = self.vor.regions[region_index].copy()
if -1 in region:
logger.warning(f"Infinite vertice in Region {region_index}")
if identify_outside_vertices:
updated_region = self.identify_vertices_outside_bounds(region)
if not -2 in updated_region and len(updated_region) > 0:
region_vertices = self.vor.vertices[region]
elif -2 in updated_region:
if self.finite_only:
if max(updated_region) < 0:
region_vertices = None
else:
region_vertices = \
np.asarray([self.vor.vertices[i]\
for i in updated_region if i > 0])
else:
region_tuple_list = list(zip(region, updated_region))
region_vertices = \
self.replace_unbounded_vertices(updated_region, region_index, region_tuple_list)
if region_vertices is not None:
if not self.finite_only:
boundary_in_region = \
[i for i in np.arange(len(self.boundary_regions))\
if self.boundary_regions[i][0] == region_index]
if boundary_in_region:
boundary_vertices = self.boundary_points[boundary_in_region]
region_vertices = np.concatenate((region_vertices, boundary_vertices))
unique_vertices = set(tuple(row) for row in region_vertices)
return np.asarray([list(row) for row in unique_vertices])
return region_vertices
elif not identify_outside_vertices:
return self.vor.vertices[region]
def get_voronoi_vertices(self, identify_outside_vertices=True):
"""Return the vertices of the Voronoi tessellation."""
vertices = []
for i, region in enumerate(self.vor.regions):
try:
region_point_index, = np.where(self.vor.point_region == i)[0]
except:
# empty region: Voronoi region for a point at infinity that was added internally
continue
if self._boo[region_point_index]:
# region belongs to a ghost point
continue
elif -1 in region:
logger.warning(f"Infinite vertice in Region {i}")
if identify_outside_vertices:
updated_region = self.identify_vertices_outside_bounds(region)
if not -2 in updated_region and len(updated_region) > 0:
vertices.append(self.vor.vertices[region])
elif -2 in updated_region:
if self.finite_only:
verts = np.asarray([self.vor.vertices[i] for i in updated_region if i > 0])
vertices.append(verts)
else:
region_tuple_list = list(zip(region, updated_region))
vertices.append(self.replace_unbounded_vertices(updated_region, i, region_tuple_list))
boundary_in_region =\
[ii for ii in np.arange(len(self.boundary_regions))\
if self.boundary_regions[ii][0] == i]
if boundary_in_region:
boundary_vertices = self.boundary_points[boundary_in_region]
vertices.append(boundary_vertices)
elif not identify_outside_vertices:
vertices.append(self.vor.vertices[region])
if vertices is not None:
vertices = np.concatenate((vertices))
unique_vertices = set(tuple(row) for row in vertices)
return np.asarray([list(row) for row in unique_vertices])
else:
return vertices
def identify_vertices_outside_bounds(self, region):
"""
Identify vertices that sit outside the bounding region
:param region: A list of the voronoi regions. Each list contains indices of the voronoi
vertices forming each Voronoi region.
:type region: list
:return: Returns a new list of voronoi regions with vertices outside
the bounding region replaced with -2.
"""
#outside = lambda lb, ub, x: (x < lb) + (x > ub)
# Create a boolean mask for vertices outside the bounds
region = np.array(region)
region_vertices = self.vor.vertices[region]
outside_mask = np.zeros(region_vertices.shape, dtype=bool)
for col_index in range(self.ndim):
lb, ub = self.bounds[col_index,0], self.bounds[col_index, 1]
#vert_outside, = np.where(outside(lb, ub, region_vertices[:, col_index]))
outside_mask[:, col_index] |= \
(region_vertices[:, col_index] < lb) | (region_vertices[:, col_index] > ub)
# Get the indices of vertices that are outside the bounds
vert_outside = np.where(outside_mask.any(axis=1))[0]
if len(vert_outside) > 0:
outside_vert_index = [region[i] for i in vert_outside]
region[vert_outside] = -2
return region.tolist()
def replace_unbounded_vertices(self, region, region_index, region_tuple):
"""
Replace the infinite vertices in a Voronoi region with new vertices on
the edge of the bounding box.
** vertices that sit outside the bounding region are considered infinite here
:param region: A list of the voronoi regions. Each list contains indices
of the Voronoi vertices forming each Voronoi region.
:type region: list
:param region_index: Region index
:type region_index: int
:return: Returns a new list of voronoi regions with infinite vertices replaced.
"""
try:
region_point_index, = np.argwhere(self.vor.point_region == region_index)[0]
except:
raise ValueError("No region point index found in VoronoiTessesllation"
" for Adaptive Surrogate Generation. Try a different random seed.")
region_vertices = []
if -2 in region:
finite_indices = [v for v in region if v >= 0]
if len(finite_indices) > 0:
finite_vertices = self.vor.vertices[finite_indices]
new_vertices = self.snip_ridge_vertices(\
region_index, region_point_index, region_tuple)
# Replace the infinite vertex
if len(finite_indices) > 0:
region_vertices = np.concatenate((finite_vertices, new_vertices))
elif len(new_vertices) > 0:
region_vertices = new_vertices
else:
return None
else:
region_vertices = self.vor.vertices[region]
return region_vertices
def snip_ridge_vertices(self, region_index, region_point_index, region_tuple):
# Find the ridge vertices for the specified region
region_dict = {x[0]: x[1] for x in region_tuple}
# the voronoi points that are equidistant from the ridge that lies between them
ridge_point_indices = np.argwhere(self.vor.ridge_points == region_point_index)[:, 0]
# the vertices at the end of each ridge
region_ridge_vertices = [self.vor.ridge_vertices[i] for i in ridge_point_indices]
new_vertices = []
for rv in region_ridge_vertices:
urv = [region_dict.get(num) for num in rv]
if len(urv) == 2: #2D Voronoi region
u, v = np.argsort(urv)
# and urv[v] > 0: # only one vertice is out of bounds - snip one end to the boundary hull
if urv[u] == -2:
ray_end = self.vor.vertices[rv[u]]
ray_origin = self.vor.vertices[rv[v]]
norm_ray_direction =\
self.get_normal_ray_direction(ray_origin, ray_end)
new_vertice = \
self.find_boundary_hull_ray_crossings(norm_ray_direction, ray_origin)
# confirm new vertice is in given region
if new_vertice is not None and region_index in self.get_voronoi_region(new_vertice)[0]:
# confirm point is within boundary hull
if self.bhullD.find_simplex(new_vertice) >= 0:
new_vertices.append(new_vertice)
# both vertices are out of bounds - snip both ends to the boundary hull
if urv[v] == -2:
ray_end = self.vor.vertices[rv[v]]
ray_origin = self.vor.vertices[rv[u]]
norm_ray_direction = \
self.get_normal_ray_direction(ray_origin, ray_end)
new_vertice =\
self.find_boundary_hull_ray_crossings(norm_ray_direction, ray_origin)
if new_vertice is not None and region_index in self.get_voronoi_region(new_vertice)[0]:
if self.bhullD.find_simplex(new_vertice) >= 0:
new_vertices.append(new_vertice)
elif len(urv) > 2: #3D + Voronoi region
nunbounded_vert = urv.count(-1) + urv.count(-2)
if nunbounded_vert > 0 and nunbounded_vert < len(urv):
edges = [[rv[i], rv[(i+1) % len(rv)]] for i in range(len(rv))]
updated_edges = \
[[urv[i], urv[(i+1) % len(urv)]] for i in range(len(urv))]
unbounded_edges =\
[[i, edge] for i, edge in enumerate(updated_edges) if -2 in edge]
for i, ev in unbounded_edges:
u, v = np.argsort(ev)
if ev[u] == -2: # and ev[v] > 0:
ray_end = self.vor.vertices[edges[i][u]]
ray_origin = self.vor.vertices[edges[i][v]]
norm_ray_direction = \
self.get_normal_ray_direction(ray_origin, ray_end)
new_vertice = \
self.find_boundary_hull_ray_crossings(norm_ray_direction, ray_origin)
if new_vertice is not None and region_index in self.get_voronoi_region(new_vertice)[0]:
if self.bhullD.find_simplex(new_vertice) >= 0:
new_vertices.append(new_vertice)
if ev[v] == -2: # and ev[v] > 0:
ray_end = self.vor.vertices[edges[i][v]]
ray_origin = self.vor.vertices[edges[i][u]]
norm_ray_direction = \
self.get_normal_ray_direction(ray_origin, ray_end)
new_vertice = \
self.find_boundary_hull_ray_crossings(norm_ray_direction, ray_origin)
if new_vertice is not None and region_index in self.get_voronoi_region(new_vertice)[0]:
if self.bhullD.find_simplex(new_vertice) >= 0:
new_vertices.append(new_vertice)
return np.asarray(new_vertices)
def get_normal_ray_direction(self, ray_origin, ray_end):
ray_direction = ray_end - ray_origin
return ray_direction / np.linalg.norm(ray_direction)
def find_boundary_hull_ray_crossings(self, U, z):
"""Find where a ray crosses the convex hull of the boundary.
:param U: Ray direction.
:type U: np.ndarray
:param z: Ray origin.
:type z: np.ndarray
:return: Returns a list of intersection points with the convex hull.
"""
V = self.boundary_hull_V
b = self.boundary_hull_b
denom = np.dot(V, U)
num = -(b + np.dot(V, z))
alpha = num[denom!=0] / denom[denom!=0]
if not np.any(alpha > 0):
return None
return np.min(alpha[alpha >0]) * U + z
def find_furthest_vertex(self, region_index, identify_outside_vertices=True):
"""Find the vertex that has the greatest distance from the cell centroid."""
self.raise_if_invalid_region_index(region_index)
vertices = self.get_region_vertices(region_index,\
identify_outside_vertices=identify_outside_vertices)
if vertices is not None:
centroid = self.get_region_seed(region_index)
distances = np.linalg.norm(vertices - centroid, axis=1)
furthest_vertex_index = np.argmax(distances)
else:
furthest_vertex_index = None
return vertices, furthest_vertex_index
def get_region_seed(self, region_index):
"""Given a region_index, return the seed of the Voronoi tesselation that
belongs to the region.
:param region_index: Region index.
:type region_index: int
:return: Returns the Voronoi seed that belongs to the indexed region.
"""
point_index, = np.where(self.vor.point_region == region_index)
return np.atleast_2d(self.vor.points[point_index[0]])
def get_voronoi_region(self, point_array):
"""Given an array of points, return the region of the Voronoi tesselation that the
points belongs to. If a point lies on a ridge or vertice, multiple regions are
returned for that point.
:param point: An array of points to find the region of.
:type point: nd_array
:return: Returns a list of lists, where each sublist contains the Voronoi
region(s) that contains the point. A point on a ridge has a sublist with
two regions (for 2D). A point on a vertice has a sublist
with 3 regions (for 2D)
"""
point_array = np.atleast_2d(point_array)
region_index = []
for point in point_array:
point_already_exists = np.any(np.all(self.vor.points == point, axis=1))
if point_already_exists:
seed_index, = np.where(np.all(self.vor.points == point, axis=1))
else:
seed_index = self.get_closest_seed(point)
# Get the region index for the point
region_index.append(self.vor.point_region[seed_index].tolist())
region_index = [sublist if sublist else [np.inf] for sublist in region_index]
return region_index
def get_closest_seed(self, point):
"""Return the index of the seed of the Voronoi cell that contains the given point.
If the point lies on a ridge or vertex, multiple seeds are returned."""
closest_seed_index = self.get_closest_point(self.vor.points, point)
return closest_seed_index[0]
def get_closest_point(self, candidates, target_point):
"""Return the index of the candidate point that has the min distance
from the target point"""
distances = np.linalg.norm(candidates - target_point, axis=1)
min_dist = min(distances)
closest_candidate_index = np.where(np.isclose(distances, min_dist, rtol=0, atol=1e-10))
return closest_candidate_index
def remove_invalid_rows(self, arr):
""" Remove points from NumPy array that contain NaN or infinite values."""
if not isinstance(arr, np.ndarray):
raise TypeError("Input to remove_invalid_rows must be a NumPy array.")
arr = np.atleast_2d(arr)
# Create a boolean mask for valid rows (no NaN, no inf, no -inf)
mask = np.isfinite(arr).all(axis=1)
return arr[mask]
def add_points(self, points):
"""Process a set of additional points.
Voronoi has a built in function to add points
-- self.vor.add_points(points, restart=True).
However, 'incremental` must be set to True to use the built-in add_points()
method and is very slow. Qhull throws an error for dim>2 when
incremental=True and restart=False. This class method, which rebuilds
'manually` is faster.
"""
from scipy.spatial import Voronoi
if not isinstance(points, np.ndarray):
raise TypeError("Input to add_points must be a NumPy array.")
points = np.atleast_2d(points)
if not points.shape[-1] == self._all_points.shape[-1]:
raise ValueError(f"Points in add_points have a different dimension"
" ({points.shape[-1]}) than points in voronoi"
" tessellation ({self._all_points.shape[-1]})")
points = self.remove_invalid_rows(points)
if points.size == 0:
logger.warning("All input points were NaN or Inf."
" No new points added to voronoi tessellation.")
return
# make sure all new points are unique
all_points = np.vstack((self._all_points, points))
unique_points = set(tuple(row) for row in all_points)
self._all_points = np.asarray([list(row) for row in unique_points])
self.vor = Voronoi(self._all_points)
def raise_if_invalid_region_index(self, region_index):
if region_index > len(self.vor.regions) or region_index < 0:
raise ValueError('Invalid region index. Index must be in (0, nregions]')
class KFoldCrossValidation:
def __init__(self, nsplits, group_kfold, interpolation_field, interpolation_values,
scale, metric, target_field, param_names, surrogate_options):
"""Initialize the K-Fold Cross-Validation with a given surrogate model.
"""
self.nsplits = nsplits
self.group_kfold = group_kfold
self.scale = scale
self.metric = metric
self.interpolation_field = interpolation_field
self.interpolation_values = interpolation_values
self.target_field = target_field
self.param_names = param_names
self.surrogate_options = surrogate_options
def _check_npslits(self, training_params):
if self.nsplits > training_params.shape[0]:
self.nsplits = int(training_params.shape[0]/2.0)
logger.warning("Input parameter \"nsplits\" can't be greater than " +
"the number of samples in KFoldCrosValidation. Reducing " +
"number of splits to approximately half the number of samples.")
def perform_kfold_cv(self, training_params, training_data, groups):
"""
Perform K-Fold Cross-Validation.
:param X: Feature matrix (training samples).
:type X: np.ndarray
:param y: Target values (ground truth).
:type y: np.ndarray
:return: Returns the index of the sample with the greatest
prediction error and the corresponding error value.
tuple (index_of_max_error, max_error)
"""
self._check_npslits(training_params)
self.groups = groups
from sklearn.model_selection import GroupKFold, KFold
from joblib import Parallel, delayed
if self.group_kfold:
assert self.groups is not None
cv = GroupKFold(n_splits=self.nsplits)
kf_results = Parallel(n_jobs=1)(
delayed(self.evaluate_fold)(train_index, test_index, training_params, training_data, index)
for index, (train_index, test_index) in enumerate(cv.split(training_params, training_data, self.groups))
)
else:
cv = KFold(n_splits=self.nsplits, shuffle=True, random_state=1)
kf_results = Parallel(n_jobs=1)(
delayed(self.evaluate_fold)(train_index, test_index, training_params, training_data, index)
for index, (train_index, test_index) in enumerate(cv.split(training_params))
)
# Convert the results to a dictionary
kf = {k_idx: result for k_idx, result in enumerate(kf_results)}
return kf
def evaluate_fold(self, train_index, test_index, X, y, kfold_count):
logger.info(f"\tEvaluating test '{self.metric}' error for surrogate for kfold cross validation set {kfold_count}..." +
"")
info = self.extract_fold_info(train_index, test_index, X, y)
train_eval_info, test_eval_info, X_test, y_test = info
fold_surrogate = _fit_surrogate_model(train_eval_info, self.interpolation_field,
self.interpolation_values, test_eval_info,
self.target_field,
"kfold_validation_surrogate.joblib", logger_on=False,
**self.surrogate_options)
error = _get_surrogate_metric(fold_surrogate._latent_scores['test'], self.metric)
logger.info(f"\t\terror = {error}")
return error, test_index
def extract_fold_info(self, train_index, test_index, X, y):
X_train, X_test = X[train_index], X[test_index]
y_train = [y[i] for i in train_index]
y_test = [y[i] for i in test_index]
train_res, test_res = _setup_studies_for_cv(self.param_names,
X_train, X_test, y_train, y_test)
return train_res, test_res, X_test, y_test
class LeaveOneOutCrossValidation:
def __init__(self, scale, metric, interpolation_field,
interpolation_values, target_field, par_names, surrogate_options):
"""
Initialize the LOOCV.
"""
self.scale = scale
self.metric = metric
self.interpolation_field = interpolation_field
self.interpolation_values = interpolation_values
self.target_field = target_field
self.par_names = par_names
self.surrogate_options = surrogate_options
def perform_loocv(self, X, y, indices):
"""Perform Leave-One-Out Cross-Validation.
:param X: Feature matrix (training samples).
:type X: np.ndarray
:param y: Target values (ground truth).
:type y: np.ndarray
:return: Returns the index of the sample with the greatest
prediction error and the corresponding error value.
tuple: (index_of_max_error, max_error)
"""
from joblib import Parallel, delayed
loo_results = Parallel(n_jobs=1)(
delayed(self.evaluate_loo_sample)(X, y, i)
for i in indices
)
loo = {loo_idx: result for loo_idx, result in enumerate(loo_results)}
return loo
def evaluate_loo_sample(self, X, y, index):
# Leave one out: create training and test sets
logger.info(f"\tEvaluating test '{self.metric}' error for surrogate leaving out sample {index}" +
"")
info = self.extract_loo_info(index, X, y)
train_eval_info, test_eval_info, X_test, y_test = info
fold_surrogate = _fit_surrogate_model(train_eval_info, self.interpolation_field,
self.interpolation_values, test_eval_info,
self.target_field,
"kfold_validation_surrogate.joblib",
logger_on=False,
**self.surrogate_options)
error = _get_surrogate_metric(fold_surrogate._latent_scores['test'], self.metric)
logger.info(f"\t\terror = {error}")
return error, index
def extract_loo_info(self, index, X, y):
X_train = np.delete(X, index, axis=0)
y_train = y.copy()
del y_train[index]
X_test = X[index].reshape(1, -1) # Reshape for a single sample
y_test = [y[index]]
train_res, test_res = _setup_studies_for_cv(self.par_names,
X_train, X_test,
y_train, y_test)
return train_res, test_res, X_test, y_test
def _setup_studies_for_cv(p_names, train_samples, test_samples,
train_evals, test_evals):
res = _get_parameter_and_simulation_hist(p_names, train_samples, train_evals)
test_res = _get_parameter_and_simulation_hist(p_names, test_samples, test_evals)
return res, test_res
def _get_parameter_and_simulation_hist(p_names, p_samples, m_evals):
from matcal.core.study_base import StudyResults
p_hist = _format_parameter_hist(p_names, p_samples)
res_hist = _format_parameter_evaluations(m_evals)
res = StudyResults()
res._update_parameter_history(p_hist, list(p_hist.keys()))
res._update_simulation_history(res_hist, 'cv')
return res
def _format_parameter_hist(names, p_samples):
n_samples = p_samples.shape[0]
params = OrderedDict()
for idx in range(n_samples):
params[f"eval_{idx}"] = OrderedDict()
for n_idx, param_name in enumerate(names):
params[f"eval_{idx}"][param_name] = p_samples[idx, n_idx]
return params
def _format_parameter_evaluations(model_evals):
from matcal.core.data import convert_dictionary_to_data, DataCollection
results_hist = DataCollection("CV data collection")
for eval in model_evals:
results_hist.add(convert_dictionary_to_data(eval))
return results_hist
def _get_surrogate_metric(latent_scores_test, metric):
combined_score = []
for field_idx, field_name in enumerate(latent_scores_test):
if isinstance(latent_scores_test[field_name], (dict, OrderedDict)):
combined_score += list(latent_scores_test[field_name][metric])
if combined_score == len(combined_score)*[None]:
return np.nan
elif metric == 'nlpd':
return np.sum(combined_score)
else:
return np.mean(combined_score)