"""
The data module contains classes and functions for converting
data into the structure that MatCal requires for studies.
"""
from matcal.core.serializer_wrapper import _format_serial
import numpy as np
from itertools import count
from collections import OrderedDict
import numbers
from abc import ABC, abstractmethod
import os
from matcal.core.state import SolitaryState, State, StateCollection
from matcal.core.utilities import (ContainerCollectionBase, check_value_is_real_between_values,
check_value_is_positive_integer, check_value_is_bool,
check_item_is_correct_type, check_value_is_nonempty_str)
from matcal.core.logger import initialize_matcal_logger
logger = initialize_matcal_logger(__name__)
[docs]
class Data(np.ndarray):
"""
Data is the base data structure for all MatCal data. This data structure is
an interface to data that are
used for MatCal studies. It is derived from a NumPy ndarrays
but adds name and state, so that the data can be
uniquely identified.
**Construction / initialization**
`Data` may only be constructed from:
1. A NumPy structured/record array (i.e., an ``np.ndarray`` with
``dtype.names is not None`` or an ``np.record``), or
2. A ``dict``/``OrderedDict`` mapping field names to array-like values.
If a dictionary is passed, it is converted using
:func:`~matcal.core.data.convert_dictionary_to_data`.
Passing anything else (including a plain/unstructured ``np.ndarray``)
raises a built-in ``TypeError``.
Accessing fields through field names returns the data for that field in
either 1D or 2D arrays. If the data is 'global' such as time or load,
the data will be reported as a 1D [n_times] array. If the data is field
based the data is reported back as a 2D
[n_times, n_points] array.
"""
_id_numbers = count(0)
def __new__(cls, data, state=SolitaryState(), name=None):
"""
:param data: data to be added to the MatCal data object. Must be either:
(1) a NumPy structured/record array, or
(2) a dict/OrderedDict of field_name -> array-like, which will be converted
using :func:`~matcal.core.data.convert_dictionary_to_data`.
:type data: numpy.ndarray | numpy.record | dict | OrderedDict
:param state: the state associated with the data. If none is passed it
will be assigned the default state.
:type state: :class:`~matcal.core.state.State`
:param name: the name for the data. By default it is set to "data_set_#"
name with a unique id number. If
:func:`~matcal.core.data_importer.FileData` is used to import data,
then its name is set to the
filename from which the data was imported.
:type name: str
"""
# If a dictionary is passed, convert it to a Data object first
if isinstance(data, (dict, OrderedDict)):
data = convert_dictionary_to_data(data)
# Enforce ndarray/record type
check_item_is_correct_type(
data, (np.ndarray, np.record), "data"
)
# Enforce structured array only (plain ndarray is not allowed)
if isinstance(data, np.ndarray) and data.dtype.names is None:
raise TypeError(
"The data passed to MatCal Data must be a NumPy structured/record array "
"(dtype.names must not be None) or a dictionary."
)
obj = np.asarray(data).view(cls) # view will cast all internal arrays as cls[Data] as well
obj._state = None
obj.set_state(state)
obj._id_number = next(cls._id_numbers)
obj._name = "data_set_{}".format(obj._id_number)
if name is not None:
obj.set_name(name)
return obj
def __array_finalize__(self, obj):
if obj is None: return
self._state = getattr(obj, '_state', None)
self._id_number = getattr(obj, '_id_number', None)
self._name = getattr(obj, '_name', None)
def __array_wrap__(self, out_array, context=None, return_scalar=False):
return np.ndarray.__array_wrap__(self, out_array, context, return_scalar)
[docs]
def set_state(self, state):
"""
Sets the optional state value for the data.
:param state: The state for this particular data set.
:type state: :class:`~matcal.core.state.State`
"""
check_item_is_correct_type(state, State, "state")
self._state = state
[docs]
def set_name(self, name):
"""
Sets the optional name value for the data. If the data is imported using
:func:`~matcal.core.data_importer.FileData`, the name is set to the
filename from which the data was
imported. If no name is passed and the data was created from the
constructor or another function,
an arbitrary name will be given to the data.
:param name: The name for this particular data set.
:type name: str
"""
check_item_is_correct_type(name, str, "name")
self._name = name
[docs]
def add_field(self, field_name, data):
"""
Adds a new 1D field to the data and returns the
updated data. The original data object is not modified.
The added field must have the
same length as the existing fields.
:param field_name: The name of the new field to be added.
:type field_name: str
:param data: the data to be added.
:type data: ArrayLike
:return: the data with newly added field
:rtype: `~matcal.core.data.Data`
"""
check_item_is_correct_type(field_name, str, "field_name")
if len(data) != self.length:
error_str = (f"Field to be added '{field_name}' has length " +
f"{len(data)}. It must be of length " +
f"{self.length}.")
raise ValueError(error_str)
data_dict = convert_data_to_dictionary(self)
data_dict.update({field_name:data})
updated_data = convert_dictionary_to_data(data_dict)
updated_data.set_state(self.state)
return updated_data
def _check_field_in_data(self, field):
if field not in self.field_names:
raise KeyError(f"The field \"{field}\" does not exist. "+
f"The following fields exist in the data:\n{self.field_names}")
@property
def length(self):
"""
:return: The length of the data for each field.
:rtype: int
"""
if len(self.shape) > 0:
return self.shape[0]
else:
return 1
@property
def state(self):
"""
:return: The physical state of the data corresponding to the experimental conditions.
:rtype: :class:`~matcal.core.state.State`
"""
return self._state
@property
def field_names(self):
"""
:return: list of strings of all field names.
:rtype: list
"""
field_names = self.dtype.names
if field_names is None:
return []
else:
return list(field_names)
def keys(self):
return self.field_names
@property
def name(self):
"""
Returns the name for the data. If the data is imported using
:func:`~matcal.core.data_importer.FileData`, the name is set to the
filename from which the data was
imported. If no name is passed and the data was created from the
constructor or another function,
an arbitrary name will be given to the data.
:rtype name: str
"""
return self._name
[docs]
def remove_field(self, field):
"""Returns a copy of the Data class with the desired field removed. The
original data object is not modified.
:rtype: :class:`~matcal.core.data.Data`
"""
check_item_is_correct_type(field, str, "field")
self._check_field_in_data(field)
updated_field_names = self.field_names
updated_field_names.remove(field)
if len(updated_field_names) == 0:
# Must return a valid empty structured array (plain ndarray is not allowed)
empty = np.zeros(0, dtype=[])
return Data(empty, self.state, self.name)
else:
return self[updated_field_names].copy()
[docs]
def rename_field(self, old_name, new_name):
"""
Returns the Data class with the desired the field name changed.
Note that the old name is overwritten and not
saved.
:param old_name: the old field name that is to be updated
:type old_name: str
:param new_name: the replacement field name for the field name that is being changed.
:type new_name: str
"""
check_item_is_correct_type(old_name, str, "old_name")
check_item_is_correct_type(new_name, str, "new_name")
self._check_field_in_data(old_name)
field_names = self.field_names
name_to_change_index = field_names.index(old_name)
field_names[name_to_change_index] = new_name
try:
self.dtype.names = field_names
except ValueError as e:
logger.error("Cannot rename the field as given. "+
"Likely repeated name or some issue with the name. "+
f"The old field name is '{old_name}' and "+
f"the new field name is '{new_name}'. The existing field names are:\n"+
f"{self.field_names}")
raise e
return self
def __eq__(self, value) -> bool:
return super().__eq__(value)
def __reduce__(self):
# Get the parent's __reduce__ tuple
pickled_state = super(Data, self).__reduce__()
# Create our own tuple to pass to __setstate__,
# but append the __dict__ rather than individual members.
new_state = pickled_state[2] + (self.__dict__,)
# Return a tuple that replaces the parent's __setstate__ tuple with our own
return (pickled_state[0], pickled_state[1], new_state)
def __setstate__(self, state):
self.__dict__.update(state[-1]) # Update the internal dict from state
# Call the parent's __setstate__ with the other tuple elements.
super(Data, self).__setstate__(state[0:-1])
[docs]
class DataCollection(ContainerCollectionBase):
"""
A collection of :class:`~matcal.core.data.Data` objects to be used for a study. No
restrictions are enforced on the type or contact of :class:`~matcal.core.data.Data` objects
added to the collection. However, they are meant to hold data that is related by experiment
and should generally have the same if not similar fields.
Exceptions to this rule may be
when two different types of data are taken from the same experiment using different
data acquisition hardware. In this case it may make sense to store
:class:`~matcal.core.data.Data` objects
in a data collection with different fields.
.. warning::
Not all MatCal objects or methods support data collections with
:class:`~matcal.core.data.Data`
objects that contain different field names. Appropriate errors should be
used if such data collections
are passed to them.
"""
_collection_type = Data
def __init__(self, name, *data_sets):
"""
:param name: The name of this data collection.
:type name: str
:param data_sets: The :class:`~matcal.core.data.Data` sets to be added to the collection.
:type data_sets: list(:class:`~matcal.core.data.Data`) or
:class:`~matcal.core.data.Data`.
:raises CollectionValueError: If name is a an empty string.
:raises CollectionTypeError: If name is not a string and the data objects
to be added to the collection are
not of the correct type.
"""
self._field_names = []
super().__init__(name, *data_sets)
@property
def field_names(self):
"""
:return: a list of field names that exist in the data collection.
These may not exist in all data objects or states and may only be
in one data object in the collection.
"""
self._field_names = []
for data_list in self._items.values():
for data in data_list:
for field_name in data.field_names:
if field_name not in self._field_names:
self._field_names.append(field_name)
return self._field_names
@property
def state_names(self):
"""
:return: the names of the :class:`~matcal.core.state.State` objects in the data collection.
:rtype: list(str)
"""
state_names = []
for state, item in self._items.items():
state_names.append(state.name)
return state_names
@property
def states(self):
"""
:return: The state :class:`~matcal.core.state.State` objects in the data collection.
:rtype: :class:`~matcal.core.state.StateCollection`
"""
sc = StateCollection('data states')
for key, item in self._items.items():
sc.add(key)
return sc
[docs]
def state_field_names(self, state):
"""
Return all the field names in all Data objects for the given state.
Note that not all Data objects need to have all field names. This
is just a comprehensive list of field names that exist across all Data
objects in the DataCollection for this state.
:param state: the state of interest to get all field names for
:type state: str or :class:`~matcal.core.state.State`
:return: a list of all field names
:rtype: list(str)
"""
state_field_names = []
for data in self.__getitem__(state):
state_field_names += data.field_names
return list(set(state_field_names))
[docs]
def state_common_field_names(self, state):
"""
Return all the field names common to all Data objects for the given state.
:param state: the state of interest to get all field names for
:type state: str or :class:`~matcal.core.state.State`
:return: a list of all field names that are common to all data sets for that state
:rtype: list(str)
"""
from copy import deepcopy
state_field_names = self.state_field_names(state)
common_state_field_names = deepcopy(state_field_names)
for state_field_name in state_field_names:
for data in self.__getitem__(state):
if state_field_name not in data.field_names:
if state_field_name in common_state_field_names:
common_state_field_names.remove(state_field_name)
return common_state_field_names
[docs]
def add(self, item):
"""
Add a :class:`~matcal.core.data.Data` object to a data collection.
:param item: Data object to be added to the data collection.
:type item: :class:`~matcal.core.data.Data`
"""
if isinstance(item, list):
for it in item: self.add(it)
return
super()._check_item_is_correct_type(item)
self._add_data(item)
self._add_new_field_names(item)
[docs]
def remove_field(self, field_name):
"""
Removes the field from all data sets stored
in the data collection that have the passed field name.
If the data collection does not have any data sets with the
specified field name, a warning will be sent to MatCal output.
:param field_name: the name of the field to remove
:type field_name: str
"""
valid_field = self._valid_field_name(field_name)
if valid_field:
for state, data_list in self._items.items():
for index, data in enumerate(data_list):
if field_name in data.field_names:
self._items[state][index] = data.remove_field(field_name)
else:
logger.warning(f"The field \"{field_name}\" is not in DataCollection "+
f"\"{self.name}\" and will not be removed")
def _valid_field_name(self, field_name):
if not isinstance(field_name, str):
raise self.CollectionTypeError(f"The field passed to the DataCollection "
f"\"remove_field\" method must be a string. Received \"{field_name}\".")
return field_name in self.field_names
def _add_data(self, data):
if data.state.name in self.state_names and data.state not in self.states.values():
raise self.NonUniqueStateNameError(self.state_names, data.state, self.states)
super().add(data.state, data)
def _add_new_field_names(self, data):
for field_name in data.field_names:
if field_name not in self._field_names:
self._field_names.append(field_name)
def __getitem__(self, key):
if isinstance(key, str):
if key in self.state_names:
key = self.states[key]
else:
err_msg = f"State named \"{key}\" not in the DataCollection \"{self.name}\".\n"
err_msg += f"Available states are: {list(self.states.keys())}"
raise KeyError(err_msg)
if not isinstance(key, State):
err_msg = (f"Getting items from DataCollection requires a state or "
f"state name as a key. Passed a variable of type \"{type(key)}\', "
f"value \"{key}\".")
raise KeyError(err_msg)
return self._items[key]
class NonUniqueStateNameError(RuntimeError):
def __init__(self, names, new_state, states):
message = ('Attempting to add a different data state with '+
'the same name as a different data state:')
message += f'\nExisting names: {names}'
message += f"\n New state: {new_state}"
message += f"\n Existing states: {states}"
super().__init__(message)
def dumps(self, ignore_point_data=False):
dump_data = {}
for state in self.states:
state_data = self.__getitem__(state)
dump_data[state] = []
for sd in state_data:
processed_data = dict(convert_data_to_dictionary(sd))
p_data = {}
for name, value in processed_data.items():
converted_data = np.atleast_1d(value.astype(float))
if ignore_point_data and converted_data.ndim>1 and converted_data.shape[1]>1:
continue
p_data[name] = converted_data.tolist()
dump_data[state].append(p_data)
return dump_data
[docs]
def plot(self, independent_field: str, dependent_field: str, plot_function=None,
figure=None, show: bool=True, labels: str=None, state: State=None,
block: bool=True, **kwargs) -> None:
"""
Plots the data with the independent field on the horizontal axis and
dependent field on the vertical axis. It plots each state on a separate figure.
:param independent_field: field name to use as horizontal axis variable.
:type independent_field: str
:param dependent_field: field name to use as vertical axis variable.
:type dependent_field: str
:param plot_function: a valid matplotlib plot function such as plot, semilogx, etc
:type plot_function: matplotlib plot function
:param figure: a valid matplotlib Figure for the data collection to be plotted on.
:type figure: matplotlib Figure
:param show: option to show or not show plot
:type show: bool
:param labels: provide a label for each data set other than the data set name.
This can take the form of "suppress",
"{user_provided_label}" or "{user_provided_label} (#)".
If "suppress" is passed, none of the data
will be labeled. If "{user_provided_label}" is passed,
the first data set will be labeled once
as "{user_provided_label}" where "user_provided_label" can be any
user provided string. The rest will not be
labeled. If the
last option is used, where labels="{user_provided_label} (#)", each data set will
be labeled with "{user_provided label}" and a number based on the
order it is pulled from the data set.
For example, a data collection with three data sets and this
function called with labels="experiment (#)", the labels
will be "experiment 1", "experiment 2", "experiment 3".
:type labels: str
:param state: specify a specific state to plot using the state name or state object
:type state: :class:`~matcal.core.state.State` or str
:param block: stops Python from executing code after the plot figure is created.
Follow-on code
will not execute until the figure is closed.
Default is to block (e.g. block=True).
:type block: bool
:param kwargs: a set of valid keyword argument pairs for the Matplotlib plotting function
:type kwargs: dict(str, str)
"""
import matplotlib.pyplot as plt
user_state = state
user_figure = figure
if plot_function is None:
plot_function = plt.plot
label_count = 0
if user_state is not None:
self._plot_state_data_list(self[user_state], plt, plot_function, dependent_field,
independent_field, user_figure, labels, label_count, kwargs)
else:
for state in self.keys():
self._plot_state_data_list(self[state], plt, plot_function, dependent_field,
independent_field, user_figure, labels, label_count, kwargs)
if show:
plt.show(block=block)
def _plot_state_data_list(self, data_state_list, plt, plot_function,
dependent_field, independent_field, user_figure, labels, label_count, kwargs):
state =data_state_list[0].state
self._set_figure(user_figure, state, independent_field, dependent_field)
if "linestyle" not in kwargs.keys() and "marker" not in kwargs.keys():
kwargs["linestyle"] = ''
kwargs["marker"] = '.'
for data_index, data in enumerate(data_state_list):
if independent_field in data.field_names and dependent_field in data.field_names:
label = self._get_plot_label(labels, data.name, data_index, label_count)
label_count +=1
plot_function(data[independent_field], data[dependent_field], label=label, **kwargs)
else:
logger.warning(f"Skipping plotting for data \"{data.name}\" in DataCollection "+
f"\"{self.name}\". The independent and dependent "
"fields are not in the data.")
plt.xlabel(independent_field)
plt.ylabel(dependent_field)
if not user_figure:
plt.title(state.name)
plt.legend()
def _set_figure(self, figure, state: State, independent_field: str, dependent_field: str):
import matplotlib.pyplot as plt
valid_user_fig_provided = self._check_valid_user_fig_provided_for_plot(figure)
if not valid_user_fig_provided:
figure = plt.figure(state.name+" "+independent_field+" "+dependent_field,
constrained_layout=True)
else:
plt.figure(figure.number, constrained_layout=True)
return figure
def _check_valid_user_fig_provided_for_plot(self, figure):
from matplotlib.figure import Figure
valid_user_fig_provided = False
if figure is not None and isinstance(figure, Figure):
valid_user_fig_provided = True
elif figure is not None:
raise self.CollectionTypeError("Invalid figure passed to DataCollection.plot(). "
f"Received type \"{type(figure)}\", but expected a matplotlib Figure.")
return valid_user_fig_provided
def _get_plot_label(self, labels, data_name, index, label_count):
if labels is None:
return self._get_default_label(data_name)
default_label = self._get_default_label(data_name)
if labels=="suppress":
return "_"+default_label
elif "(#)" in labels:
return labels.replace("(#)", str(index))
elif labels is not None:
if label_count > 0:
labels = "_"+labels
return labels
def _get_default_label(self, data_name):
split_data_name = os.path.split(data_name)
default_label = os.path.split(data_name)[-1]
return default_label
[docs]
def get_data_by_state_values(self, **kwargs):
"""
Get a :class:`~matcal.core.data.DataCollection` containing
data that has the state variables with values passed into
this method.
:param kwargs: keyword/value pairs of the desired state variables
:type kwargs: dict(str, str or float)
:return: all data in the data collection that have states with the
state variable and values specified in kwargs.
:rtype: :class:`~matcal.core.data.DataCollection`
"""
data_col_with_state_vals = DataCollection(self._get_sub_selection_name(kwargs))
all_data_has_state_vals = True
for state in self.keys():
values_in_state = self._dict_in_state_params(kwargs, state)
if values_in_state:
data_col_with_state_vals.add(self[state])
all_data_has_state_vals = (values_in_state and
all_data_has_state_vals)
if all_data_has_state_vals:
data_col_with_state_vals = self
return data_col_with_state_vals
def _get_sub_selection_name(self, kwargs):
new_name = self.name+"_with_state_params"
for key, val in kwargs.items():
new_name += f"_{key}_{val}"
return new_name
def _dict_in_state_params(self, dictionary, state):
for key, val in dictionary.items():
if key not in state.params:
return False
else:
if state.params[key] != val:
return False
return True
[docs]
def get_states_by_state_values(self, **kwargs):
"""
Get a :class:`~matcal.core.state.StateCollection` containing
the states with the state variable values passed into
this method.
:param kwargs: keyword/value pairs of the desired state variables
:type kwargs: dict(str, str or float)
:return: a state collection that has all states with the
state variables and values specified in kwargs.
:rtype: :class:`~matcal.core.state.StateCollection`
"""
return self.get_data_by_state_values(**kwargs).states
[docs]
def report_statistics(self, independent_field:str) -> dict:
"""
Get a summary of the statistics information. The method will report the
mean and standard deviation for all dependent fields across the independent
within each state. The data will be collocated to a common set of locations
within the independent field. Statistics near the limits of the independent
field range may be less accurate than of those in the interior because of
errors due to extrapolation that may occur in the collocation process.
:param independent_field: The string to designate which field should be
interpreted as the independent field.
:type independent_field: str
:return: a dictionary that contains the statistical measurements of the data fields.
the data is organized by [field_name][state_name][stat_name]
:rtype: dict
"""
stats_tool = DataCollectionStatistics()
stats_report = {}
for state, state_data in self._items.items():
stats_report[state.name] = stats_tool.generate_state_statistics(independent_field,
self, state)
return stats_report
class DataCollectionStatistics:
def __init__(self, num_interpolation_points=None, sort_ascending=True, interpolation_tool=None,
**interp_keyword_arguments):
"""
This class can be used to calculate basic statistics on the data in a data collection
by state and field. By default it calculates the mean and standard deviation of the data.
It can also be used to calculate the percentiles at user specified percentile values.
This class assumes the data is repeated 1D data with an independent field. It will
interpolate the data to a common set of independent field values and then
calculate the statistics at each of these values. For the independent field,
the maximum and minimum values will be the maximum and minimum values
for that field from all repeats for the state of interest.
:param num_interpolation_points: Select the number of independent fields to interpolate the
dependent fields data to. By default this sets the number of points to the
average length of all repeat data for the specified state and field.
:type num_interpolation_points: int
:param sort_ascending: sort the data according to the independent variable
before interpolating.
:type sort_ascending: bool
:param interpolation_tool: The data for a given state and independent field are interpolated
to a common set of independent field values. Optionally select the interpolation method
used with this parameter. The interpolation method by default is NumPy.interp.
To change the interpolation method, pass in an appropriate SciPy 1D
interpolation class or function such as `make_interp_spline` or other
similar interpolation tool that builds a callable interpolation object
that takes the independent values and dependent values with optional
keyword arguments on initialize. The callable object created will return
interpolation values for passed independent variable values
:type interpolation_tool: func or class
:interp_keyword_arguments: optional keyword arguments that are valid
for the given interpolation tool. If an interpolation tool
is not passed, these must be valid keyword arguments for NumPy.interp.
:interp_keyword_arguments: dict(str,(float,str))
"""
self._analysis_to_perform = OrderedDict()
self._analysis_to_perform['mean'] = _mean
self._analysis_to_perform['std dev'] = _std_dev
self._num_interpolation_points = None
self.set_number_of_interpolation_points(num_interpolation_points)
self._interpolation_tool = None
self._interpolation_kwargs = {}
self.set_interpolation_tool(interpolation_tool, **interp_keyword_arguments)
self._percentiles = []
self._sort_ascending = False
self.set_sort_ascending(sort_ascending)
def set_interpolation_tool(self, interpolation_tool=None, **interp_keyword_arguments):
"""
Change the interpolation tool and associated keyword arguments.
:param interpolation_tool: The data for a given state and independent field are interpolated
to a common set of independent field values. Optionally select the interpolation method
used with this parameter. The interpolation method by default is NumPy.interp.
To change the interpolation method, pass in an appropriate SciPy 1D
interpolation class or function such as `make_interp_spline` or other
similar interpolation tool that builds a callable interpolation object
that takes the independent values and dependent values with optional
keyword arguments on initialize. The callable object created will return
interpolation values for passed independent variable values
:type interpolation_tool: func or class
:interp_keyword_arguments: optional keyword arguments that are valid
for the given interpolation tool. If an interpolation tool
is not passed, these must be valid keyword arguments for NumPy.interp.
:interp_keyword_arguments: dict(str,(float,str))
"""
self._interpolation_tool=interpolation_tool
self._interpolation_kwargs = interp_keyword_arguments
def set_sort_ascending(self, sort_ascending=True):
"""
Automatically sort the data so that the independent variable is ascending.
This is necessary for some interpolation methods to get a valid interpolation.
:param sort_ascending: Flag to turn sorting off/on.
:type sort_ascending: bool
"""
check_value_is_bool(sort_ascending, "sort_ascending")
self._sort_ascending = sort_ascending
def set_number_of_interpolation_points(self, num_interpolation_points):
"""
Manually select the number of points for the interpolation of
the dependent fields.
:param num_interpolation_points: the number of points for interpolation
:type num_interpolation_points: int
"""
if num_interpolation_points is not None:
check_value_is_positive_integer(num_interpolation_points, "num_interpolation_points")
self._num_interpolation_points = num_interpolation_points
def set_percentiles_to_evaluate(self, *percentiles):
"""
Set the percentiles to evaluate for the data sets of interest.
Calling this will remove any preexisting percentiles
previously requested.
:param percentiles: Specify percentiles of interest for the data set.
:type percentiles: list(float)
"""
self._percentiles = []
for percentile in percentiles:
check_value_is_real_between_values(percentile, 0,100,
"percentiles", closed=True)
self._percentiles.append(percentile)
def generate_state_statistics(self, indep_field, data_collection, state):
"""
Calculate the requested statistics on the DataCollection of
interest for the given state and independent field.
:param indep_field: the desired independent field for interpolation and
subsequent statistics calculation.
:type indep_field: str
:param data_collection: the data collection that includes the data
for the statistics calculations.
:type data_collection::class:`~matcal.core.data.DataCollection`
:param state: the state of interest for the current calculation
:type state: :class:`~matcal.core.state.State`
:return: A nested dictionary of the statistics results. The first key is the
name of the fields for which the statistics were evaluated. The second key is the
statistic that was calculated. These include "mean", "std dev", and "percentile_#".
:rtype: dict(str, Array-Like[float])
"""
self._verify_generate_stats_inputs(data_collection, indep_field, state)
interped_data = self._interpolate_state_data_to_common_independent_variable(indep_field,
data_collection, state)
state_report = {f"locations": interped_data.pop(indep_field)}
for field in interped_data:
field_data = np.array(interped_data[field])
field_report = {}
for stat_name, stat_fun in self._analysis_to_perform.items():
field_report[stat_name] = stat_fun(field_data)
if self._percentiles:
for percentile in self._percentiles:
field_report[f"percentile_{percentile}"] = np.percentile(field_data, percentile,
axis=0)
state_report[field] = field_report
return state_report
def _verify_generate_stats_inputs(self, data_collection, indep_field, state):
if state not in data_collection:
raise KeyError(f"The state \"{state.name}\" is not in "+
f"data collection \"{data_collection.name}\".")
for idx, data in enumerate(data_collection[state]):
if indep_field not in data.field_names:
raise KeyError(f"The independent field \"{indep_field}\" is not in "+
f"data set {idx} in the data collection \"{data_collection.name}\".")
def _interpolate_state_data_to_common_independent_variable(self, indep_field,
data_collection, state, indep_field_data_collection=None):
if indep_field_data_collection is None:
indep_field_data_collection = data_collection
state_data = data_collection[state]
field_names = data_collection.state_field_names(state)
interp_locations = self._make_interpolation_domain(indep_field,
indep_field_data_collection, state)
interpolated_data = self._generate_interpolated_data_per_state_by_field(indep_field,
state_data, field_names, interp_locations, indep_field_data_collection[state])
return interpolated_data
def _generate_interpolated_data_per_state_by_field(self, indep_field,
state_data,field_names,interp_locations, indep_field_state_data):
interpolated_data_by_field = {}
for field in field_names:
interpolated_data_by_field[field] = []
interpolated_data_by_field[indep_field] = interp_locations
for cur_data, indep_field_cur_data in zip(state_data, indep_field_state_data):
for field in field_names:
if field in cur_data.field_names and field != indep_field:
try:
interped_data = self._interpolate(interp_locations,
indep_field_cur_data[indep_field], cur_data[field])
interpolated_data_by_field[field].append(interped_data)
except Exception as e:
self._raise_stats_error(indep_field, field, cur_data, e)
return interpolated_data_by_field
def _interpolate(self, interp_locs, independent_data, dependent_data):
if self._sort_ascending:
sorted_indices = np.argsort(independent_data)
independent_data = independent_data[sorted_indices]
dependent_data = dependent_data[sorted_indices]
if self._interpolation_tool is None:
return np.interp(interp_locs, independent_data, dependent_data,
**self._interpolation_kwargs)
else:
interpolator = self._interpolation_tool(independent_data, dependent_data,
**self._interpolation_kwargs)
return interpolator(interp_locs)
def _raise_stats_error(self, indep_field, f_name, cur_data, exception):
message = "Error Generating Stats for:\n"
message += f"Field: {f_name}\n"
message += f"Indep Data: {cur_data[indep_field]}\n"
message += f"Field Data: {cur_data[f_name]}\n"
message += f"{repr(exception)}"
raise RuntimeError(message)
def _make_interpolation_domain(self, indep_field, data_collection, state):
state_data = data_collection[state]
n_points = self._get_number_of_field_points(indep_field, state_data)
state_max = None
state_min = None
for data in state_data:
x = data[indep_field]
state_max = self._update_term(state_max, x, np.max)
state_min = self._update_term(state_min, x, np.min)
interp_locations = np.linspace(state_min, state_max, n_points)
return interp_locations
def _get_number_of_field_points(self, indep_field, state_data):
if self._num_interpolation_points is None:
n_points_list = []
for data in state_data:
n_points_list.append(len(data[indep_field]))
n_points = int(np.ceil(np.average(n_points_list)))
return n_points
else:
return self._num_interpolation_points
def _update_term(self, term, x, f):
if term is None:
r_val = f(x)
else:
r_val = f([f(x), term])
return r_val
def _std_dev(data_array):
return np.std(data_array, axis=0)
def _mean(data_array):
return np.mean(data_array, axis=0)
[docs]
class Scaling(object):
"""
This class is used to apply a scaling multiplier and
an offset to a specific field of a :class:`~matcal.core.data.Data` class.
The offset is applied first, followed by the scale factor.
"""
class ScalingTypeError(Exception):
def __init__(self, *args):
super().__init__(*args)
def __init__(self, field, scalar=1, offset = 0):
"""
:param field: The name of the field to be scaled.
:type field: str
:param scalar: The magnitude of the scaling to be applied to the specified field.
:type scalar: float
:param offset: The magnitude of the offset to be applied to the specified field.
:type scalar: float
:raises TypeError: If the scaling object name and the field names are not strings.
:raises TypeError: If the scalar value passed in is not a number.
"""
if not isinstance(field, str):
raise self.ScalingTypeError(f"The field to be scaled must be of type str. \'{field}\' "+
"was passed as the field which is of "+
f"type \'{type(field)}\'.")
self._field = field
self._scalar = None
self._offset = offset
self.set_scalar(scalar)
@property
def field(self):
"""
:return: The name of the field to be scaled by the scaling object.
:rtype: str
"""
return self._field
[docs]
def apply_to_data(self, data):
"""
:param data: the data object with the desired field to be scaled.
:type data: :class:`~matcal.core.data.Data`
:return: The data object with the appropriately scaled field
:rtype: :class:`~matcal.core.data.Data`
"""
scaled_data = data.copy()
scaled_field_data = self._scalar * (scaled_data[self._field] + self._offset)
scaled_data[self.field] = scaled_field_data
return scaled_data
[docs]
def set_scalar(self, value):
"""
Sets the scalar value to a different value if needed.
:param value: the new scalar value for the scaling object.
:type value: float
"""
if isinstance(value, numbers.Real):
self._scalar = value
else:
raise self.ScalingTypeError(f"Received an invalid number \"{value}\" when setting the "+
f"scalar value in Scaling object scaling \"{self.field}\"")
@property
def scalar(self):
"""
:return: the scaling value for the scaling object.
"""
return self._scalar
@property
def offset(self):
"""
:return: the offset value for the scaling object.
"""
return self._offset
[docs]
class ScalingCollection(ContainerCollectionBase):
"""
A collection of :class:`~matcal.core.data.Scaling` objects. This is
used to combine multiple scaling objects so that
more than one scaling function or value can be applied to a data set.
This class is used when applying different
scaling functions or values to different fields within a data set.
"""
_collection_type = Scaling
def __init__(self, name, *scalings):
"""
:param name: the name for the scaling collection used for identification for error catching.
:type name: str
:param scalings: The scaling items to be added to the collection. They
can be passed in as comma separated
list or an unpacked list. Unpack a list using \\*list_name.
:type scalings: list(:class:`~matcal.core.data.Scaling`)
:raises CollectionValueError: If name is an empty string.
:raises CollectionTypeError: If name is not a string and the
scalings to be added to the collection are
not of the correct type.
"""
super().__init__(name, *scalings)
[docs]
def add(self, scaling):
"""
Adds a :class:`~matcal.core.data.Scaling` object to the
collection.
:param scaling: scaling object to be added to the collection
:type scaling: :class:`~matcal.core.data.Scaling`
"""
self._check_item_is_correct_type(scaling)
super().add(scaling.field, scaling)
[docs]
class DataConditionerBase(ABC):
"""
This is the base class for MatCal data conditioners. The data conditioners
attempt to modify all data sets for a state in a single evaluation set such that
the experimental data is on the order of -1 to 1. The data is modified
according to:
.. math::
\\mathbf{d}_c = \\frac{\\mathbf{d}-o}{s}
where :math:`\\mathbf{d}` is a vector created from all data sets included in a single state,
:math:`o` is a scalar data offset calculated from :math:`\\mathbf{d}`, and :math:`s` is
a scalar scale factor calculated from :math:`d`.
If :math:`s=0` after it is calculated, the base conditioner class will
change the scale factor such
that :math:`s=mean\\left(\\left|\\mathbf{d}\\right|\\right)` or the
average of the absolute value of the relevant data.
If :math:`s` is still near zero, then the vector is full of zero or near zero values and
the base conditioner sets the scale factor to :math:`s=1`
The calculation of :math:`o` and :math:`s` is specific to the derived
conditioner class. The abstract methods
:meth:`~matcal.core.data.DataConditionerBase.get_scale_for_data_field`
and :meth:`~matcal.core.data.DataConditionerBase.get_scale_for_data_field`
define the calculations
for :math:`o` and :math:`s`. A custom user class can be defined to implement
conditioning of the user's choice by including only the implementation of these
methods.
"""
def __init__(self):
self._zero_tolerance = 1e-14
self._field_names = []
self._field_offsets = OrderedDict()
self._field_scales = OrderedDict()
self._initialized = False
[docs]
def apply_to_data(self, passed_data):
"""
Apply the conditioner to a data set. This can be any data set and
does not need to be the one that was used to initialize the data set.
If a field name in a the data set passed to this method
was not in the data set used to
initialize the conditioner, the passed data field is returned unchanged.
:param passed_data: a data set to be conditioned using an initialized conditioner.
:type passed_data: :class:`~matcal.core.data.Data`
"""
self._check_data(passed_data)
self._verify_initialized()
conditioned_data = self._condition_data(passed_data)
return conditioned_data
def _verify_initialized(self):
if not self._initialized:
raise RuntimeError("Cannot condition passed data. Conditioner is not initialized.")
def _condition_data(self, passed_data):
conditioned_data = self._initialize_conditioned_array(passed_data)
for field_name in conditioned_data.field_names:
logger.debug("Conditioning Field Name: {}".format(field_name))
data_to_condition = conditioned_data[field_name]
if self._is_noise_field(field_name):
field_name_no_noise = self._get_non_noise_field_name(field_name)
conditioned_data[field_name] = self._apply_field_conditioning(field_name_no_noise,
data_to_condition, self._condition_noise)
else:
conditioned_data[field_name] = self._apply_field_conditioning(field_name,
data_to_condition, self._condition_field)
conditioned_data.set_state(passed_data.state)
conditioned_data.set_name("conditioned "+ passed_data.name)
return conditioned_data
def _initialize_conditioned_array(self, passed_data):
formats = []
for field_name in passed_data.field_names:
if passed_data.dtype[field_name] == np.dtype('int'):
formats.append(float)
else:
formats.append(passed_data.dtype[field_name])
updated_dtype = np.dtype({'names':passed_data.field_names, 'formats':formats})
conditioned_data = passed_data.copy().astype(updated_dtype)
return conditioned_data
def _apply_field_conditioning(self, field_name, passed_data, condition_func):
if field_name in self._field_names:
conditioned_field_data = condition_func(field_name, passed_data)
return conditioned_field_data
else:
return passed_data
def _get_non_noise_field_name(self, field_name):
field_name_no_noise = "_".join(field_name.split("_")[:-1])
return field_name_no_noise
def _check_data(self, passed_data):
if not isinstance(passed_data, Data):
passed_type = type(passed_data)
raise TypeError("Conditioner needs a class derived from the MatCal Data class. "
f"Passed object of type {passed_type}")
def _is_noise_field(self, field_name):
return field_name.split("_")[-1].lower() == "noise"
def _condition_field(self, field_name, field_data):
offset = self._field_offsets[field_name]
scale = self._field_scales[field_name]
conditioned_field_data = (field_data - offset) / scale
return conditioned_field_data
def _condition_noise(self, field_name, field_data):
scale = self._field_scales[field_name]
conditioned_field_data = (field_data) / scale
return conditioned_field_data
[docs]
def initialize_data_conditioning_values(self, data_list):
"""
Initialize the conditioner for a given list of data sets from
a single state of a data collection.
:param: list of data sets to be used for conditioning. Generally passed
as a ``__getitem_`` of a state from a :class:`~matcal.core.data.DataCollection`.
:param type: list(:class:`~matcal.core.data.Data`)
"""
combined_data = combine_data_sets_in_data_list(data_list)
for field_name, values in combined_data.items():
if self._is_noise_field(field_name):
continue
self._field_names.append(field_name)
self._save_data_conditioning_values_for_field(field_name, values)
self._verify_valid_initialization()
self._initialized = True
def _verify_valid_initialization(self):
if not self._field_names:
raise ValueError("Initialization failed. Initialization data list likely empty.")
def _save_data_conditioning_values_for_field(self, field_name, field_data):
self._field_offsets[field_name] = self.get_offset_for_data_field(field_data)
scale = self.get_scale_for_data_field(field_data)
if scale < self._zero_tolerance:
scale = np.max(np.abs(field_data))
if scale < self._zero_tolerance:
scale = 1.0
self._field_scales[field_name] = scale
[docs]
@abstractmethod
def get_scale_for_data_field(self, field_data):
"""
Calculates the scale factor :math:`s` for the data conditioner given
all values for a specific field name from the data collection
for a single state. This scale factor will be used to condition all
data with this state and field name when compared using an evaluation set.
:param field_data: all data for a specific field from a single state of
a data collection used to calculate an objective in an evaluation set.
:type field_data: ArrayLike
"""
[docs]
@abstractmethod
def get_offset_for_data_field(self, field_data):
"""
Calculates the offset :math:`o` for the data conditioner given
all values for a specific field name from the data collection
for a single state. This offset will be used to condition all
data with this state and field name when compared using an evaluation set.
:param field_data: all data for a specific field from a single state of
a data collection used to calculate an objective in an evaluation set.
:type field_data: ArrayLike
"""
[docs]
class ReturnPassedDataConditioner(DataConditionerBase):
"""
This data conditioner will make no changes to the data sets
included in the evaluation set. Its scale and offset values are
given by :math:`s=1` and :math:`o=0`
"""
[docs]
def get_scale_for_data_field(self, field_data):
return 1.0
[docs]
def get_offset_for_data_field(self, field_data):
return 0.0
[docs]
class RangeDataConditioner(DataConditionerBase):
"""
This data conditioner will condition data such that each
field from the initializing data list is in the range of
0 to 1. To do so the scale and offset values are calculated
as :math:`s=max\\left(\\mathbf{d}\\right)-min\\left(\\mathbf{d}\\right)` and
:math:`o=min\\left(\\mathbf{d}\\right)`.
"""
def _calculate_field_range(self, field_data):
range = (np.max(field_data) - np.min(field_data))
return range
[docs]
def get_scale_for_data_field(self, field_data):
return self._calculate_field_range(field_data)
[docs]
def get_offset_for_data_field(self, field_data):
return np.min(field_data)
[docs]
class MaxAbsDataConditioner(DataConditionerBase):
"""
This data conditioner will condition data such that each
field from the initializing data list is in the range of
-1 to 1. To do so, the scale values are calculated
as :math:`s=max\\left(\\left|\\mathbf{d}\\right|\\right)` and
:math:`o=0`. Note that this only guarantees the
data will be in the range of -1 to 1, it does not enforce
that the data spans the entirety of -1 to 1.
"""
[docs]
def get_scale_for_data_field(self, field_data):
return np.max(np.abs(field_data))
[docs]
def get_offset_for_data_field(self, field_data):
return 0.0
[docs]
class AverageAbsDataConditioner(DataConditionerBase):
"""
This data conditioner will condition data such that each
field from the initializing data list is on the order of
-1 to 1. To do so, the scale values are calculated
as :math:`s=mean\\left(\\left|\\mathbf{d}\\right|\\right)` and
:math:`o=0`.
Note that this likely puts the all data in the field
on the order of -1 to 1, but the data could be well outside
of this range depending on the values in the data.
"""
[docs]
def get_scale_for_data_field(self, field_data):
return np.average(np.abs(field_data))
[docs]
def get_offset_for_data_field(self, field_data):
return 0.0
[docs]
def combine_data_sets_in_data_list(data_list):
"""
Given a list of :class:`~matcal.core.data.Data` objects,
this function will return a dictionary where each
item is all values from the same field in from all data sets and
the key for the items are the field names.
:param data_list: list of data sets that will be combined.
:type data_list: list(:class:`~matcal.core.data.Data`)
"""
combined_data = OrderedDict()
for data in data_list:
for field_name in data.field_names:
field_data = data[field_name]
if not field_name in combined_data.keys():
combined_data[field_name] = field_data
else:
combined_data[field_name] = np.append(combined_data[field_name], field_data)
return combined_data
def _scale_data(scaling_collection, data):
scaled_data = data.copy()
for field_name in data.field_names:
if field_name in scaling_collection.keys():
for scale in scaling_collection[field_name]:
scaled_data = scale.apply_to_data(scaled_data)
return scaled_data
[docs]
def scale_data_collection(data_collection, field_name, scale, offset=0):
"""
Scales all data sets in a data collection that have
the requested field. It will apply the correct
scale factor and offset to each data set and return
a new data collection that is scaled. Note that if
both are used, the offset is applied first and then
the results are scaled. A new scaled data collection
is returned and the old one is unmodified.
:param data_collection: the data collection to be scaled
:type data_collection: :class:`~matcal.core.data.DataCollection`
:param field_name: the name of the field to be modified
:type fied_name: str
:param scale: a linear scale factor to scale the field
:type scale: float
:param offset: a constant offset to be added to the field
:type offset: float
:return: new scaled data collection
:rtype: :class:`~matcal.core.data.DataCollection`
"""
check_item_is_correct_type(data_collection, DataCollection, "data_collection")
check_value_is_nonempty_str(field_name, "field_name")
check_item_is_correct_type(scale, numbers.Real, "scale")
check_item_is_correct_type(offset, numbers.Real, "offset")
name = "scale_{}".format(field_name)
scaling_collection = ScalingCollection(name, Scaling(field_name, scale, offset))
scaled_data_collection = DataCollection(name+"_{}".format(data_collection.name))
for state in data_collection.keys():
for data in data_collection[state]:
scaled_data_collection.add(_scale_data(scaling_collection, data))
return scaled_data_collection
[docs]
def convert_data_to_dictionary(data):
"""
Converts a MatCal :class:`~matcal.core.data.Data`
class into a dictionary of np.arrays.
:param data: a MatCal data set
:type data: :class:`~matcal.core.data.Data`
:return: dictionary conversion of the data object
:rtype: OrderedDict
"""
if not isinstance(data, Data):
raise TypeError(f"The object passed to be converted to a dictionary"
f" must be a MatCal Data type. Received an object of type {type(data)}.")
d = OrderedDict()
for key in list(data.field_names):
kdata = data[key]
if isinstance(kdata, Data):
kdata = np.asarray(kdata)
d[key] = kdata
return d
[docs]
def convert_dictionary_to_data(dict_data):
"""
Takes a dictionary and attempts to create a
MatCal :class:`~matcal.core.data.Data` object.
The keys for the dictionary are expected to be
strings for the field names and the values
are expected to be valid numeric or string data.
:param dict_data: a dictionary with field names as keys and
the data values as the dictionary values.
:type dict_data: dict or OrderedDict
:return: a Data object with the default state :class:`~matcal.core.state.SolitaryState`.
:rtype: :class:`~matcal.core.data.Data`
"""
_check_dictionary_data(dict_data)
data = _create_array_from_dict(dict_data)
return Data(data)
def _check_dictionary_data(dict_data):
for value in dict_data.values():
if value is None:
raise TypeError("Attempting to put a None in a Data object")
def _create_array_from_dict(dict_data):
row_data = []
data_types = []
first_dim_length = None
for key, item in dict_data.items():
item = np.atleast_1d(np.array(item))
first_dim_length = _set_first_dim(first_dim_length, item.shape)
if not _confirm_first_dimension_length(first_dim_length, item.shape):
raise UnequalTimeDimensionSizeError(key)
data_types.append(_determine_data_type(item, key))
row_data.append(item)
converted_array = np.rec.array(row_data, dtype=data_types)
return converted_array
def _determine_data_type(item, key):
dtype=item.dtype
if issubclass(item.dtype.type, (numbers.Integral, numbers.Real)):
dtype=float
else:
dtype=dtype
if item.ndim <= 1:
type_to_return = (key, dtype)
else:
type_to_return = (key, dtype, item.shape[1:])
return type_to_return
class UnequalTimeDimensionSizeError(RuntimeError):
def __init__(self, key_name):
message = f"Field: {key_name}\n Does not have the same first (time) dimension length"
super().__init__(message)
def _confirm_first_dimension_length(ref_size, data_shape):
return ref_size == data_shape[0]
def _set_first_dim(old_first_dim, data_shape):
if old_first_dim is None:
return data_shape[0]
else:
return old_first_dim
def _serialize_data(data_to_serialize:Data)->dict:
out_dict = convert_data_to_dictionary(data_to_serialize)
for key, value in out_dict.items():
out_dict[key] = _format_serial(value)
return out_dict