Source code for matcal.full_field.field_mappers

from abc import ABC, abstractmethod
from collections import OrderedDict
import numpy as np
from scipy import sparse
from scipy.optimize import root
from typing import Callable

from matcal.core.utilities import (check_item_is_correct_type, check_value_is_nonempty_str, 
                                   _time_interpolate)
from matcal.full_field.data import FieldData, convert_dictionary_to_field_data
from matcal.full_field.data_importer import FieldSeriesData
from matcal.full_field.data_exporter import matcal_field_data_exporter_identifier 
from matcal.full_field.NodeData import NodeData
from matcal.full_field.shapefunctions import TwoDim4NodeBilinearShapeFunction

from matcal.core.logger import initialize_matcal_logger
logger = initialize_matcal_logger(__name__)

[docs] class FieldProjectorBase(ABC): @abstractmethod def _shape_function(self): """""" class InvalidFieldnameError(RuntimeError): def __init__(self, fieldname, possible_fieldnames): message = "Field '{}' is not available for projection.\n" \ "Possible fields:\n{}".format(fieldname, possible_fieldnames) super().__init__(self, message) def __init__(self, source_data, target_domain): self._source_data = source_data self._target_domain = target_domain self._node_data = NodeData() self._local_element_data = _local_element_systems(self._target_domain.cell_count, self._shape_function) @abstractmethod def project_field(self, fieldname): """""" @abstractmethod def get_results_data(self): """"""
class _local_element_systems: _contained_point_buffer = 200 def __init__(self, number_of_elements, shape_function): self.element_residual = np.zeros([number_of_elements, self._contained_point_buffer]) self.element_node_values = np.zeros([number_of_elements, self._contained_point_buffer, shape_function.number_of_functions]) self.contained_points = np.zeros([number_of_elements, self._contained_point_buffer], dtype=int) self.contained_point_count = np.zeros(number_of_elements, dtype=int) @property def buffer_size(self): return self._contained_point_buffer @property def number_of_points(self): return np.sum(self.contained_point_count) def reset_local_system(self): self.contained_point_count = self.contained_point_count * 0 self.element_residual = self.element_residual * 0 class _InterpolationMatrixGenerator: def __init__(self, shape_function): self._shape_function = shape_function def generate(self, grid, cloud_points): num_grid_points, num_cloud_points, num_elem = self._calc_counts(grid, cloud_points) M = sparse.lil_matrix((num_cloud_points, num_grid_points)) cloud_to_matrix_map = -np.ones(num_cloud_points, dtype=int) local_element_data = self._generate_local_element_data(num_elem, self._shape_function, grid, cloud_points) for element_index in range(num_elem): grid_point_indices = grid.cell_connectivity[element_index, :] num_contained_points = local_element_data.contained_point_count[element_index] for contained_point_index in range(num_contained_points): reference_cloud_index = local_element_data.contained_points[element_index, contained_point_index] M[reference_cloud_index, grid_point_indices] = local_element_data.element_node_values[element_index, contained_point_index, :] interp_mat = sparse.csr_matrix(M) e_col = 1-np.ones(interp_mat.shape[0],dtype=bool)*interp_mat.astype(bool) if np.any(e_col) > 0: logger.info("WARNING: Unsupported nodes in matrix. Use of the current mesh may cause problems for VFM calibrations") return interp_mat def _calc_counts(self, grid, cloud_points): num_grid_points = grid.node_count num_elem = grid.cell_count num_cloud_points = np.shape(cloud_points)[0] return num_grid_points,num_cloud_points,num_elem def _generate_local_element_data(self, num_elem, shape_function, grid, cloud_points): local_elem_data = _local_element_systems(num_elem, shape_function) cloud_points_to_element_map = grid.get_containing_cell(cloud_points) for point_index, element_index in enumerate(cloud_points_to_element_map): if element_index < 0: continue shape_function_values = self._caculate_shape_function_values(element_index, grid, cloud_points, point_index) local_elem_data = self._add_shapefunction_data_to_local_element_data(local_elem_data, shape_function_values, element_index, point_index) return local_elem_data def _caculate_shape_function_values(self, element_index, grid, cloud_points, point_index): mapper = _LabToParametricSpaceMapper(cloud_points[point_index, :], grid.get_cell_node_locations(element_index)) parametric_coordinates = mapper.calculate_parametric_location() shape_function_values = self._shape_function.values(parametric_coordinates) return shape_function_values def _add_shapefunction_data_to_local_element_data(self, local_element_data, array_values, element_index, point_index): equation_point_index = self._get_equation_point_index(element_index, local_element_data) local_element_data.element_node_values[element_index, equation_point_index, :] = array_values local_element_data.contained_points[element_index, equation_point_index] = point_index self._increment_equation_point_index(element_index, local_element_data) return local_element_data def _increment_equation_point_index(self, element_index, local_element_data): local_element_data.contained_point_count[element_index] += 1 def _get_equation_point_index(self, element_index, local_element_data): equation_index = local_element_data.contained_point_count[element_index] return equation_index class _TwoDimensionalFieldProjector(FieldProjectorBase): _shape_function = TwoDim4NodeBilinearShapeFunction() def __init__(self, source_data, target_domain): super().__init__(source_data, target_domain) self._matrix = None self._cloud_map = None def project_field(self, fieldname, frame_index): self._projection_error_checking(fieldname) if self._matrix_is_empty(): self._create_projection_matrix() self._node_data = NodeData() self._project(fieldname, frame_index) return self.get_results_data() def get_results_data(self): return self._node_data.get_full_data() def reset(self, new_source_data=None): if (new_source_data is not None): self._source_data = new_source_data self._create_projection_matrix() self._node_data = NodeData() return self def _projection_error_checking(self, field_name): if field_name not in self._source_data.field_names: raise self.InvalidFieldnameError(field_name, self._source_data.field_names) def _matrix_is_empty(self): return self._matrix == None def _create_projection_matrix(self): mat_generator = _InterpolationMatrixGenerator(self._shape_function) self._matrix = mat_generator.generate(self._target_domain, self._source_data.spatial_coords) def _project(self, fieldname, index): residual = self._source_data[fieldname][index] field_values = self._solve(residual) self._node_data.add_node_data(fieldname, np.array(field_values)) def _solve(self, residual): result = sparse.linalg.lsqr(self._matrix, residual.T,atol=1e-12, btol=1e-12) return result[0] class _NodeMapper: def __init__(self, buffer_size): self._gtol = -np.ones(buffer_size, dtype=int) self._ltog = -np.ones(buffer_size, dtype=int) self._item_count = 0 def append(self, value): if int(value) not in self._gtol: self._gtol[self._item_count] = value self._ltog[value] = self._item_count self._item_count += 1 def getGlobalToLocal(self, idx): return self._gtol[idx] def getLocalToGlobal(self, idx): g = self._ltog[idx] if np.all(g < 0): raise RuntimeError("Indexing Eliminated Node") return g @property def size(self): return self._item_count class _LabToParametricSpaceMapper: _shape_function = TwoDim4NodeBilinearShapeFunction() def __init__(self, location_array, element_location_array): self._goal_location = location_array self._cell_locations = element_location_array def calculate_parametric_location(self): initial_guess = np.zeros([1, 2]) results = root(self._calculate_residual, initial_guess, jac=self._calculate_tangent) return results.x.reshape((1,2)) def _calculate_residual(self, parametric_location): shape_function_values = self._shape_function.values(parametric_location.reshape([1,2])) cell_location = np.dot(shape_function_values, self._cell_locations) residual = cell_location - self._goal_location return residual[0] def _calculate_tangent(self, parametric_location): shape_function_grad = self._shape_function.gradients(parametric_location.reshape([1,2])) sf_x = shape_function_grad[0,0,:] sf_y = shape_function_grad[0,1,:] dpdx = np.dot(sf_x, self._cell_locations) dpdy = np.dot(sf_y, self._cell_locations) tangent = -1 * np.array([[dpdx[0],dpdy[0]], [dpdx[1], dpdy[1]]]) return tangent class _TwoDimensionalFieldInterpolator: _shape_function = TwoDim4NodeBilinearShapeFunction() def __init__(self, grid_geometry, cloud_points): self._grid = grid_geometry self._cloud_points = cloud_points self._matrix = self._generate_interpolation_matrix() def interpolate(self, grid_data): num_fields = self._parse_number_of_fields(grid_data) result = self._matrix.dot(grid_data) return result.reshape(-1,num_fields) @property def number_of_nodes_required(self): return self._matrix.shape[1] def _parse_number_of_fields(self, grid_data): if grid_data.ndim > 1: return grid_data.shape[1] else: return 1 def _generate_interpolation_matrix(self): M_i = self._make_interpolation_matrix_from_transposed_projection_matrix() return M_i def _make_interpolation_matrix_from_transposed_projection_matrix(self): mat_gen = _InterpolationMatrixGenerator(self._shape_function) proj_Mat = mat_gen.generate(self._grid, self._cloud_points) return proj_Mat class NonIntegerPolynomialOrderError(RuntimeError): def __init__(self, value): message = f"Polynomial Order must be of type int, passed {type(value)}" super().__init__(message) class SmallSearchRadiusError(RuntimeError): def __init__(self, mult): message = f"Search Radius multiplier should be greater than 1.05, passed {mult}" super().__init__(message) class BadPolynomialOrderError(RuntimeError): def __init__(self, order): message = f"Polynomial order should be >= 1 and < 11, passed {order}" super().__init__(message) class BadSearchTypeError(RuntimeError): def __init__(self, mult): message = f"Search radius multiplier needs to be of type double or int, passed {type(mult)}" super().__init__(message) def _check_gmls_parameters(polynomial_order, epsilon_multiplier): if not isinstance(polynomial_order, int): raise NonIntegerPolynomialOrderError(polynomial_order) if polynomial_order < 1 or polynomial_order > 10: raise BadPolynomialOrderError(polynomial_order) if not isinstance(epsilon_multiplier, (float, int)): raise BadSearchTypeError(epsilon_multiplier) if epsilon_multiplier < 1.05: raise SmallSearchRadiusError(epsilon_multiplier)
[docs] class MeshlessMapperGMLS: """ Class that performs meshless mapping. """ count = 0 default_polynomial_order=1 #: Default value for polynomial order. default_epsilon_multiplier=2.75 #: Default value for search radius multiplier def __init__(self, target_coords, source_coords, polynomial_order=default_polynomial_order, epsilon_multiplier=default_epsilon_multiplier, number_of_batches=2): _check_gmls_parameters(polynomial_order, epsilon_multiplier) self._increment_total_instances_count() n_dim = target_coords.shape[1] self._n_source_points = source_coords.shape[0] self._kokkos_parser = None self._gmls = None self._helper = None self._initialize_gmls_tool(target_coords, source_coords, polynomial_order, epsilon_multiplier, number_of_batches, n_dim) def _initialize_gmls_tool(self, target_coords, source_coords, polynomial_order, epsilon_multiplier, number_of_batches, n_dim): import pycompadre try: self._kokkos_parser = pycompadre.KokkosParser() self._gmls = pycompadre.GMLS(polynomial_order, n_dim) self._helper = pycompadre.ParticleHelper(self._gmls) except: self.finish() raise self.InitializeError() try: self._helper.generateKDTree(source_coords) self._helper.generateNeighborListsFromKNNSearchAndSet(target_coords, polynomial_order, n_dim, epsilon_multiplier) except (Exception, RuntimeError): self.finish() raise self.NeighborDectectionError() try: self._gmls.addTargets(pycompadre.TargetOperation.ScalarPointEvaluation) self._gmls.generateAlphas(number_of_batches, keep_coefficients=False) except Exception: self.finish() raise self.AlphaGenerationError() def finish(self): try: del self._gmls except: pass try: del self._helper except: pass if self._one_instance_remains(): try: del self._kokkos_parser except: pass self._decrement_total_instances_count() def _one_instance_remains(self): return __class__.count == 1 def _increment_total_instances_count(self): __class__.count += 1 def _decrement_total_instances_count(self): __class__.count -=1 def map(self, source_value): import pycompadre n_points = source_value.shape[0] if n_points != self._n_source_points: self.finish() raise self.IncorrectLengthError(self._n_source_points, n_points) try: map_values = self._helper.applyStencil(source_value, pycompadre.TargetOperation.ScalarPointEvaluation) except Exception: raise self.MappingError() return map_values class NeighborDectectionError(RuntimeError): def __init__(self): message = "Nearest Neighbor Detection Failure: Examine point clouds to ensure sufficient support" super().__init__(message) class InitializeError(RuntimeError): def __init__(self): message = 'Something went wrong with GMLS initialization' super().__init__(message) class AlphaGenerationError(RuntimeError): pass class MappingError(RuntimeError): pass class IncorrectLengthError(RuntimeError): def __init__(self, source_point_size, passed_point_size): message = f"Initialized point count({source_point_size}) is not equal to the length of the passed value array({passed_point_size})" super().__init__(message)
[docs] def meshless_remapping(field_data, fields_to_map, target_points, polynomial_order=MeshlessMapperGMLS.default_polynomial_order, search_radius_multiplier=MeshlessMapperGMLS.default_epsilon_multiplier, target_time=None, time_field=None): """ Stand alone function for performing meshless interpolation between two point clouds. This function uses generalized moving least squares(GMLS) to perform local interpolations. GMLS tools provided by pycompadre. This function is intended for interpolation, but has limited ability to do extrapolation. If extrapolation is expected, use lower order(<=2) polynomials to reduce errant edge effects of higher order polynomials. For default values of the search_radius_multiplier and polynomial_order see :class:`~matcal.full_field.field_mappers.MeshlessMapperGMLS` :param field_data: FieldData object that contains the data to be mapped to a new point cloud. :type field_data: :class:`~matcal.full_field.data.FieldData` :param fields_to_map: List of field names to be mapped from the provided field data to the target_points. :type fields_to_map: list(str) :param target_points: Two-dimensional array containing the points to be interpolated on. each column holding the coordinate, and each row representing a new point. Currently only tested for two-dimensional interpolation. Other dimensions may work as well, but should be used with caution. :type target_points: ArrayLike :param polynomial_order: The order of polynomial to use for interpolation/extrapolation. :type polynomial_order: int :param search_radius_multiplier: multiplier used to gather additional interpolation points once the minimum radius for a given polynomial order is reached. Higher values will include more points and in-general have a greater smoothing effect. Recommended values for this parameter are between 1.5 and 3. :type search_radius_multiplier: float """ mapping_tool = MeshlessMapperGMLS(target_points, field_data.spatial_coords, polynomial_order, search_radius_multiplier) mapped_data = {} for field in fields_to_map: cloud_field = field_data[field] n_time = cloud_field.shape[0] mesh_field = np.zeros([n_time, target_points.shape[0]]) for i_time in range(n_time): mesh_field[i_time, :] = mapping_tool.map(cloud_field[i_time,:]) mapped_data[field] = mesh_field for field in field_data.field_names: current_data = field_data[field] if _is_a_global_field(field, current_data): mapped_data[field] = current_data mapped_field_data = convert_dictionary_to_field_data(mapped_data) mapped_field_data.set_spatial_coords(target_points) mapping_tool.finish() if _has_information_for_time_interp(target_time, time_field): mapped_field_data = _map_in_time(target_time, time_field, mapped_field_data) return mapped_field_data
def _has_information_for_time_interp(target_time, time_field): return isinstance(time_field, str) and len(target_time) > 0 def _map_in_time(target_time, time_field, mapped_field_data): time_mapped_data = {} source_time = mapped_field_data[time_field] if source_time.ndim > 1: raise BadTimeDataShape(time_field, source_time.ndim) for field in mapped_field_data.field_names: source_data = mapped_field_data[field] target_data = _time_interpolate(target_time, source_time, source_data) time_mapped_data[field] = target_data time_mapped_data = convert_dictionary_to_field_data(time_mapped_data) time_mapped_data.set_spatial_coords(mapped_field_data.spatial_coords) return time_mapped_data def _is_a_global_field(field, current_data): is_global = current_data.ndim == 1 return is_global
[docs] class FullFieldCalculator: """ A class for generating spatially dependent measurements between different sets of full-field data. """ def __init__(self, reference_data, independent_field_vals, independent_field_name="time", position_names=['X', 'Y'], add_global_variables=True)->None: """ :param reference_data: Passed as either a path to a full-field data file or a MatCal :class:`~matcal.full_field.data.FieldData` object. This set the reference data file. :param independent_field_vals: An array of times or other time-like values to be used as the interpolation points in time for calculations. :param independent_field_name: The name of a field to be used as the time interpolation axis. Any monotonic global field can be used. For example applied displacement can be used for uniaxial tension data. This field must exist on all data sets used. :type time_field_name: str :param position_names: A list of the different position names contained in any data sets used. The number of names passed will change the dimensionality of the problem. (Passing ['x', 'y'] will create a 2D problem, while passing ['x', 'y', 'z'] will create a 3D problem.) This value defaults to ['X', 'Y']. :type position_names: list(str) :param add_global_variables: copy global variables from all data sources to the new field data class. :type add_global_variables: bool """ self._check_position_names_input(position_names) self._position_names = position_names check_item_is_correct_type(independent_field_vals, (list, np.ndarray), "independent_field_vals") self._independent_field_vals = independent_field_vals self._data_sets = OrderedDict() self._spatial_calculations = OrderedDict() check_item_is_correct_type(independent_field_name, str, "independent_field_name") self._independent_field_name = independent_field_name check_item_is_correct_type(reference_data, (str, FieldData), "reference_data") self._original_reference_data = reference_data self._reference_data = self._process_data_input(reference_data, self._position_names) self._mapping_polynomial_order = MeshlessMapperGMLS.default_polynomial_order self._mapping_search_radius_multiplier = MeshlessMapperGMLS.default_epsilon_multiplier check_item_is_correct_type(add_global_variables, bool, "global_variables") self._add_global_variables = add_global_variables def _check_position_names_input(self, position_names): check_item_is_correct_type(position_names, list, "position_names", call_depth=1) for pos_name in position_names: check_item_is_correct_type(pos_name, str, "position_name", call_depth=1)
[docs] def set_mapping_parameters(self, polynomial_order, search_radius_multiplier): """ Set the mapping parameters for the PyCompadre GMLS mapping algorithm. See :func:`~matcal.full_field.field_mappers.meshless_remapping` for more information on the mapping parameters. """ import numbers check_item_is_correct_type(polynomial_order, numbers.Integral, "polynomial_order") check_item_is_correct_type(search_radius_multiplier, numbers.Real, "search_radius_multiplier") self._mapping_polynomial_order=polynomial_order self._mapping_search_radius_multiplier=search_radius_multiplier
[docs] def calculate_and_export(self, export_filename:str, file_type=None)->None: """ Perform calculations on the reference data, and the added data sets. Loops over all added data files and calculations, added with :add_data: and: add_spatial_calculation:, respectively. The results are exported to an external data file defined by the user. Exported data includes all fields used in calculations. This data will appear as "<field_name>_ref" or "<field_name>_interp_<data_name>" with the former indicating values from the reference data and the later indicating the interpolated values from the additional data to the reference locations. The calculated fields will be named "<function_name>_<data_name>". :param export_filename: filename to store the calculated data to. :type export_filename: str :param file_type: By default MatCal will select the exporter and file type for export based on the extension of export_filename. However, the file type can be manually specified here. The "json" file type is supported in MatCal core; however, other file types may be available in other modules. :type file_type: str """ check_item_is_correct_type(export_filename, str, "export_filename") if file_type is not None: check_item_is_correct_type(file_type, str, "file_type") self._precalculate_check() fields_to_map = self._assemble_fields_to_map() dict_to_export = self._initialize_export_dict(fields_to_map) for current_data_name, data_input in self._data_sets.items(): current_data = self._process_data_input(data_input, self._position_names) if self._add_global_variables: dict_to_export.update(self._get_interp_global_fields(current_data, current_data_name)) mapped_data_dict = self._get_mapped_reference_data(current_data, current_data_name, fields_to_map) spatial_calc_data_dict = self._perform_spatial_calculations(mapped_data_dict, current_data_name) dict_to_export.update(mapped_data_dict) dict_to_export.update(spatial_calc_data_dict) if self._add_global_variables: dict_to_export.update(self._get_interp_global_fields(self._reference_data, "ref")) data_to_export = self._prepare_export_data(dict_to_export) self._export_data(export_filename, data_to_export, file_type)
def _assemble_fields_to_map(self): fields_to_map = [] for function, field_names in self._spatial_calculations.values(): for field_name in field_names: if field_name not in fields_to_map: fields_to_map.append(field_name) return fields_to_map def _initialize_export_dict(self, fields_to_map): export_dict = {} for field in fields_to_map: ref_field_name = self._get_mapped_field_name(field, "ref") export_dict[ref_field_name] = self._reference_data[field] return export_dict def _process_data_input(self, ref_data, position_names): data = None data = self._parse_data(ref_data, position_names) current_time_series = data[self._independent_field_name] if current_time_series.ndim > 1: raise BadTimeDataShape(self._independent_field_name, current_time_series.ndim) processed_data = _map_in_time(self._independent_field_vals, self._independent_field_name, data) processed_data._graph = data._graph return processed_data def _get_mapped_reference_data(self, current_data, current_data_name, fields_to_map): mapped_data = meshless_remapping(current_data, fields_to_map, self._reference_data.spatial_coords, self._mapping_polynomial_order, self._mapping_search_radius_multiplier, self._independent_field_vals, self._independent_field_name) mapped_data_dict = {} for field in fields_to_map: cur_field_name = self._get_mapped_field_name(field, current_data_name) mapped_data_dict[cur_field_name] = mapped_data[field] return mapped_data_dict def _get_mapped_field_name(self, field_name, current_data_name): return f"{field_name}_interp_{current_data_name}" def _perform_spatial_calculations(self, mapped_data_dict, current_data_name): dict_with_spatial_calc_fields = {} for calc_name, (calc_function, calc_fields) in self._spatial_calculations.items(): for calc_field in calc_fields: new_calc_field_name = self._get_calculated_field_name(calc_name, current_data_name, calc_field) mapped_calc_field_name = self._get_mapped_field_name(calc_field, current_data_name) calc_value = calc_function(self._reference_data[calc_field], mapped_data_dict[mapped_calc_field_name], self._reference_data.spatial_coords, self._independent_field_vals) dict_with_spatial_calc_fields[new_calc_field_name] = calc_value return dict_with_spatial_calc_fields def _get_calculated_field_name(self, calc_name, current_data_name, field): return f"{calc_name}_{current_data_name}_{field}" def _get_interp_global_fields(self, data, cur_data_name): global_fields_dict = {} for field in data.field_names: if len(np.shape(data[field])) <= 1: interped_global_field = _time_interpolate(self._independent_field_vals, data[self._independent_field_name], data[field]) updated_field_name = self._get_mapped_field_name(field, cur_data_name) global_fields_dict[updated_field_name] = interped_global_field return global_fields_dict def _prepare_export_data(self, dict_to_export): dict_to_export[self._independent_field_name] = self._independent_field_vals data_to_export = convert_dictionary_to_field_data(dict_to_export) data_to_export._graph = self._reference_data._graph return data_to_export def _export_data(self, export_filename, data_to_export, file_type): exporter = self._get_exporter(export_filename, file_type) fields_to_export = data_to_export.field_names exporter(target_filename=export_filename, data_to_export=data_to_export, fields=fields_to_export, reference_source_mesh=self._original_reference_data, independent_field=self._independent_field_name) def _get_exporter(self, export_filename, file_type): if file_type is None: file_type = export_filename.split(".")[-1] return matcal_field_data_exporter_identifier.identify(file_type)
[docs] def add_spatial_calculation(self, calculation_name:str, calculation_function:Callable, *field_names)->None: """ Add a new calculation to be preformed on the full-field data. :param calculation_name: Name that will serve as the base for referencing the results of the caculations performed on the various full-field data sets. This name must be unique. :type calculation_name: str :param calculation_function: Function used for performing calculations on the full-field data. Funcations expect a call signature of (reference_field[n_time, n_points], current_field[n_time, n_points], point_locations[n_points, n_dim], time[n_time]). :type calculation_function: Callable :param field_names: fields to pass into the calculation function. These field must exist on both the reference and additional data sets. :type field_names: list(str) """ self._check_measurement_inputs(calculation_name, calculation_function, field_names, self._spatial_calculations) self._spatial_calculations[calculation_name] = [calculation_function, field_names]
[docs] def add_data(self, data_name, field_data): """ Add full field data to be compared to the reference data field. :param data_name: a name that will be used by reference this data set. This name must be unique. :type data_name: str :param field_data: passed as either a path to a full-field data file or a MatCal :class:`~matcal.full_field.data.FieldData` object. This will add a new data set to used in the calculations. """ self._check_data_inputs(data_name, field_data) self._data_sets[data_name] = field_data
[docs] def get_calculation_functions(self)->dict: """ Return a dictionary of all the calculation functions """ all_measurements = {} all_measurements['spatial'] = self._spatial_calculations return all_measurements
[docs] def get_data(self)->dict: """ Return a dictionary of all the additional datasets """ return self._data_sets
def _parse_data(self, input_data, position_names): if isinstance(input_data, str): data = FieldSeriesData(input_data, position_names=position_names) elif isinstance(input_data, FieldData): data = input_data return data def _precalculate_check(self): if not self._has_measurements(): raise NoMeasurementError() n_data_sets = len(self._data_sets) n_required = 1 if n_data_sets < n_required: raise InsuffichentFieldDataSetsError(n_data_sets, n_required) def _check_measurement_inputs(self, calculation_name:str, calculation_function:Callable, field_names:str, measurement_record:dict)->None: check_item_is_correct_type(calculation_name, str, "calculation_name", call_depth=1) check_item_is_correct_type(calculation_function, Callable, "calculation_function", call_depth=1) check_item_is_correct_type(field_names, tuple, "field_names",call_depth=1) for field in field_names: check_value_is_nonempty_str(field, "field_name in field_names", call_depth=1) if calculation_name in list(measurement_record.keys()): old_function, old_fields = measurement_record[calculation_name] raise SameMeasurementNameError(calculation_name, old_function, calculation_function, old_fields, field_names) def _check_data_inputs(self, data_name, field_data): check_value_is_nonempty_str(data_name, "data_name", call_depth=1) check_item_is_correct_type(field_data, (str, FieldData), "field_data", call_depth=1) if data_name in list(self._data_sets.keys()): raise SameDataNameError(data_name) def _has_measurements(self): return len(self._spatial_calculations) > 0
class BadTimeDataShape(RuntimeError): def __init__(self, field_name, field_dim): message = f"time field name {field_name} is of dimension {field_dim}, must be 1D." super().__init__(message) class SameMeasurementNameError(RuntimeError): def __init__(self, name, old_function, new_function, old_field, new_field): message = "Measurements must have unique names for full-field statistics." message += f"\nAttempted to add redundant measurements for name: {name}" message += f"\nOriginal field and measurement: {old_field}\n{old_function}" message += f"\nAttempted add field and measurement: {new_field}\n{new_function}\n" super().__init__(message) class SameDataNameError(RuntimeError): def __init__(self, name): message = "Data must have unique names for full-field statistics." message += f"\nAttempted to add redundant data for name: {name}" super().__init__(message) class InsuffichentFieldDataSetsError(RuntimeError): def __init__(self, n_data_sets, n_required_sets): message = f"Require {n_required_sets} for statistics calculation. Currently have {n_data_sets}" super().__init__(message) class NoMeasurementError(RuntimeError): def __init__(self): message = "No measurements added to generate statistics please add them with:" message += " add_spatial_measurement(name, function, field_name)" super().__init__(message)