from abc import ABC, abstractmethod
from collections import OrderedDict
from numbers import Integral, Real
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.metrics import r2_score
from typing import Callable
from matcal.core.data import convert_dictionary_to_data
from matcal.core.logger import initialize_matcal_logger
from matcal.core.object_factory import BasicIdentifier
from matcal.core.serializer_wrapper import matcal_save
from matcal.core.state import State
from matcal.core.utilities import (check_value_is_nonempty_str,
check_item_is_correct_type,
_time_interpolate,
_find_smallest_rect,
check_value_is_bool)
logger = initialize_matcal_logger(__name__)
class _DoNothingDataTransformer:
def inverse_transform(self, source_data):
return source_data
def transform(self, source_data):
return source_data
class _VarianceDecomposition:
def __init__(self, goal_variance):
self._goal_variance = goal_variance
def generate(self, source_data, make_log_scale, logger_on=True):
return _convert_data_and_make_basis(source_data, self._goal_variance, make_log_scale,
logger_on)
class _ReconstructionDecomposition:
def __init__(self, reconstruction_tol:float):
self._reconstruction_tol = reconstruction_tol
def generate(self, source_data, make_log_scale, logger_on=True):
return _tune_data_decomposition(source_data, make_log_scale, self._reconstruction_tol,
logger_on=logger_on)
[docs]
class SurrogateGenerator:
"""
This class is responsible for taking source data and a parameter set
and generating an efficient surrogate
for predicting probe based quantities of interest. The generator uses
Principal Component Analysis(PCA) to generate an efficient representation
of the data and then trains
a predictor in the latent space identified by the PCA.
To preform these calculations sklearn is
leveraged to perform the correct scaling, PCA, and predictor training required.
"""
def __init__(self, evaluation_information, interpolation_field=None,
interpolation_locations=200,
training_fraction=.8, surrogate_type = "PCA Multiple Regressors",
regressor_type="Gaussian Process", test_eval_info=None, **regressor_kwargs):
"""
:param evaluation_information: A container of the relevant
information to form a surrogate off of
a body of data. This is intended to be based off of the results of a MatCal conducted
sampling study.
In addition, previously run surrogates joblib files can be passed to rerun the surrogate
generation process with new settings.
:type evaluation_information: :class:`~matcal.core.study_base.StudyResults`
:param training_fraction: What fraction of the source data to use as training data.
Value should be 0 < training_fraction <= 1. If training_fraction == 1,
test_evaluation_information
must be provided.
:type training_fraction: float
:param interpolation_field: the field that will be the independent field for surrogate results.
:type interpolation_field: str
:interpolation_locations: the number of interpolation locations for the
surrogate to output at or an array-like of values for the interpolation locations.
If a number of locations is given, the surrogate will linearly space the points
over the min and max value for the interpolation field for all evaluations.
:interpolation_locations: int or Array-like
:param surrogate_type: What type of surrogate to run. Details of each are detailed in the
surrogate's documentation. Currently the only available
options are "PCA Multiple Regressors"
and "PCA Monolithic Regressor". The Default is set to
"PCA Multiple Regressors" as it has
better performance but uses more memory than the monolithic surrogate.
:type surrogate_type: str
:param regressor_type: The identifier key for what core regressor
form to use as the predictor.
Only "Random Forest" and "Gaussian Process" are accepted. Currently, MatCal
uses the implementations of these tools from the sklearn library.
:type regressor_type: str
:param test_eval_info: A container of the relevant
information to test a surrogate generated
from a MatCal sampling study. This data is only used and must
be provided if training_fraction == 1.0.
:type test_evaluation_information: :class:`~matcal.core.study_base.StudyResults`
:param regressor_kwargs: A keyword selection of parameters to pass to the predictor used.
Please refer to the sklearn documentation for more information for what can be passed to
the predictors.
"""
self._interpolation_field = interpolation_field
self._input_parameter_history = None
self._interpolation_locations = interpolation_locations
self._eval_info = evaluation_information
self._test_eval_info = test_eval_info
self._model_name = None
self._state = None
self._training_fraction = training_fraction
self._surrogate_type = surrogate_type
self._regressor_type = regressor_type
self._regressor_kwargs = regressor_kwargs
self._decomp_tool = _assign_decomp(.99, None)
self._logger_on=True
self._fields_to_log_scale = []
self._fields_of_interest = None
self._train_score = OrderedDict()
self._test_score = OrderedDict()
self._check_test_evaluation_information_provided()
[docs]
def set_model_and_state(self, model_name=None, state=None):
"""
Set the evaluation set and state to select from the study results.
:param model_name: This is the model name for which the surrogate will
generate results.
If no argument is passed, the surrogate generator will
expect the study to have a single model.
:type eval_set_key: str
:param state: This specifies the state for the model for which the surrogate
will generate results. It can be either a :class:`~matcal.core.state.State`
object or a state name. If no argument is provided,
this method will assume that only a single state is
associated with the model for which the surrogate is being generated.
:type state: str or :class:`~matcal.core.state.State`
"""
if model_name is not None:
check_value_is_nonempty_str(model_name, "model_name")
self._model_name = model_name
if state is not None:
check_item_is_correct_type(state, (str, State), "state")
self._state = state
[docs]
def set_PCA_details(self, decomp_var=.99, reconstruction_error=None):
"""
:param decomp_var: What level of the total variance should be accounted for in the PCA
decomposition. Values closer to 1 will keep more modes than lower values. The more modes
kept the more difficult it can become to train the predictors. A default value of .99 is
chosen because it is a common conventional choice, and explains the vast majority of the
seen behavior, and for an appropriate data set can lead
to very few modes being retained.
:type decomp_var: float
"""
self._decomp_tool = _assign_decomp(decomp_var, reconstruction_error)
[docs]
def set_surrogate_details(self, surrogate_type="PCA Multiple Regressors",
regressor_type="Gaussian Process",
training_fraction=.8, interpolation_locations=None,
test_eval_info=None, **regressor_kwargs):
"""
This method provides an other avenue to alter the surrogate
generation parameters after initialization.
:param surrogate_type: What type of surrogate to run. Details of each are detailed in the
surrogate's documentation. Currently the only available options
are "PCA Multiple Regressors"
and "PCA Monolithic Regressor". The Default is set to
"PCA Multiple Regressors" as it has
better performance but uses more memory than the monolithic surrogate.
:type surrogate_type: str
:param training_fraction: What fraction of the source data to use as
training data. Value should be 0 < training_fraction < 1.
:type training_fraction: float
:param regressor_type: The identifier key for what core regressor
form to use as the predictor.
Only "Random Forest" and "Gaussian Process" are accepted. Currently, MatCal
uses the implementations of these tools from the sklearn library.
:type regressor_type: str
:param test_eval_info: A container of the relevant
information to test a surrogate generated
from a MatCal sampling study. This data is only used and must
be provided if training_fraction == 1.0.
:type test_evaluation_information: :class:`~matcal.core.study_base.StudyResults`
:param regressor_kwargs: A keyword selection of parameters to pass to the predictor used.
Please refer to the sklearn documentation for more information for what can be passed to
the predictors.
"""
self._training_fraction = training_fraction
self._surrogate_type = surrogate_type
self._regressor_type = regressor_type
self._regressor_kwargs = regressor_kwargs
if test_eval_info is not None:
self._test_eval_info = test_eval_info
self._check_test_evaluation_information_provided()
if (interpolation_locations is not None):
self._interpolation_locations = interpolation_locations
[docs]
def set_fields_to_log_scale(self, *field_names):
"""
For fields of interest that span over orders of magnitude it can be easier
to train to the natural log of the data rather than the raw data.
Passing fields here will inform the surrogate and the generator that
these fields should be evaluated on the natural log scale. Any predictions
given by the surrogate will be at the original scale. This just adds an
additional scaling/descaling step within it. Note that data that has values
less than or equal to zero will need to be scaled or modified by the user
prior to selecting them as an option for log scaling.
:param field_names: a series of field names to train on the log scale
:type field_names: str
"""
self._fields_to_log_scale = field_names
[docs]
def set_fields_of_interest(self, *fields_of_interest):
"""
Specify which data fields the surrogate should model.
By default the surrogate generator attempts to build a model for every
field present in the source data (aside from the independent
interpolation field). Use this method to limit the surrogate to a
user‑selected subset of fields.
:param fields_of_interest: One or more field names that should be
included in the surrogate model.
:type fields_of_interest: ``*str``
.. note::
* The independent interpolation field (if any) is never treated as a
field of interest and is automatically excluded; you should not
pass it here.
* Fields that are **not** listed will be ignored during surrogate
generation and will not appear in the surrogate’s output.
"""
if fields_of_interest:
for field in fields_of_interest:
check_value_is_nonempty_str(field, "field_of_interest")
self._fields_of_interest = fields_of_interest
[docs]
def generate(self, save_filename:str, preprocessing_function:Callable=None,
plot_n_worst:int=0)->Callable:
"""
Generates a surrogate based on the information passed to it upon initialization
:parameter save_filename: the base of a filename without any extensions
to be used to record the surrogate.
:type save_filename: str
:parameter preprocessing_function: an optional function that modifies
the model data before it is passed to the tools that generate the
surrogate model.
:type preprocessing_function: Callable
:parameter source_data_dict: a dictionary of training data from which to generate
the surrogate. Its keys are the field names for the data, rows contain
data samples and and columns are the data pts at each independent variable
data point. Not intended to be an argument for users. Passing data this way
will take the place of any other data source.
:type source_data_dict: dict(str, Array-Like)
:parameter plot_n_worst: Generate a number of plots that show the worst
recreations made by the surrogate. The number of plots made is equal to the
value passed to this argument. Any values less than 1 will result in no
plots being generated or worst analysis being performed.
:type plot_n_worst: int
:return: a callable surrogate
:rtype: :class:`~matcal.core.surrogates.MatCalPCASurrogateBase`
"""
check_value_is_nonempty_str(save_filename, "save_filename")
results = _package_surrogate_generator_input_data(self._eval_info, self._model_name,
self._state)
source_data, params = results
self._fields_of_interest = _identify_fields_of_interest(source_data,
self._interpolation_field,
self._fields_of_interest)
self._interpolation_locations = _process_interpolation_locations(source_data,
self._interpolation_locations,
self._interpolation_field)
source_dict = _process_data_for_surrogate(source_data, self._fields_of_interest,
self._interpolation_locations,
self._interpolation_field, preprocessing_function)
test_train_split_results = self._select_training_and_test_data(source_dict, params,
preprocessing_function)
train_data, test_data, train_params, test_params = test_train_split_results
combined_params = _combine_parameters(test_params, train_params)
param_ranges = _package_parameter_ranges(combined_params)
if self._logger_on:
logger.info(f'Generating and scoring {self._regressor_type} surrogates. '+
'The ideal score is 1.0.')
surrogate_class = _surrogate_selection.identify(self._surrogate_type)
new_surrogate = surrogate_class.fit(train_data, test_data, train_params, test_params,
self._fields_to_log_scale,
self._decomp_tool, self, param_ranges,
self._logger_on)
matcal_save(save_filename+".joblib", new_surrogate)
self._plot_worst_recreations(new_surrogate, params, source_dict,
plot_n_worst, save_filename)
return new_surrogate
def _check_test_evaluation_information_provided(self):
if self._training_fraction == 1.0 and self._test_eval_info is None:
raise ValueError("Test evaluations must be provided when training_fraction = 1.0.")
def _plot_worst_recreations(self, surrogate, parameters, source_data, n_worst, save_filename):
if n_worst < 1:
return
import matplotlib.pyplot as plt
plt.close('all')
n_eval = len(parameters[list(parameters.keys())[0]])
sur_predict = surrogate(parameters)
worst_sets = self._get_worst_recreations(source_data, n_worst, n_eval, sur_predict)
short, long = _find_smallest_rect(n_worst)
size_per_plt = 2
fig, ax_set = plt.subplots(short, long, figsize=(size_per_plt*long,size_per_plt*short),
constrained_layout=True)
ax_set = self._format_ax_set(n_worst, ax_set)
for ax, (field, eval_idx) in zip(ax_set, worst_sets):
self._plot_set(surrogate, source_data, sur_predict, ax, field, eval_idx)
filename = f"{save_filename}_worst.png"
plt.savefig(filename, dpi=400)
def _format_ax_set(self, n_worst, ax_set):
if n_worst > 1:
ax_set = ax_set.flatten()
else:
ax_set = [ax_set]
return ax_set
def _plot_set(self, surrogate, source_data, sur_predict, ax, field, eval_idx):
prediction_locations=surrogate.prediction_locations
if prediction_locations is not None:
ax.plot(prediction_locations, sur_predict[field][eval_idx,:], '--',
lw=3, label='surrogate')
ax.plot(prediction_locations, source_data[field][eval_idx,:], '-',
lw=3, label='source')
ax.set_xlabel(surrogate.independent_field)
else:
ax.plot(sur_predict[field][eval_idx,:], '--', lw=3, label='surrogate')
ax.plot(source_data[field][eval_idx,:], '-', lw=3, label='source')
ax.set_xlabel(surrogate.independent_field)
ax.set_title(f"{field} eval index{eval_idx}")
ax.set_ylabel(field)
ax.legend()
def _get_worst_recreations(self, source_data, n_worst, n_eval, sur_predict):
worst = _WorstEvaluations(n_worst)
for field in source_data:
field_prediction = sur_predict[field]
for eval_idx in range(n_eval):
sur_values = field_prediction[eval_idx,:]
source_values = source_data[field][eval_idx, :]
misfit = _score_recreation(sur_values, source_values)
worst.update(field, eval_idx, misfit)
worst_sets = worst.get_set()
return worst_sets
def _select_training_and_test_data(self, source_dict, params,
preprocessing_function):
if self._training_fraction == 1.0:
results = _package_surrogate_generator_input_data(self._test_eval_info,
self._model_name, self._state)
test_data, test_params = results
test_data = _process_data_for_surrogate(test_data, self._fields_of_interest,
self._interpolation_locations,
self._interpolation_field,
preprocessing_function)
train_data = source_dict
train_params = params
_check_fields_in_keys_list(self._fields_of_interest, test_data.keys(), "test data set")
else:
from sklearn.model_selection import train_test_split
first_param_key = list(params.keys())[0]
indices = np.arange(len(params[first_param_key]))
data_split_results = train_test_split(indices, train_size=self._training_fraction)
train_indices, test_indices = data_split_results
train_params, test_params = _split_dict_data_into_test_train_data_dicts(train_indices,
test_indices,
params)
train_data, test_data = _split_dict_data_into_test_train_data_dicts(train_indices,
test_indices,
source_dict)
return train_data, test_data, train_params, test_params
def _check_fields_in_keys_list(fields, data_fields, data_set_name):
for field in fields:
if field not in data_fields:
raise KeyError(f"The field of interest {field} for the surrogate was "+
f"not in the provided the {data_set_name}.")
def _split_dict_data_into_test_train_data_dicts(train_indices, test_indices, data_dict):
test_data = OrderedDict()
train_data = OrderedDict()
for key in data_dict:
data_key_array = np.array(data_dict[key])
test_data[key] = data_key_array[test_indices]
train_data[key] = data_key_array[train_indices]
return train_data, test_data
def _package_surrogate_generator_input_data(eval_info, model_name, state):
data_history, input_parameter_history = _select_relevant_study_data(eval_info, model_name, state)
param_history = _import_parameter_hist(input_parameter_history)
return data_history, param_history
def _select_relevant_study_data(evaluation_information, model_name, state):
parsed_eval_info = _parse_evaluation_info(evaluation_information, model_name)
input_parameter_history, _sim_hist_data_collection = parsed_eval_info
data_history = _select_state_data(state, _sim_hist_data_collection)
return data_history, input_parameter_history
def _select_state_data(state, sim_history_dc):
if state is None:
states = list(sim_history_dc.state_names)
if len(states) > 1:
raise ValueError(f"There are {len(states)} in the results data for the "
"surrogate generator. Specify a state for the surrogate.")
else:
state = states[0]
return sim_history_dc[state]
def _select_model(simulation_history, model_name):
if model_name is None:
model_name = list(simulation_history.keys())[0]
return model_name
def _parse_study_results(study_results, model_name, ):
input_hist = study_results.parameter_history
sim_history = study_results.simulation_history
model_name = _select_model(sim_history, model_name)
output_hist = sim_history[model_name]
return input_hist, output_hist
def _parse_evaluation_info(eval_info, model_name):
from matcal.core.study_base import StudyResults, StudyBase
if isinstance(eval_info, StudyResults):
input_hist, output_hist = _parse_study_results(eval_info, model_name,
)
elif isinstance(eval_info, StudyBase):
input_hist, output_hist = _parse_study_results(eval_info.results,
model_name)
elif isinstance(eval_info, dict):
input_hist = eval_info['input']
output_hist = eval_info['output']
else:
raise TypeError(f"Surrogate Generator can not process data of type {type(eval_info)}")
return input_hist, output_hist
def _apply_preprocessing_function(preprocessing_function, training_data_history):
if preprocessing_function is not None:
check_item_is_correct_type(preprocessing_function, Callable, "preprocessing_function")
for idx, data in enumerate(training_data_history):
processed_data = preprocessing_function(data)
if isinstance(processed_data, (dict, OrderedDict)):
processed_data = convert_dictionary_to_data(processed_data)
processed_data.set_state(data.state)
training_data_history[idx] = processed_data
return training_data_history
def _process_data_for_surrogate(source_data_list, fields_of_interest,
interpolation_locations, interpolation_field,
preprocessing_function=None):
source_data_list = _apply_preprocessing_function(preprocessing_function, source_data_list)
processed_data = _initialize_processed_data(source_data_list, fields_of_interest,
interpolation_locations)
for idx, data in enumerate(source_data_list):
for field in fields_of_interest:
data_field = data[field]
if interpolation_locations is not None and interpolation_field is not None:
data_field = np.interp(interpolation_locations,
data[interpolation_field], data_field)
processed_data[field][idx, :] = data_field
return processed_data
def _initialize_processed_data(training_data_list, fields_of_interest,
interpolation_locations):
processed_data = OrderedDict()
n_evals = len(training_data_list)
for field in fields_of_interest:
n_points = _get_n_points(interpolation_locations,
training_data_list, field)
processed_data[field] = np.zeros([n_evals, n_points])
return processed_data
def _get_n_points(interpolation_locations, training_data_list, field):
if interpolation_locations is None:
return len(training_data_list[0][field])
else:
return len(interpolation_locations)
class _WorstEvaluations:
def __init__(self, track_n):
self._n = track_n
self._scores = []
self._field_eval_sets =[]
def update(self, field, eval_idx, score):
self._scores.append(score)
self._field_eval_sets.append((field, eval_idx))
if len(self._scores) > self._n:
n_worst_args = np.argsort(self._scores).flatten()[-self._n:]
_new_scores = []
_new_sets = []
for idx in n_worst_args:
_new_scores.append(self._scores[idx])
_new_sets.append(self._field_eval_sets[idx])
self._scores = _new_scores
self._field_eval_sets = _new_sets
def get_set(self):
return self._field_eval_sets
def _init_random_forest_surrogate(n_inputs, **kwargs):
from sklearn.ensemble import RandomForestRegressor
return RandomForestRegressor(**kwargs)
def _init_gp_surrogate(n_inputs, **kwargs):
from sklearn.gaussian_process import GaussianProcessRegressor
# reference for later for anisotropic kernel generation
# aniso_kernel = RBF(1e-1 * np.ones(n_inputs), length_scale_bounds=(1e-5, 1e5))
gpr = GaussianProcessRegressor(**kwargs)
return gpr
_regressor_lookup = {"Random Forest":_init_random_forest_surrogate,
"Gaussian Process":_init_gp_surrogate}
def _initialize_regressor(regressor_type, n_inputs, regressor_kwargs):
return _regressor_lookup[regressor_type](n_inputs, **regressor_kwargs)
def _scale_data_for_surrogate(data_array, make_log=False):
"""
Expects the data as n_samples x n_features
"""
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
if make_log:
scaler = Pipeline([('log', _MatCalLogScaler()), ('standard', StandardScaler())])
else:
scaler = StandardScaler()
scaler.fit(data_array)
scaled_data = scaler.transform(data_array)
return scaled_data, scaler
def _decompose_with_pca(data, var_tol, logger_on=True):
"""
Expects data as n_samples x n_features
"""
from sklearn.decomposition import PCA
pca = PCA(n_components=var_tol, svd_solver='full')
transformed_data = pca.fit_transform(data)
if isinstance(var_tol, Integral) and logger_on:
logger.info(f"\tGenerated PCA decomposition with {pca.n_components_} components.")
elif isinstance(var_tol, Real) and logger_on:
logger.info(f"\tGenerated PCA decomposition with {pca.n_components_}"
f" components using {var_tol} variance explanation.")
elif logger_on:
logger.info(f"\tGenerated PCA decomposition with {pca.n_components_}"
f" components using option \'{var_tol}\'.")
return transformed_data, pca
def _use_pca_to_decompose_if_many_features(data, var_tol=.99, logger_on=True):
"""
Expects data as n_samples x n_features
"""
if data.shape[1] > 15:
return _decompose_with_pca(data, var_tol, logger_on)
else:
return data, _DoNothingDataTransformer()
def _import_parameter_hist(parameter_history):
return OrderedDict(parameter_history)
def _package_parameter_ranges(param_history):
out_dict = {}
for name, p in param_history.items():
out_dict[name] = (np.min(p), np.max(p))
return out_dict
def _convert_data_and_make_basis(source_data, decomp_variance, make_log_scale, logger_on=True):
scaled_data, data_scaler = _scale_data_for_surrogate(source_data, make_log_scale)
latent_data, decomposer = _use_pca_to_decompose_if_many_features(scaled_data, decomp_variance,
logger_on)
latent_data = _ensure_2d_array(latent_data)
scaled_latent_data, latent_scaler = _scale_data_for_surrogate(latent_data)
return data_scaler,decomposer,scaled_latent_data,latent_scaler
def _tune_data_decomposition(source_data, make_log_scale, reconstruction_error_tol:float=1e-3,
max_modes:int=10, logger_on=True):
scaled_data, data_scaler = _scale_data_for_surrogate(source_data, make_log_scale)
logger.info(" Tuning decomposition to meet recreation error tolerance of "+
f"{reconstruction_error_tol}, up to a limit of {max_modes} modes")
for mode_count in range(max_modes):
kept_modes = mode_count + 1
logger.info(f" Analyzing {kept_modes} mode decomposition")
latent_data, decomposer = _use_pca_to_decompose_if_many_features(scaled_data, kept_modes,
logger_on)
recreated_data = decomposer.inverse_transform(latent_data)
error = scaled_data - recreated_data
max_error_rel = np.amax(error) / np.amax(scaled_data)
logger.info(f" Recreation has max relative error of {max_error_rel}")
if max_error_rel < reconstruction_error_tol:
logger.info(f" Error below tolerance using {kept_modes} modes")
break
elif kept_modes == max_modes:
message = (f" Recreation error tolerance not met, but max modes reached, "+
f"using {max_modes} mode decomposition")
logger.info(message)
else:
logger.info(" Recreation error tolerance not met.\n")
latent_data = _ensure_2d_array(latent_data)
scaled_latent_data, latent_scaler = _scale_data_for_surrogate(latent_data)
return data_scaler,decomposer,scaled_latent_data,latent_scaler
def _record_variance_behaviors(decomposer, filename_base, field_name):
individual_variance = decomposer.explained_variance_ratio_
missing_variance = np.ones_like(individual_variance)
for i in range(len(missing_variance)):
missing_variance[i:] -= individual_variance[i]
logger.info(f" Decomposition Modes Explained Variance Ratios: {missing_variance}")
variance_filename = f"{filename_base}_{field_name}_pca_variance.png"
marker_levels = [.05, .01]
import matplotlib.pyplot as plt
plt.figure()
plt.title(field_name)
plt.xlabel('PCA Mode')
plt.ylabel('Missing Variance ratio [log10]')
plt.semilogy(missing_variance, label='PCA')
for marker in marker_levels:
plt.semilogy(np.ones_like(missing_variance) * marker, label=f"{int(marker*100)}%")
plt.legend()
plt.savefig(variance_filename, dpi=400)
[docs]
class MatCalSurrogateBase(ABC):
@abstractmethod
def fit(parameters, predictions):
""""""
@property
def scores(self):
"""
The test and train R2 scores for the surrogate.
"""
return self._r2_scores
@property
def max_errors(self):
"""
The test and train max errors for the surrogate in
the given field's units.
"""
return self._max_scores
@abstractmethod
def __call__(self, parameters)-> OrderedDict:
""""""
def __init__(self, latent_scores,
fields_to_log_scale, interp_field, interp_locs,
parameter_scaler, regressors, decomposers, data_scalers,
latent_scalers, param_ranges):
"""Surrogate abstract base class from which all surrogates should be derived
in MatCal.
"""
self._latent_scores = OrderedDict()
self._average_scores = OrderedDict()
self._max_scores = OrderedDict()
self._r2_scores = OrderedDict()
self._latent_scores['train'] = latent_scores[0]
self._latent_scores['test'] = latent_scores[1]
self._fields_to_log_scale = fields_to_log_scale
self._interpolation_field = interp_field
self._interpolation_locations = interp_locs
self._parameter_scaler = parameter_scaler
self._regressors = regressors
self._decomposers = decomposers
self._data_scalers = data_scalers
self._latent_scalers = latent_scalers
self._param_ranges = param_ranges
self._enforce_training_data_parameter_range = True
[docs]
def enforce_training_data_parameter_range(self, enforce_training_data_parameter_range=True):
"""
By default the surrogate will error if called with a parameter set outside of the
parameter ranges used in the training data set. To call the surrogate for parameters
outside of the training data range, call this method with the argument set to False.
Adherence to the training data range can be reactivated by calling this method
with the argument set to True.
:param ignore_training_range: bool flag to ignore training data range.
:type ignore_training_range:
"""
check_value_is_bool(enforce_training_data_parameter_range,
"enforce_training_data_parameter_range")
self._enforce_training_data_parameter_range = enforce_training_data_parameter_range
def _set_native_space_scores(self, average_scores, max_scores, r2_scores):
self._average_scores['train'] = average_scores[0]
self._average_scores['test'] = average_scores[1]
self._max_scores['train'] = max_scores[0]
self._max_scores['test'] = max_scores[1]
self._r2_scores['train'] = r2_scores[0]
self._r2_scores['test'] = r2_scores[1]
[docs]
def set_parameter_ranges(self, *args, **param_ranges):
"""
Update the admissible parameter ranges that the user can call the surrogate to evaluate.
The surrogate stores, for each input parameter, a lower and upper bound that
define the region of parameter space where the surrogate is considerd valid. When
the surrogate is called, values that fall outside of these ranges trigger a
``RuntimeError`` unless :meth:`enforce_training_data_parameter_range` has been
disabled.
Only keyword arguments are accepted; each keyword corresponds to a parameter
name and must map to a two‑element sequence ``(lower, upper)`` describing the
allowed range for that parameter.
:param param_ranges: Mapping of parameter names to (lower, upper) bounds.
:type param_ranges: ``dict`` or ``OrderedDict`` where each value is an
iterable of two ``float``/``int`` values.
:raises RuntimeError: If any positional arguments are supplied, or if a
required parameter is missing from ``param_ranges``.
:raises RuntimeError: If a supplied parameter name is not part of the
surrogate’s ``parameter_order`` (i.e., it was not present in the training
data).
:raises ValueError: If the lower bound is greater than the upper bound for any
parameter.
:raises TypeError: If either bound is not a real number (i.e., not an instance
of :class:`numbers.Real`).
**Example**
>>> surrogate.set_parameter_ranges(
... temperature=(300.0, 800.0),
... pressure=(1e5, 5e5)
... )
"""
valid_params = self._parameter_scaler.parameter_order
if args:
raise RuntimeError(f"{self.__class__.__name__}.set_parameter_ranges "+
"does not accept positional arguments. "+
"All inputs must be keyword arguments.")
for param in param_ranges:
if param not in valid_params:
raise RuntimeError(f"The parameter '{param}' is not a valid "+
f"parameter for the surrogate. Valid parameters include "+
f"{valid_params}.")
range_values = np.asarray(param_ranges[param])
if range_values.shape != (2,):
raise RuntimeError("Each parameter range must only have two values. "+
f"Received values with shape {range_values.shape} "+
f"for parameter '{param}'.")
for idx, value in enumerate(range_values):
hi_low = ["lower", "upper"]
if not isinstance(value, Real):
raise TypeError(f"The {hi_low[idx]} bound for parameter '{param}' "+
f"must be a real number. Received '{value}' of type {type(value)}.")
if range_values[1] < range_values[0]:
raise ValueError(f"The range for parameter '{param}' has a lower bound greater "+
"than the upper bound. The lower bound is specified first! "+
f"Received {range_values[0]} and then {range_values[1]} as "
"the lower bound and upper bound, respectively.")
for param in self._parameter_scaler.parameter_order:
if param not in param_ranges:
raise RuntimeError(f"The parameter '{param}' is required for the surrogate "+
"and was not provided for the desired updated parameter ranges. "
f"Received ranges for parameters {list(param_ranges.keys())}.")
self._param_ranges = param_ranges
def _get_decomp_results(train_data, test_data, make_log_scale, decomposition_tool, logger_on=True):
combined_data = np.vstack([train_data, test_data])
decomp_results = decomposition_tool.generate(combined_data, make_log_scale, logger_on)
data_scaler, decomposer, scaled_latent_data, latent_scaler = decomp_results
scaled_latent_test_data = _apply_decomposing_and_scaling_to_data(test_data, data_scaler,
decomposer, latent_scaler)
scaled_latent_train_data = _apply_decomposing_and_scaling_to_data(train_data, data_scaler,
decomposer, latent_scaler)
return scaled_latent_test_data, scaled_latent_train_data, data_scaler, decomposer, latent_scaler
def _apply_decomposing_and_scaling_to_data(data, data_scaler, decomposer,
latent_scaler):
"""Transform test data after scalers and decomposition tool have already
been trained on training data."""
scaled_data = data_scaler.transform(data)
latent_data = decomposer.transform(scaled_data)
latent_data = _ensure_2d_array(latent_data)
scaled_latent_test_data = latent_scaler.transform(latent_data)
return scaled_latent_test_data
def _scale_parameters(test_params, train_params, fields_to_log_scale):
combined_params = _combine_parameters(test_params, train_params)
parameter_scaler_set = _make_parameter_scaler_set(combined_params, fields_to_log_scale)
scaled_test_parameters = parameter_scaler_set.transform_as_array(test_params)
scaled_train_parameters = parameter_scaler_set.transform_as_array(train_params)
return parameter_scaler_set, scaled_test_parameters, scaled_train_parameters
def _combine_parameters(test_params, train_params):
combined_params = OrderedDict()
combined_params.update(train_params)
for field in combined_params:
combined_params[field] = np.hstack((combined_params[field], test_params[field]))
return combined_params
def _train_parameter_to_pca_weight_regressor(scaled_train_params, scaled_latent_train_data,
regressor_type, regressor_kwargs,
regressor_init_func):
n_parameters = scaled_train_params.shape[1]
regressor = regressor_init_func(regressor_type, n_parameters, regressor_kwargs)
scaled_latent_train_data = _ensure_2d_array(scaled_latent_train_data)
regressor.fit(scaled_train_params, scaled_latent_train_data)
return regressor
def _score_regressor_in_latent_space(regressor, scaled_train_params,
scaled_latent_train_data, scaled_test_params,
scaled_latent_test_data, logger_on):
train_score = _calculate_performance_metrics(regressor, scaled_train_params,
scaled_latent_train_data)
scaled_latent_test_data = _ensure_2d_array(scaled_latent_test_data)
test_score = _calculate_performance_metrics(regressor, scaled_test_params,
scaled_latent_test_data)
training_fraction = scaled_train_params.shape[0]/(scaled_train_params.shape[0]+
scaled_test_params.shape[0])
if logger_on:
logger.info(f"\tTraining Complete: {training_fraction*100} % of data used for training")
return train_score, test_score
def _print_scores(latent_train_score, latent_test_score, native_train_score, native_test_score):
for field in latent_train_score:
logger.info(f"\nSurrogate scores for {field}: ")
score_message = f"\tTrain:\n"
score_message += f"\t\tlatent space score: {latent_train_score[field]['score']}\n"
score_message += f"\t\tnative space score: {native_train_score[field]}\n"
score_message += f"\tTest:\n"
score_message += f"\t\tlatent space score: {latent_test_score[field]['score']}\n"
score_message += f"\t\tnative space score: {native_test_score[field]}\n"
logger.info(score_message)
def _calculate_additional_score_metrics(train_score, test_score):
train_score = _convert_instances_to_stats(train_score)
test_score = _convert_instances_to_stats(test_score)
return train_score, test_score
def _calculate_performance_metrics(regressor, param, data):
metrics = []
if param.shape[0] > 1:
# R2 score only valid for more than one sample
metrics.append(regressor.score(param, data))
else:
metrics.append(None)
metrics.append(nlpd(regressor, param, data))
metrics.append(rmse(regressor, param, data))
return metrics
[docs]
def nlpd(regressor, input_values, evals):
""" Negative Log Predictive Density
Only applicable for GPR
"""
if isinstance(regressor, _modal_regressor):
nlpd = np.zeros(evals.shape[1])
for idx, reg in enumerate(regressor._mode_regressors):
try:
nlpd[idx] = _calculate_nlpd(reg, input_values, evals[:, idx])
except:
nlpd[idx] = None
return nlpd
elif not isinstance(regressor, _modal_regressor):
try:
nlpd = _calculate_nlpd(regressor, input_values, evals)
except:
return None
return nlpd
def _calculate_nlpd(gpr, input_values, y_true):
mu, std = gpr.predict(input_values, return_std=True)
y_true = y_true.ravel()
mu = mu.ravel()
var = std ** 2
residuals = y_true - mu
nlpd = 0.5 * np.mean( np.log(2 * np.pi * var) + (residuals ** 2) / var)
return nlpd
def _mse(regressor, input_values, evals):
if isinstance(regressor, _modal_regressor):
mse = np.zeros(evals.shape[1])
for idx, reg in enumerate(regressor._mode_regressors):
mse[idx] = _calculate_mse(reg, input_values, evals[:, idx])
else:
mse = _calculate_mse(regressor, input_values, evals)
return mse
def _calculate_mse(regressor, input_values, y_true):
y_pred = regressor.predict(input_values)
residuals = y_true - y_pred
mse = np.mean( residuals ** 2)
return mse
def rmse(regressors, input_values, evals):
rmse = _mse(regressors, input_values, evals) ** 0.5
return rmse
def _convert_instances_to_stats(scores):
score_stats = OrderedDict()
score_stats['score'] = np.array(scores[0])
score_stats['nlpd'] = np.array(scores[1])
score_stats['rmse'] = np.array(scores[2])
return score_stats
class _modal_regressor:
def __init__(self, regressor_type:str, n_inputs, regressor_kwargs):
self._mode_regressors = []
self._regressor_type = regressor_type
self._regressor_kwargs = regressor_kwargs
self._n_inputs = n_inputs
def _initialize_regressors(self, n_inputs, n_modes):
for mode_idx in range(n_modes):
self._mode_regressors.append(_initialize_regressor(self._regressor_type, n_inputs,
self._regressor_kwargs))
def fit(self, input_values, mode_values):
n_modes = mode_values.shape[1]
n_inputs = input_values.shape[1]
if self._n_inputs != n_inputs:
err_msg = f"Inconsistent input size for regressor {self._n_inputs} vs {n_inputs}."
raise ValueError(err_msg)
self._initialize_regressors(n_inputs, n_modes)
for mode_idx, regressor in enumerate(self._mode_regressors):
regressor.fit(input_values, np.atleast_2d(mode_values[:, mode_idx]).T)
@property
def num_modes(self):
return len(self._mode_regressors)
def score(self, input_values, mode_values):
mode_scores = np.zeros(self.num_modes)
for mode_idx, regressor in enumerate(self._mode_regressors):
mode_scores[mode_idx] = regressor.score(input_values, mode_values[:, mode_idx])
return mode_scores
def predict(self, input_values):
n_predictions = input_values.shape[0]
prediction = np.zeros([n_predictions, self.num_modes])
for mode_idx, regressor in enumerate(self._mode_regressors):
prediction[:, mode_idx] = regressor.predict(input_values)
return prediction
[docs]
class MatCalPCASurrogateBase(MatCalSurrogateBase):
@property
def parameter_order(self):
"""
A list of strings that describe the correct order to input parameters
into the surrogate prediction.
"""
return self._parameter_scaler.parameter_order
@property
def independent_field(self):
"""
The name of the independent field used in the surrogate prediction
"""
return self._interpolation_field
@property
def prediction_locations(self):
"""
The array of locations that the surrogate predicts at
"""
return self._interpolation_locations
def __call__(self, *args, batch_evaluate=False, transpose=False, **kwargs)-> OrderedDict:
"""
By executing a call on the surrogate object. [Example my_surrogate(my_parameters)]
return a dictionary of the different field predictions
:param parameters: parameter values to evaluate the surrogate at.
If not a dict, the parameters are expected to be in an order as detailed by
:meth:`~matcal.core.surrogates.MatCalPCASurrogateBase.parameter_order`.
As an array, the input should have shape (n_samples, n_parameters).
:type parameters: np.ndarray or list or dict
:return: A dictionary of the various field predictions.
:rtype: dict
"""
param_names = self._parameter_scaler.parameter_order
params_array = _process_surrogate_args_call(param_names, *args,
batch_evaluate=batch_evaluate, transpose=transpose, **kwargs)
params_dict = _convert_param_array_to_dict(params_array, param_names)
_check_params_in_range(params_dict, self._param_ranges,
self._enforce_training_data_parameter_range)
scaled_params = self._parameter_scaler.transform_as_array(params_dict)
multiple_samples = False
if scaled_params.shape[0] > 1:
multiple_samples=True
results = OrderedDict()
if self._interpolation_field is not None:
results[self._interpolation_field] = self._interpolation_locations
for field in self._regressors:
scaled_latent_prediction = self._regressors[field].predict(scaled_params)
scaled_latent_prediction = scaled_latent_prediction.reshape(scaled_params.shape[0], -1)
results[field] = self._transform_data_to_native_space(field, scaled_latent_prediction)
if not multiple_samples:
results[field] = results[field].flatten()
return results
def _transform_data_to_native_space(self, field, scaled_latent_data):
latent_scaler = self._latent_scalers[field]
latent_prediction = latent_scaler.inverse_transform(scaled_latent_data)
scaled_prediction = self._decomposers[field].inverse_transform(latent_prediction)
prediction = self._data_scalers[field].inverse_transform(scaled_prediction)
return prediction
def _fit(train_data, test_data, train_params, test_params, fields_to_log_scale,
decomposition_tool, surrogate_generator, param_ranges,
regressor_initializer, surrogate_class, logger_on=True):
regressors = OrderedDict()
decomposers = OrderedDict()
data_scalers = OrderedDict()
latent_scalers = OrderedDict()
latent_train_scores = OrderedDict()
latent_test_scores = OrderedDict()
param_scaler, scaled_test_params, scaled_train_params = _scale_parameters(test_params,
train_params,
fields_to_log_scale)
for field in train_data:
if logger_on:
logger.info(f"\nGenerating Surrogate for {field}")
make_log_scale = field in fields_to_log_scale
decomp_results = _get_decomp_results(train_data[field], test_data[field],
make_log_scale, decomposition_tool,
logger_on=logger_on)
scaled_latent_test_data, scaled_latent_train_data = decomp_results[0:2]
data_scaler, decomposer, latent_scaler = decomp_results[2:5]
decomposers[field] = decomposer
data_scalers[field] = data_scaler
latent_scalers[field] = latent_scaler
regressor_type = surrogate_generator._regressor_type
regressor_kwargs = surrogate_generator._regressor_kwargs
regressor = _train_parameter_to_pca_weight_regressor(scaled_train_params,
scaled_latent_train_data,
regressor_type, regressor_kwargs,
regressor_initializer)
regressors[field] = regressor
decomposers[field] = decomposer
data_scalers[field] = data_scaler
latent_scalers[field] = latent_scaler
latent_scores = _score_regressor_in_latent_space(regressor, scaled_train_params,
scaled_latent_train_data, scaled_test_params,
scaled_latent_test_data, logger_on)
latent_scores = _calculate_additional_score_metrics(latent_scores[0], latent_scores[1])
latent_train_scores[field], latent_test_scores[field] = latent_scores
latent_scores = [latent_train_scores, latent_test_scores]
surrogate = surrogate_class(latent_scores, fields_to_log_scale,
surrogate_generator._interpolation_field,
surrogate_generator._interpolation_locations,
param_scaler, regressors,
decomposers, data_scalers, latent_scalers, param_ranges)
native_space_scores = _get_scores_in_native_data_space(surrogate, test_params, test_data,
train_params, train_data)
average_scores, max_scores, r2_scores = native_space_scores
surrogate._set_native_space_scores(average_scores, max_scores, r2_scores)
if logger_on:
_print_scores(*latent_scores, *r2_scores)
return surrogate
def _process_surrogate_args_call(param_names, *args,
batch_evaluate=False, transpose=False, **kwargs,):
if batch_evaluate:
processed_args = np.asarray(args[0], dtype=float)
if transpose:
processed_args = processed_args.T
elif len(args)==1 and isinstance(args[0], dict or OrderedDict):
if _all_params_exist_dict(param_names, args[0]):
params = _convert_param_dict_to_array(args[0], param_names)
batch_evaluate=True
return _process_surrogate_args_call( param_names, params, batch_evaluate=batch_evaluate,
transpose=transpose)
elif len(args) == len(param_names) and len(kwargs) == 0:
processed_args = np.asarray(args, dtype=float)
if transpose:
processed_args = processed_args.T
elif len(args) == 0 and len(kwargs) == len(param_names):
param_ordered_list = []
if _all_params_exist_dict(param_names, kwargs):
for param_name in param_names:
param_ordered_list.append(kwargs[param_name])
return _process_surrogate_args_call(param_names, *param_ordered_list, transpose=transpose)
else:
raise RuntimeError("Surrogate model was not called correctly. The input parameters "+
"are likely of the incorrect format. Check input")
return processed_args
def _all_params_exist_dict(param_names, data_dict):
for param_name in param_names:
if param_name not in data_dict:
error_message = (f"All required parameters were not passed to the surrogate. "+
f"Required parameters include:\n{param_names}\n"+
f"Received parameters include:\n{data_dict.keys()}")
raise RuntimeError(error_message)
return True
def _check_params_in_range( params_dict, param_ranges, enforce_range=True):
if not isinstance(param_ranges, (dict, OrderedDict)):
param_ranges = _convert_param_array_to_dict(param_ranges, params_dict.keys())
for param in params_dict:
param_values = params_dict[param]
bad_values = param_values > np.max(param_ranges[param])
bad_values = (param_values < np.min(param_ranges[param])) | bad_values
if bad_values.any() and enforce_range:
raise RuntimeError(f"The passed parameter values for parameter '{param}' contains "+
"values outside of the trained parameter range of "+
f"{param_ranges[param][0]} to "+
f"{param_ranges[param][1]}.\n{param_values}")
def _get_scores_in_native_data_space(surrogate, test_params, test_data, train_params, train_data):
average_train_score = _get_field_scores(surrogate, train_params, train_data,
_average_l2_error_norm)
max_train_score = _get_field_scores(surrogate, train_params, train_data,
_max_error_inf_norm)
r2_train_score = _get_field_scores(surrogate, train_params, train_data, r2_score)
average_test_score = _get_field_scores(surrogate, test_params, test_data,
_average_l2_error_norm)
max_test_score = _get_field_scores(surrogate, test_params, test_data,
_max_error_inf_norm)
r2_test_score = _get_field_scores(surrogate, test_params, test_data, r2_score,
needs_length=True)
average_scores = (average_train_score, average_test_score)
max_scores = (max_train_score, max_test_score)
r2_scores = (r2_train_score, r2_test_score)
return average_scores, max_scores, r2_scores
def _get_field_scores(surrogate, params, data, score_function, needs_length=False):
surrogate_data = surrogate(params)
scores = OrderedDict()
eval_score=True
for field in surrogate_data:
surogate_data_field = np.atleast_2d(surrogate_data[field])
if needs_length:
if len(surogate_data_field) > 1:
eval_score = surogate_data_field.shape[1] > 1
else:
eval_score = len(surogate_data_field) > 1
if field != surrogate._interpolation_field:
if eval_score:
scores[field] = score_function(data[field], surogate_data_field)
else:
scores[field] = np.nan
return scores
#TODO - make function for surrogate class
# if not isinstance(decomposer, _DoNothingDataTransformer):
# _record_variance_behaviors(decomposer, support_information['save_filename'], field)
[docs]
class MatCalMonolithicPCASurrogate(MatCalPCASurrogateBase):
"""
This class takes the results of the :meth:`~matcal.core.surrogates.SurrogateGenerator.generate`
and create a callable object that can generate predictions.
:param surrogate_information: The file path to or the lists of information generated by
:meth:`~matcal.core.surrogates.SurrogateGenerator.generate`.
"""
name = "PCA Monolythic Regressor"
def fit(train_data, test_data, train_params, test_params,
fields_to_log_scale, decomposition_tool,
surrogate_generator, param_ranges, print_score=True):
return MatCalPCASurrogateBase._fit(train_data, test_data, train_params,
test_params, fields_to_log_scale,
decomposition_tool, surrogate_generator,
param_ranges,
_initialize_regressor, __class__,
print_score)
[docs]
class MatCalMultiModalPCASurrogate(MatCalPCASurrogateBase):
"""
This class takes the results of the :meth:`~matcal.core.surrogates.SurrogateGenerator.generate`
and create a callable object that can generate predictions.
:param surrogate_information: The file path to or the lists of information generated by
:meth:`~matcal.core.surrogates.SurrogateGenerator.generate`.
"""
name = "PCA Multiple Regressors"
def fit(train_data, test_data, train_params, test_params,
fields_to_log_scale, decomposition_tool,
surrogate_generator, param_ranges, print_score=True):
return MatCalPCASurrogateBase._fit(train_data, test_data, train_params, test_params,
fields_to_log_scale, decomposition_tool,
surrogate_generator, param_ranges, _modal_regressor,
__class__, print_score)
_surrogate_selection = BasicIdentifier()
_surrogate_selection.register(MatCalMultiModalPCASurrogate.name, MatCalMultiModalPCASurrogate)
_surrogate_selection.register(MatCalMonolithicPCASurrogate.name, MatCalMonolithicPCASurrogate)
def _ensure_2d_array(active_array):
if not isinstance(active_array, np.ndarray):
active_array = np.array(active_array)
if active_array.ndim == 1:
#Reshape 1D vector to be column vector (nsamples, 1) - single feature
active_array = active_array.reshape(-1, 1)
return np.atleast_2d(active_array)
class _MatCalSurrogateWrapper:
def __init__(self, surrogate):
self._surrogate = surrogate
def __call__(self, **parameters):
results = self._surrogate(parameters)
return results
def _score_recreation(sur_values, source_values):
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler().fit(source_values.reshape(1, -1))
scaled_source = scaler.transform(source_values.reshape(1, -1))
scaled_sur = scaler.transform(sur_values.reshape(1, -1))
delta = scaled_source - scaled_sur
return np.linalg.norm(delta.flatten())
def _assign_decomp(decomp_var, reconstruction_error):
if reconstruction_error == None:
if not (isinstance(decomp_var, str) and decomp_var == "mle"):
if decomp_var <=0 or (decomp_var > 1 and not isinstance(decomp_var, int)):
err_str = ("Total Explained Variance Decomposition ratio must be between 0 and 1,"+
f"if a float, or 1 or greater if an integer.\nPassed {decomp_var}.")
raise RuntimeError(err_str)
return _VarianceDecomposition(decomp_var)
else:
if reconstruction_error >= 1 or reconstruction_error <=0 :
err_str = ("Reconstruction tolerance must be between 0 and 1, "+
f"passed {reconstruction_error}.")
raise RuntimeError(err_str)
return _ReconstructionDecomposition(reconstruction_error)
def _process_interpolation_locations(output_history, interpolation_locations, interpolation_field):
if interpolation_field is None:
return None
elif isinstance(interpolation_locations, (np.ndarray)):
return interpolation_locations
elif isinstance(interpolation_locations, Integral):
return _get_interpolation_field(output_history, interpolation_field,
interpolation_locations)
else:
raise ValueError("The surrogate generator expects an integer or array-like "
f"set of values. Received variable of type {type(interpolation_locations)}.")
def _get_interpolation_field(output_history, interpolation_field, n_interp):
start, end = _identify_common_region(output_history, interpolation_field)
return np.linspace(start, end, n_interp)
def _identify_common_region(output_history, interpolation_field):
start = None
end = None
for current_array in output_history:
cur_max = np.max(current_array[interpolation_field])
cur_min = np.min(current_array[interpolation_field])
if start is None:
start = cur_min
if end is None:
end = cur_max
start = np.max([start, cur_min])
end = np.min([end, cur_max])
return start,end
def _identify_fields_of_interest(sim_list, indep_field, user_fields_of_interest):
sim_data_fields = sim_list[0].field_names
if user_fields_of_interest is not None:
fields_of_interest = user_fields_of_interest
_check_fields_in_keys_list(fields_of_interest, sim_data_fields, "training data set")
else:
fields_of_interest = sim_data_fields
if indep_field is not None and indep_field in fields_of_interest:
fields_of_interest.remove(indep_field)
return fields_of_interest
class _MatCalLogScaler(BaseEstimator):
def __init__(self):
self._offset = None
self._lower_limit = 1
def fit(self, data, y=None, **fit_params):
# interface designed to align with that of sklearn's preprocessors
self._check_data(data)
self._offset = np.min(data, axis=0)
def transform(self, data):
self._check_data(data)
return np.log10(data - self._offset + self._lower_limit)
def fit_transform(self, data, y=None, **fit_params):
self.fit(data, y, **fit_params)
return self.transform(data)
def inverse_transform(self, trans_data):
self._check_data(trans_data)
return np.power(10, trans_data) + self._offset - self._lower_limit
def _check_data(self, data):
if not isinstance(data, np.ndarray):
raise TypeError("Passed data must be of type np.ndarray")
if data.ndim != 2:
msg = f"Data must be of dimension 2, passed array of dimension {data.ndim}."
if data.ndim == 1:
msg += ("\nOne dimensional data can be mapped by using reshape(-1, 1) and "+
"reshape(1, -1), making it an array of multiple samples or multiple "+
"features, respectively. ")
raise IndexError(msg)
class _ParameterScalerSet:
def __init__(self):
self._scalers = OrderedDict()
def add_scaler(self, field_name, scaler):
self._scalers[field_name] = scaler
@property
def parameter_order(self):
return list(self._scalers.keys())
def _arbitrary_transform_to_array(self, parameters, transform_method_name):
if isinstance(parameters, (dict, OrderedDict)):
parameters = _convert_param_dict_to_array(parameters, self.parameter_order)
for param_index, (param_name, scaler) in enumerate(self._scalers.items()):
param_data = parameters[:, param_index]
param_data = _ensure_2d_array(param_data)
method_to_call = getattr(scaler, transform_method_name)
parameters[:, param_index] = method_to_call(param_data).flatten()
return parameters
def transform_as_array(self, parameter_dict):
return self._arbitrary_transform_to_array(parameter_dict, "transform")
def inverse_transform_as_array(self, parameter_dict):
return self._arbitrary_transform_to_array(parameter_dict, "inverse_transform")
def _make_parameter_scaler_set(parameter_fields, fields_to_log_scale):
parameter_scaler_set = _ParameterScalerSet()
for parameter_name, parameter_values in parameter_fields.items():
use_log_scale = parameter_name in fields_to_log_scale
prepared_params = _ensure_2d_array(parameter_values)
s_parameters, field_scaler = _scale_data_for_surrogate(prepared_params, use_log_scale)
parameter_scaler_set.add_scaler(parameter_name, field_scaler)
return parameter_scaler_set
def _init_param_array(parameter_dict):
n_params = len(parameter_dict)
n_evals = _get_eval_count(parameter_dict)
return np.zeros((n_evals, n_params))
def _get_eval_count(parameter_dict):
first_key = list(parameter_dict.keys())[0]
first_param_vals = parameter_dict[first_key]
if isinstance(first_param_vals, (float, int)):
n_evals = 1
else:
n_evals = len(first_param_vals)
return n_evals
def _convert_param_array_to_dict(passed_params, parameter_order):
if isinstance(passed_params, (dict, OrderedDict)):
return passed_params
else:
passed_params = np.array(passed_params)
out = OrderedDict()
for param_i, param_name in enumerate(parameter_order):
out[param_name] = passed_params.reshape(-1, len(parameter_order))[:,param_i]
return out
def _convert_param_dict_to_array(passed_params_dict, parameter_order):
array = _init_param_array(passed_params_dict)
for param_i, param_name in enumerate(parameter_order):
array[:,param_i] = passed_params_dict[param_name]
return array
def _average_l2_error_norm(test_values, surrogate_values):
#expects arrays to be sized (n_samples, n_qois)
return (np.linalg.norm(test_values - surrogate_values)
/ test_values.shape[1])
def _max_error_inf_norm(test_values, surrogate_values):
#expects arrays to be sized (n_samples, n_qois)
return np.linalg.norm((test_values - surrogate_values).flatten(), ord=np.inf)