# Source code for matcal.full_field.data_importer

"""
The classes and functions in this module are intended 
to import data into MatCal from external sources for use 
in MatCal studies.
"""

import os
import numpy as np
from abc import ABC, abstractmethod

from matcal.core.object_factory import BasicIdentifier, ObjectCreator, SpecificObjectFactory
from matcal.core.data_importer import FileData

from matcal.full_field.TwoDimensionalFieldGrid import (MeshSkeleton,
                                            MeshSkeletonTwoDimensionalMesh)
from matcal.full_field.data import FieldData, convert_dictionary_to_field_data

from matcal.core.data import Data, convert_dictionary_to_data 
from matcal.core.logger import initialize_matcal_logger
from matcal.core.serializer_wrapper import json_serializer
from matcal.core.state import SolitaryState
from matcal.core.utilities import get_current_time_string


logger = initialize_matcal_logger(__name__)


def FieldSeriesData(global_filename, series_directory="./", position_names = ['X','Y'],
                    state=SolitaryState(), file_type=None, n_cores=1):
    """
    Import series field data from a file, or a series of files, into MatCal.

    The arguments are validated first, the file type is resolved, and the
    import itself is delegated to the parser registered for that file type.

    :param global_filename: the name of the file or primary file to be loaded.
    :type global_filename: str

    :param series_directory: the name of the directory where all files are
        located. Defaults to the current working directory.
    :type series_directory: str

    :param position_names: optional names of the fields that store
        point/nodal coordinates. Defaults to ['X', 'Y'].
    :type position_names: list(str) or tuple(str)

    :param state: optional state to be assigned to the data being imported.
    :type state: :class:`~matcal.core.state.State`

    :param file_type: optional file type passed by the user. When it is
        None, the text after the last "." in ``global_filename`` is used.
        The value is lower-cased before a parser is looked up. This module
        registers the "csv" and "json" types; other types may be registered
        elsewhere.
    :type file_type: str

    :param n_cores: the number of cores to be used to load the data. Must
        be an integer greater than 0.
    :type n_cores: int

    :return: a populated :class:`~matcal.full_field.data.FieldData` object.

    :raises TypeError: if ``global_filename``, ``series_directory``,
        ``position_names`` or the resolved file type has the wrong type.
    :raises ValueError: if ``n_cores`` is not an integer greater than 0.
    :raises RuntimeError: if no parser is registered for the file type.
    """
    # Validation order is kept fixed so the first reported problem is stable.
    _check_filename_type(global_filename)
    _check_series_directory(series_directory)
    _check_position_names(position_names)
    _check_n_cores(n_cores)

    resolved_file_type = _get_file_type(global_filename, file_type)
    series_data = _import_field_data(global_filename, series_directory,
                                     position_names, state,
                                     resolved_file_type, n_cores)
    return series_data
def _import_field_data(global_filename, series_directory="./", position_names = ['X','Y'],
                       state=SolitaryState(), file_type=None, n_cores=1):
    """
    Build a parser for ``file_type`` and load its contents into a FieldData.

    :param global_filename: name of the file or primary file to be loaded.
    :param series_directory: directory that holds the series files.
    :param position_names: names of the fields that hold the coordinates.
    :param state: state assigned to the imported data.
    :param file_type: key used to look the parser up in
        ``matcal_field_data_factory``.
    :param n_cores: number of cores handed to the parser.

    :return: the populated :class:`~matcal.full_field.data.FieldData`.

    :raises RuntimeError: if the factory raises ``KeyError`` while creating
        the parser, which is reported as an unsupported file type.
    """
    try:
        field_parser = matcal_field_data_factory.create(file_type, global_filename,
                                                        series_directory, n_cores=n_cores)
    except KeyError as err:
        err_str = (f"Data file \"{global_filename}\" of type \"{file_type}\" " +
                   "is not a supported file type. MatCal supports the following data types:"
                   +f"\n{list(matcal_field_data_factory.keys())}")
        raise RuntimeError(err_str) from err

    # The parser may hold open resources, so it is closed even when the
    # import fails part way through.
    try:
        _log_with_time(global_filename, "Start: Parsing Field Series Data")
        series_array = _create_series_data_array(field_parser, position_names)
        series_data = FieldData(series_array)
        series_data.set_state(state)
        series_data = _create_position_data(series_data, field_parser, position_names)
        try:
            connectivity = field_parser._get_connectivity()
            series_data.set_connectivity(connectivity)
        except AttributeError:
            logger.debug(f"Not importing mesh connectivity for file {global_filename}. "+
                         "Not a mesh file format.")
    finally:
        field_parser.close()
    _log_with_time(global_filename, "Done: Parsing Field Series Data")
    return series_data


def _log_with_time(global_filename, message):
    """Log ``message`` at info level with the current time and the filename."""
    current_time = get_current_time_string()
    logger.info(f"{message}({current_time}): {global_filename}")


def _get_file_type(filename, file_type):
    """
    Return the lower-cased file type.

    When ``file_type`` is None the text after the last "." of ``filename``
    is used instead.

    :raises TypeError: if the resulting file type is not a string.
    """
    if file_type is None:
        file_type = filename.split(".")[-1]
    _check_file_type_is_string(file_type)
    file_type = file_type.lower()
    return file_type


def _check_file_type_is_string(file_type):
    """Raise TypeError unless ``file_type`` is a string."""
    # An explicit check is used because ``assert`` is removed under -O.
    if not isinstance(file_type, str):
        raise TypeError("The file type passed to a data importer "
                        "must be a string. Received "
                        "variable of type {}".format(type(file_type)))


def _check_filename_type(filename):
    """Raise TypeError unless ``filename`` is a string."""
    # An explicit check is used because ``assert`` is removed under -O.
    if not isinstance(filename, str):
        raise TypeError("The filename passed to a data importer "
                        "must be a string. Received "
                        "variable of type '{}'".format(type(filename)))


def _check_series_directory(dirname):
    """Raise TypeError unless ``dirname`` is a string."""
    if not isinstance(dirname, str):
        raise TypeError("The parameter 'series_directory' passed "
                        " to FieldSeriesData must be of type 'string'."
                        f" Received variable of type '{type(dirname)}'.")


def _check_n_cores(n_cores):
    """Raise ValueError unless ``n_cores`` is an integer of at least 1."""
    from numbers import Integral
    # The type test comes first so the comparison only runs on integers.
    if not isinstance(n_cores, Integral) or n_cores < 1:
        raise ValueError("The parameter 'n_cores' passed to FieldSeriesData must"
                         f" be an integer greater than 0. Received '{n_cores}'.")


def _check_position_names(position_names):
    """
    Raise TypeError unless ``position_names`` is a list or tuple of strings.
    """
    if not isinstance(position_names, (list, tuple)):
        raise TypeError("The parameter 'position_names' passed to "
                        "FieldSeriesData must be of type 'list' or 'tuple'."
                        f" Received variable of type '{type(position_names)}'.")
    for idx, name in enumerate(position_names):
        if not isinstance(name, str):
            raise TypeError("The parameter 'position_names' passed "
                            "to FieldSeriesData must"
                            f" contain only strings. Received variable of type '{type(name)}'"
                            f" in position {idx} of the 'position_names'.")
class FieldDataParserBase(ABC):
    """
    Abstract base class for the parsers that read field data.

    A parser is built from a data file and a series directory. Both paths
    are checked on construction: a missing path raises
    :class:`FieldDataDataSeriesMissingPathObject` unless the subclass
    reports through :meth:`_files_in_parallel` that it may be absent.

    :param data_file: path to the file or primary file to be read.
    :param series_directory: path to the directory holding the series files.
    :param n_cores: number of cores made available through
        :attr:`number_of_cores`.
    """

    def __init__(self, data_file, series_directory, n_cores=1):
        self._data_file = self._confirm_path_exists_and_return(data_file)
        self._series_directory = self._confirm_path_exists_and_return(series_directory)
        self._state = None
        self._n_cores = n_cores

    class FieldDataDataSeriesMissingPathObject(RuntimeError):
        """Raised when a required file or directory does not exist."""

        def __init__(self, filename, *args):
            super().__init__("File not found: {}".format(filename), *args)

    class FieldDataDataSeriesBadFrameIndex(RuntimeError):
        """Raised when a requested frame index is not valid."""

        def __init__(self, frame_index, frame_limit):
            super().__init__(f"Bad frame index: {frame_index} \n"
                             f" Total Number of Frames: {frame_limit}.")

    def __str__(self):
        # __str__ must return a str, and the data file may be a path-like
        # object rather than a string.
        return str(self._data_file)

    @property
    def filename(self):
        """The data file this parser was created with."""
        return self._data_file

    @property
    def state(self):
        """The state assigned to the data, or None if none was set."""
        return self._state

    @property
    def number_of_cores(self):
        """The number of cores this parser was created with."""
        return self._n_cores

    def set_state(self, state):
        """
        Sets the optional state value for the data.

        :param state: The state for this particular data set.
        :type state: :class:`~matcal.core.state.State`
        """
        self._state = state

    @property
    @abstractmethod
    def number_of_frames(self) -> int:
        """The number of frames available from the parser."""

    @property
    @abstractmethod
    def number_of_nodes(self) -> int:
        """The number of nodes (points) in each frame."""

    @property
    @abstractmethod
    def number_of_elements(self) -> int:
        """The number of elements in each frame."""

    @property
    @abstractmethod
    def global_field_names(self) -> list:
        """The names of the global fields."""

    @property
    @abstractmethod
    def node_field_names(self) -> list:
        """The names of the node fields."""

    @property
    @abstractmethod
    def element_field_names(self) -> list:
        """The names of the element fields."""

    @abstractmethod
    def get_frame(self, frame_index):
        """
        returns a Data instance
        """

    @abstractmethod
    def get_global_data(self):
        """
        returns a Data instance
        """

    @abstractmethod
    def get_surfaces(self) -> dict:
        """
        Returns a dict of the surfaces names to their corresponding nodes, 0 indexed
        """

    @abstractmethod
    def _files_in_parallel(self, filename) -> bool:
        """
        Return True when ``filename`` is allowed to be missing on disk.

        NOTE(review): presumably this covers data split across several
        per-processor files -- confirm against the subclasses that
        return True.
        """

    def _confirm_path_exists_and_return(self, path_object):
        """
        Return ``path_object`` unchanged after checking that it exists.

        :raises FieldDataDataSeriesMissingPathObject: if the path does not
            exist and :meth:`_files_in_parallel` returns False for it.
        """
        if not os.path.exists(path_object) and not self._files_in_parallel(path_object):
            raise self.FieldDataDataSeriesMissingPathObject(path_object)
        return path_object

    def close(self):
        """Release resources held by the parser. The base class holds none."""

    def _get_connectivity(self):
        """Return the mesh connectivity. The base class has none."""
        return None
class _JSONFullFieldParser(FieldDataParserBase):
    """
    Parser for full-field data stored in a single JSON file.

    The whole file is loaded on construction. Fields with more than one
    dimension are treated as node fields and all others as global fields.
    """

    def __init__(self, json_filename, series_directory='./', n_cores=1):
        # NOTE(review): the base class is always given n_cores=1, whatever
        # the caller passed. Presumably intentional because everything is
        # read from one file -- confirm.
        super().__init__(json_filename, series_directory, n_cores=1)
        self._data = _import_full_field_data_from_json(self._data_file)
        self._global_names, self._node_names = self._parse_field_names()
        self._showed_element_warning = False

    @property
    def number_of_frames(self) -> int:
        """The length of the loaded data."""
        return self._data.length

    @property
    def number_of_nodes(self) -> int:
        """The number of rows in the skeleton's spatial coordinates."""
        return self._data.skeleton.spatial_coords.shape[0]

    @property
    def number_of_elements(self) -> int:
        """The number of entries in the skeleton's connectivity."""
        return len(self._data.skeleton.connectivity)

    @property
    def global_field_names(self) -> list:
        """The names of the fields classified as global."""
        return self._global_names

    @property
    def node_field_names(self) -> list:
        """The names of the fields classified as node fields."""
        return self._node_names

    @property
    def element_field_names(self) -> list:
        """Always an empty list; a warning is logged on the first access."""
        if self._showed_element_warning:
            return []
        logger.warning("JSON parser currently does not support element data import.")
        self._showed_element_warning = True
        return []

    def get_frame(self, frame_index):
        """
        Return the node fields of one frame together with the coordinates.

        The coordinate columns are stored under 'X', 'Y' and 'Z', in that
        order, and the state of the loaded data is copied onto the result.
        """
        frame_values = {name: self._data[name][frame_index,:]
                        for name in self._node_names}
        axis_labels = ['X', 'Y', 'Z']
        coords = self._data.skeleton.spatial_coords
        for axis in range(coords.shape[1]):
            frame_values[axis_labels[axis]] = coords[:,axis]
        frame_data = convert_dictionary_to_data(frame_values)
        frame_data.set_state(self._data.state)
        return frame_data

    def get_global_data(self):
        """Return the global fields with the state of the loaded data."""
        global_values = {name: self._data[name] for name in self._global_names}
        global_data = convert_dictionary_to_data(global_values)
        global_data.set_state(self._data.state)
        return global_data

    def get_surfaces(self) -> dict:
        """Return the surfaces stored on the skeleton."""
        return self._data.skeleton.surfaces

    def _files_in_parallel(self, filename)->bool:
        """The JSON file must exist on disk, so this is always False."""
        return False

    def _parse_field_names(self):
        """
        Split the field names into global and node names.

        :return: ``(global_names, node_names)``; a field is a node field
            when its array has more than one dimension.
        """
        node_fields = []
        global_fields = []
        for name in self._data.field_names:
            bucket = node_fields if self._data[name].ndim > 1 else global_fields
            bucket.append(name)
        return global_fields, node_fields

    def _get_connectivity(self):
        """Return the connectivity stored on the skeleton."""
        return self._data.skeleton.connectivity
class CSVFieldDataSeriesParser(FieldDataParserBase):
    """
    Class used to import a series of field data from file sources.

    The global data file lists the filenames of the field data files next
    to the global variables of each frame. The filenames are read from the
    ``'file_'`` field of the imported global data.

    NOTE(review): the column is presumably named "file" in the CSV header
    and renamed to "file_" on import (NumPy's ``genfromtxt`` appends an
    underscore to that reserved name) -- confirm against ``FileData``.

    :param global_data_file: path to a csv file containing the series
        filenames and their respective global variables.
    :type global_data_file: str

    :param series_directory: path to the directory containing the field
        data snapshots described in the global data file.
    :type series_directory: str

    :param n_cores: the number of cores made available for loading.
    :type n_cores: int

    :param file_type: optional file type passed on to ``FileData`` when
        reading each frame.
    :type file_type: str
    """

    def __init__(self, global_data_file, series_directory, n_cores=1, file_type=None):
        super().__init__(global_data_file, series_directory, n_cores)
        self._state = SolitaryState()
        self._number_of_frames = None
        self._field_data_file_list = None
        self._global_fields = None
        self._number_of_nodes = None
        # Filled on first use so that frame 0 is not re-read on every access.
        self._node_field_names = None
        self._file_type = file_type
        self._setUp()

    @property
    def number_of_frames(self):
        """
        Get the number of frames present in the data series.
        Frames align with the different time steps, if applicable.
        """
        return self._number_of_frames

    @property
    def number_of_nodes(self):
        """The length of the first frame of the series."""
        return self._number_of_nodes

    @property
    def number_of_elements(self):
        """Always 0; no element data is read."""
        return 0

    @property
    def global_field_names(self):
        """
        get a list of the imported global field names

        :return: the field names
        :rtype: list
        """
        gf_names = list(self._global_fields.field_names)
        return gf_names

    @property
    def state(self):
        """
        :return: The physical state of the data corresponding to the
            experimental conditions.
        :rtype: :class:`~matcal.core.state.State`
        """
        return self._state

    @property
    def node_field_names(self):
        """
        get a list of the imported node field names

        The names are taken from the first frame and cached, because this
        property is read once per frame while a series is imported.

        :return: the field names
        :rtype: list
        """
        if self._node_field_names is None:
            self._node_field_names = list(self.get_frame(0).keys())
        # A copy is returned so callers cannot alter the cached list.
        return list(self._node_field_names)

    @property
    def element_field_names(self):
        """Always an empty list; no element data is read."""
        return []

    def get_frame(self, frame_index):
        """
        Read the field data file of one frame.

        :param frame_index: index of frame data desired. (0 indexed)
        :type frame_index: int

        :return: the object returned by ``FileData`` for the frame file
            (presumably a :class:`~matcal.core.data.Data` -- confirm).

        :raises FieldDataDataSeriesBadFrameIndex: if ``frame_index`` is not
            an integer in ``[0, number_of_frames)``.
        """
        self._check_index(frame_index)
        frame_name = os.path.join(self._series_directory,
                                  self._field_data_file_list[frame_index])
        return FileData(frame_name, file_type=self._file_type)

    def get_global_data(self):
        """
        Return all global field data as MatCal Data class.

        :rtype: :class:`~matcal.core.data.Data`
        """
        return self._global_fields

    def get_surfaces(self) -> dict:
        """Return an empty dict; surfaces are not collected from csv data."""
        message = "Surface information not currently collected from csv data."
        logger.info(message)
        return {}

    def _check_index(self, frame_index):
        """
        Raise FieldDataDataSeriesBadFrameIndex for an invalid frame index.
        """
        # The type is tested first so that None, strings and other
        # non-integers are reported as a bad index instead of failing in
        # the comparisons.
        is_valid = (isinstance(frame_index, int)
                    and 0 <= frame_index < self._number_of_frames)
        if not is_valid:
            raise self.FieldDataDataSeriesBadFrameIndex(frame_index, self.number_of_frames)

    def _setUp(self):
        """Read the global data file and fill in the series information."""
        global_data = self._parse_global_data_file()
        self._assign_field_data_files(global_data)
        self._assign_number_of_frames()
        self._assign_global_fields(global_data)
        self._number_of_nodes = self.get_frame(0).length

    def _parse_global_data_file(self):
        """Load the global data file, keeping string columns."""
        return FileData(self._data_file, import_strings=True)

    def _assign_field_data_files(self, global_data):
        """Store the frame filenames listed in the 'file_' field."""
        # atleast_1d keeps a single-row file from yielding a scalar.
        self._field_data_file_list = list(np.atleast_1d(global_data['file_']))

    def _assign_number_of_frames(self):
        """Set the frame count to the number of listed frame files."""
        self._number_of_frames = len(self._field_data_file_list)

    def _extract_key(self, global_data, key):
        """Return ``global_data[key]``."""
        return global_data[key]

    def _assign_global_fields(self, global_data):
        """Store every field of the global data except 'file_'."""
        global_data_fields = list(global_data.field_names)
        global_data_fields.remove("file_")
        self._global_fields = global_data[global_data_fields]

    def _files_in_parallel(self, filename):
        """The csv files must exist on disk, so this is always False."""
        return False
def _is_csv(filename):
    """Return True when the text after the last '.' of ``filename`` is 'csv'."""
    extension = filename.split('.')[-1]
    return extension == 'csv'


def _get_number_of_points_and_frames(field_parser):
    """
    Return ``(number_of_nodes, number_of_frames, number_of_elements)``.
    """
    n_times = field_parser.number_of_frames
    n_points = field_parser.number_of_nodes
    return n_points, n_times, field_parser.number_of_elements


def _create_series_data_array(field_parser, position_names):
    """
    Read every global, node and element field of the parser into one Data.

    A structured array with one entry per frame is allocated. Global fields
    are stored as scalars and node and element fields as fixed-length
    vectors. The 'file' field and the position fields are left out.

    :param field_parser: the parser to read from.
    :param position_names: list or tuple of the coordinate field names.
    :return: a :class:`~matcal.core.data.Data` wrapping the filled array.
    """
    n_points, n_times, n_ele = _get_number_of_points_and_frames(field_parser)
    global_keys, node_keys, element_keys = _get_field_parser_info(field_parser)
    # position_names may be a tuple, which cannot be added to a list.
    ignore_keys = ['file'] + list(position_names)
    data_list = _add_global_data_type(global_keys, ignore_keys)
    data_list = _add_space_data_type(data_list, node_keys, ignore_keys, n_points)
    data_list = _add_space_data_type(data_list, element_keys, ignore_keys, n_ele)
    data = np.zeros(n_times, dtype=data_list)
    logger.info(f"{field_parser.filename}: Reading Global Data")
    # Ignored keys have no field in the array, so they are skipped here
    # just as they are when the dtype is built.
    kept_global_keys = [gkey for gkey in global_keys if gkey not in ignore_keys]
    if kept_global_keys:
        # The global data is fetched once instead of once per key.
        global_data = field_parser.get_global_data()
        for gkey in kept_global_keys:
            data[gkey] = global_data[gkey]
    if field_parser.number_of_cores > 1:
        _read_data_in_parallel(field_parser, data, ignore_keys)
    else:
        _read_data_in_serial(field_parser, data, ignore_keys)
    return Data(data)


def _read_data_in_parallel(field_parser, data, ignore_keys):
    """
    Read all frames with a process pool and copy them into ``data``.

    The frames are read by the worker processes; copying into ``data``
    happens in this process, in frame order.
    """
    n_times = field_parser.number_of_frames
    from concurrent.futures import ProcessPoolExecutor
    with ProcessPoolExecutor(max_workers=field_parser.number_of_cores) as executor:
        futures = [executor.submit(_get_frame_data, field_parser, time_index)
                   for time_index in range(n_times)]
        for time_index, future in enumerate(futures):
            _add_frame_data_to_data(future.result(), data, time_index,
                                    ignore_keys, field_parser)


def _read_data_in_serial(field_parser, data, ignore_keys):
    """Read all frames one after another and copy them into ``data``."""
    n_times = field_parser.number_of_frames
    for time_index in range(n_times):
        frame_data = _get_frame_data(field_parser, time_index)
        _add_frame_data_to_data(frame_data, data, time_index, ignore_keys, field_parser)


def _get_field_parser_info(field_parser):
    """Return the parser's ``(global, node, element)`` field name lists."""
    global_field_names = field_parser.global_field_names
    node_field_names = field_parser.node_field_names
    ele_field_names = field_parser.element_field_names
    return global_field_names, node_field_names, ele_field_names


def _get_frame_data(field_parser, time_index):
    """Log the read and return the parser's frame at ``time_index``."""
    _log_frame_import(field_parser, field_parser.number_of_frames, time_index)
    frame_data = field_parser.get_frame(time_index)
    return frame_data


def _add_frame_data_to_data(frame_data, data, time_index, ignore_keys, field_parser):
    """
    Copy the node and element fields of one frame into row ``time_index``.

    Fields listed in ``ignore_keys`` are skipped.
    """
    _log_frame_processing(field_parser, field_parser.number_of_frames, time_index)
    _, node_keys, element_keys = _get_field_parser_info(field_parser)
    for skey in node_keys:
        if skey in ignore_keys:
            continue
        data[skey][time_index] = frame_data[skey]
    for ekey in element_keys:
        if ekey in ignore_keys:
            continue
        data[ekey][time_index] = frame_data[ekey]


def _log_frame_processing(field_parser, n_times, time_index):
    """Log that a frame is being processed, for a sample of the frames."""
    if _output_store_data(time_index, n_times, 5):
        logger.info(f"{field_parser.filename}: Processing Frame {time_index}")


def _log_frame_import(field_parser, n_times, time_index):
    """Log that a frame is being read, for a sample of the frames."""
    if _output_store_data(time_index, n_times, 5):
        logger.info(f"{field_parser.filename}: Reading Frame {time_index}")


def _output_store_data(current_index, max_index, max_out):
    """
    Decide whether the item at ``current_index`` should be reported.

    Every item is reported when there are fewer than ``max_out`` of them.
    Otherwise every ``max_index // max_out``-th item is reported.
    """
    if max_index < max_out:
        return True
    # max_index >= max_out here, so the divisor is at least 1.
    freq = max_index // max_out
    return current_index % freq == 0


def _add_global_data_type(global_keys, ignore_keys):
    """Return scalar double dtype entries for the global keys not ignored."""
    return [(gkey, np.double) for gkey in global_keys if gkey not in ignore_keys]


def _add_space_data_type(data_list, space_keys, ignore_keys, n_space):
    """
    Append length-``n_space`` double dtype entries for the keys not ignored.

    ``data_list`` is extended in place and also returned.
    """
    data_list.extend((skey, np.double, (n_space,))
                     for skey in space_keys if skey not in ignore_keys)
    return data_list


def _create_position_data(series_data, parser, position_names):
    """
    Attach the coordinates and surfaces of the parser to ``series_data``.

    The coordinates are taken from the first frame, one column per entry
    of ``position_names``.
    """
    frame = parser.get_frame(0)
    x = [frame[pos_name] for pos_name in position_names]
    series_data.set_spatial_coords(np.array(x).T)
    series_data.add_node_sets(**parser.get_surfaces())
    return series_data


class _FieldDataImporterSelector(SpecificObjectFactory):
    """Factory that maps a file type to the creator of its field parser."""
    pass


class _CSVFieldDataImporterCreator(ObjectCreator):
    """Creator of :class:`CSVFieldDataSeriesParser` objects."""

    def __call__(self, *args, **kwargs):
        return CSVFieldDataSeriesParser(*args, **kwargs)


class _JSONFiledDataImporterCreator(ObjectCreator):
    """Creator of :class:`_JSONFullFieldParser` objects."""

    def __call__(self, *args, **kwargs):
        return _JSONFullFieldParser(*args, **kwargs)


matcal_field_data_factory = _FieldDataImporterSelector()
matcal_field_data_factory.register_creator('csv', _CSVFieldDataImporterCreator())
matcal_field_data_factory.register_creator('json', _JSONFiledDataImporterCreator())
class MeshFileScraperSelector(BasicIdentifier):
    """Identifier that selects a mesh file scraper from a filename's extension."""

    def identify(self, mesh_filename:str):
        """Return what the base identifier holds for the file's extension."""
        return super().identify(self._extract_extension(mesh_filename))

    def _extract_extension(self, mesh_filename:str)->str:
        """Return the text after the last '.', or the whole name if there is none."""
        _, _, extension = mesh_filename.rpartition('.')
        return extension
def _json_mesh_skeleton_scraper(filename:str, subset_name:str=None):
    """
    Load a JSON mesh file and convert it to a MeshSkeleton.

    :param filename: path to the JSON file.
    :param subset_name: optional name of a surface; when given, only the
        nodes of that surface are imported.
    :return: the resulting :class:`MeshSkeleton`.
    """
    with open(filename, 'r') as f:
        mesh_dict = json_serializer.load(f)
    mesh_skele = _convert_ff_dict_to_mesh_skeleton(subset_name, mesh_dict)
    return mesh_skele


def _convert_ff_dict_to_mesh_skeleton(subset_name, mesh_dict):
    """
    Build a MeshSkeleton from a full-field dictionary.

    The whole mesh is imported when ``subset_name`` is None; otherwise only
    the named surface is imported.
    """
    # Identity is the correct test for None; ``==`` can be overridden.
    if subset_name is None:
        return _full_json_import(mesh_dict)
    return _surface_json_import(subset_name, mesh_dict)


def _surface_json_import(subset_name, mesh_dict):
    """
    Build a MeshSkeleton holding only the points of one surface.

    No connectivity is passed to the skeleton in this case.
    """
    node_ids = mesh_dict['surfaces'][subset_name]
    mesh_skele = MeshSkeleton(np.array(mesh_dict['spatial_coords'])[node_ids,:])
    mesh_skele.subset_name = subset_name
    return mesh_skele


def _full_json_import(mesh_dict):
    """
    Build a MeshSkeleton with points, connectivity, subset name and surfaces.
    """
    mesh_skele = MeshSkeleton(np.array(mesh_dict['spatial_coords']),
                              np.array(mesh_dict['connectivity']))
    mesh_skele.subset_name = mesh_dict['subset_name']
    surfaces = {name: np.array(node_list)
                for name, node_list in mesh_dict['surfaces'].items()}
    mesh_skele.add_node_sets(**surfaces)
    return mesh_skele


matcal_mesh_file_scraper_selector = MeshFileScraperSelector()
matcal_mesh_file_scraper_selector.register('json', _json_mesh_skeleton_scraper)
def mesh_file_to_skeleton(mesh_filename:str, subset_name:str=None)->MeshSkeleton:
    """
    Load a mesh file into a mesh skeleton.

    The scraper is chosen from the extension of ``mesh_filename`` and is
    called with the filename and the subset name. The returned structure
    contains the mesh cloud points, connectivity and side set information.

    :param mesh_filename: path to the mesh file.
    :param subset_name: optional subset name passed on to the scraper.
    :return: the skeleton built by the scraper.
    """
    return matcal_mesh_file_scraper_selector.identify(mesh_filename)(mesh_filename,
                                                                     subset_name)
class ImportedTwoDimensionalMesh(MeshSkeletonTwoDimensionalMesh):
    """A two-dimensional mesh built from the skeleton of a mesh file."""

    def __init__(self, mesh_filename):
        # The file is converted to a skeleton, which the parent class consumes.
        super().__init__(mesh_file_to_skeleton(mesh_filename))
def _import_full_field_data_from_json(source_filename:str): new_dict = None with open(source_filename, 'r') as f: new_dict = json_serializer.load(f) skeleton = _convert_ff_dict_to_mesh_skeleton(None, new_dict) cleaned_data_dict = _remove_skeleton_fields(new_dict) new_data = convert_dictionary_to_field_data(cleaned_data_dict) new_data._graph = skeleton return new_data def _remove_skeleton_fields(data_dict): fields = ['spatial_coords', 'connectivity', 'subset_name', 'surfaces'] for field in fields: data_dict.pop(field) return data_dict