"""
The simulator module contains classes that launch models and
process results to be passed back to MatCal after being run.
The only user facing class is the SimulatorResults class.
"""
from abc import ABC, abstractmethod
import os
from collections import OrderedDict
from matcal.core.constants import DESIGN_PARAMETER_FILE, STATE_PARAMETER_FILE, MATCAL_WORKDIR_STR
from matcal.core.data import convert_dictionary_to_data
from matcal.core.external_executable import matcal_external_executable_factory
from matcal.core.file_modifications import process_template_file
from matcal.core.logger import initialize_matcal_logger
from matcal.core.parameters import _get_parameters_according_to_precedence
from matcal.core.object_factory import DefaultObjectFactory, ObjectCreator
from matcal.core.reporter import matcal_parameter_reporter_identifier
from matcal.core.serializer_wrapper import matcal_save
from matcal.core.utilities import matcal_name_format
logger = initialize_matcal_logger(__name__)
import contextlib
@contextlib.contextmanager
def string_out_err_capture():
import sys
from io import StringIO
oldout,olderr = sys.stdout, sys.stderr
try:
out=[StringIO(), StringIO()]
sys.stdout,sys.stderr = out
yield out
finally:
sys.stdout,sys.stderr = oldout, olderr
out[0] = out[0].getvalue()
out[1] = out[1].getvalue()
return out
[docs]
class SimulatorResults:
"""Results data structure returned from a simulator."""
[docs]
def __init__(self, results_data, stdout, stderr, return_code, source_filename=None):
self._results_data = results_data
self._stdout = stdout
self._stderr = stderr
self._return_code = return_code
self._source_filename = source_filename
@property
def results_data(self):
"""
Returns the simulation results or None. None is returned if no
results are expected.
:return: simulation results data
:rtype: :class:`~matcal.core.data.Data`, None
"""
return self._results_data
@property
def stdout(self):
"""
:return: simulation execution standard output
:rtype: str
"""
return self._stdout
@property
def stderr(self):
"""
:return: simulation execution error output
:rtype: str
"""
return self._stderr
@property
def return_code(self):
"""
:return: simulation return code
:rtype: int
"""
return self._return_code
@property
def source_filename(self):
"""
:return: Relevant source file for data
:rtype: str
"""
return self._source_filename
class SimulatorFailureResults(SimulatorResults):
def __init__(self, stdout, stderr, return_code, state):
self._state = state
super().__init__(None, stdout, stderr, return_code)
@property
def state(self):
return(self._state)
class Simulator(ABC):
"""
Not intended for users: base class for simulators which run models.
"""
def __init__(self,
model_name,
compute_information,
results_information,
state,
template_dir='.'):
self._model_name = model_name
self._state = state
self._compute_information = compute_information
self._results_information = results_information
self._template_dir = template_dir
self._commands = None
self._modules_to_load = None
self._initial_working_dir = os.getcwd()
@abstractmethod
def run(self, parameters, working_dir=None) -> SimulatorResults:
""""""
def _select_working_dir(self, working_dir):
model_state_dir = os.path.join(matcal_name_format(self.model_name),
matcal_name_format(self._state.name))
if working_dir is None:
working_dir = os.path.abspath(os.path.join(os.getcwd(), model_state_dir))
else:
working_dir = os.path.abspath(os.path.join(working_dir, model_state_dir))
logger.debug("Simulator working directory: {}".format(working_dir))
return working_dir
def _change_to_working_dir(self, working_dir=None):
self._initial_working_dir = os.getcwd()
os.chdir(self._select_working_dir(working_dir))
@property
def results_filename(self):
return self._results_information.results_filename
@property
def model_name(self):
return self._model_name
@property
def commands(self):
return self._commands
@property
def number_of_cores(self):
return self._compute_information.number_of_cores
@property
def state(self):
return self._state
@property
def computer(self):
return self._compute_information.computer
@abstractmethod
def get_results(self):
""""""
@property
def fail_calibration_on_simulation_failure(self):
return self._compute_information.fail_on_simulation_failure
class ExecutableSimulator(Simulator):
"""
Not intended for users: runs models that require and external executable.
"""
def __init__(self,
model_name,
compute_information,
results_information,
state, model_constants,
template_dir='.',
commands=[]):
super().__init__(model_name,
compute_information,
results_information,
state,
template_dir=template_dir)
self._commands = list(commands)
self._model_constants = model_constants
def run(self, parameters, working_dir=None, get_results=True):
workdir_full_path = self._select_working_dir(working_dir)
external_executable = matcal_external_executable_factory.create(self._commands,
self._modules_to_load, self._compute_information.computer,
working_directory=workdir_full_path)
model_params = _get_parameters_according_to_precedence(self.state,
self._model_constants,
parameters)
self._pass_parameters_to_simulators(workdir_full_path, model_params)
self._write_parameters_file(workdir_full_path, parameters)
results = None
stdout = None
stderr = None
return_code = None
try:
stdout, stderr, return_code = self._execute_external(external_executable)
except Exception as e:
if self.fail_calibration_on_simulation_failure:
error_str = (f"State \"{self.state.name}\" for model \"{self.model_name}\" "+
f"failed with error:\n{repr(e)}"+
" Usually, this is caused by an incorrect executable name. Exiting.")
raise RuntimeError(error_str)
else:
logger.error(f"Continuing after state \"{self.state.name}\" for "+
f"model \"{self.model_name}\" "+
f"failed with error:\n{repr(e)}.")
if (return_code is not None and return_code != 0 and
self.fail_calibration_on_simulation_failure):
error_str = (f"State \"{self.state.name}\" for model \"{self.model_name}\" "+
f"failed with exit code {return_code}. "+
f"Exiting. The following error was output from the executable:\n{stderr}")
raise RuntimeError(error_str)
if get_results:
results = self._gather_results(workdir_full_path, stdout, stderr, return_code)
else:
results = SimulatorResults(None, stdout, stderr, return_code, None)
return results
def _gather_results(self, workdir_full_path, stdout, stderr, return_code):
model_results, read_error = self.get_results(workdir_full_path)
if model_results is not None and read_error is None:
results_file = os.path.join(workdir_full_path,
self._results_information.results_filename)
results = SimulatorResults(model_results, stdout, stderr, return_code, results_file)
elif not self.fail_calibration_on_simulation_failure and model_results is None:
results = SimulatorFailureResults(stdout, stderr, return_code, self.state)
else:
raise read_error
return results
def _pass_parameters_to_simulators(self, workdir_full_path, parameters):
files = [f.path for f in os.scandir(workdir_full_path)]
for file in files:
logger.debug(f"\t\tPreprocessing file {os.path.basename(file)}")
if os.path.basename(file) not in [STATE_PARAMETER_FILE, DESIGN_PARAMETER_FILE]:
process_template_file(file, parameters)
def _write_parameters_file(self, workdir_full_path, parameters):
params_file = os.path.join(workdir_full_path, DESIGN_PARAMETER_FILE)
dictionary_reporter = matcal_parameter_reporter_identifier.identify()
dictionary_reporter(params_file, parameters)
def _execute_external(self, external_executable):
logger.debug("Executing external application...")
stdout, stderr, return_code = external_executable.run()
simulation_out = "simulation.out"
simulation_err = "simulation.err"
if external_executable.working_directory:
simulation_out = os.path.join(external_executable.working_directory,"simulation.out")
simulation_err = os.path.join(external_executable.working_directory,"simulation.err")
with open(simulation_out, "w") as fout:
fout.write(stdout)
with open(simulation_err, "w") as ferr:
ferr.write(stderr)
logger.debug("Simulator running complete! \n")
return stdout, stderr, return_code
def get_results(self, working_dir=None):
data_filename = os.path.join(working_dir, self._results_information.results_filename)
results = None
error = None
try:
results = self._results_information.read(data_filename)
results.set_state(self._state)
except Exception as e:
logger.error(f"Cannot read results file "+
f"with name \"{os.path.basename(data_filename)}\" for "+
f"model \"{self.model_name}\" and state \"{self.state.name}\". "+
"Caught the following "
f"error:\n{repr(e)}")
error = e
return results, error
class PythonSimulator(Simulator):
"""
Not intended for users: Runs python models.
"""
def __init__(self, name, compute_information, results_information, state,
model, field_coordinates=None, pass_evaluation_number=False,
pass_params_by_category=False):
super().__init__(name, compute_information, results_information, state)
self._workdir = None
self._orig_stdout = None
self._orig_stderr = None
self._model = model
self._field_coordinates = field_coordinates
self._archive_name = None
self._pass_evaluation_number=pass_evaluation_number
self._pass_params_by_category=pass_params_by_category
self._save_dir = "matcal_python_results_archive"
if not os.path.exists(self._save_dir):
os.mkdir(self._save_dir)
def get_results(self):
pass
def run(self, parameters, working_dir=None):
output = None
try:
logger.debug("Python Simulation: Running")
with string_out_err_capture() as out:
results = self._run_python_simulation(parameters, working_dir)
self._archive_results(results, parameters)
logger.debug("Python Simulation: Finished")
except Exception as e:
logger.error("Python Model \"{}\" Error: {}".format(self.model_name, repr(e)))
if self.fail_calibration_on_simulation_failure:
raise e
else:
logger.error("Continuing Study After Model \"{}\" Error.".format(self.model_name,
repr(e)))
stdout, stderr = self._extract_out_and_err(out)
output = SimulatorFailureResults(stdout, stderr, None, self._state)
if output == None:
stdout, stderr = self._extract_out_and_err(out)
output = SimulatorResults(results, stdout, stderr, None, self._archive_name)
return output
def _extract_out_and_err(self, out):
try:
stdout, stderr = out
except AttributeError:
stdout = "doc building no output catch"
stderr = ""
return stdout,stderr
def _archive_results(self, results, parameters):
self._archive_name = os.path.join(self._save_dir, self.model_name)
for name, value in parameters.items():
self._archive_name+= f"_{name}={value}"
self._archive_name += ".joblib"
matcal_save(self._archive_name, results)
def _run_python_simulation(self, parameters, working_dir):
model_constants = self._model.get_model_constants(self._state)
run_variables = parameters
eval_number = self._get_eval_number(working_dir)
logger.debug("{}".format(run_variables))
kwargs = OrderedDict()
if not self._pass_params_by_category:
kwargs = _get_parameters_according_to_precedence(self._state, model_constants,
run_variables)
else:
kwargs["model_parameters"] = run_variables
kwargs["model_constants"] = model_constants
kwargs["state_parameters"] = self._state.params
if eval_number is not None:
kwargs["evaluation_number"] = eval_number
results = self._python_function(**kwargs)
results = self._convert_to_data(results)
results.set_state(self._state)
return results
def _get_eval_number(self, working_dir):
eval_number = None
if working_dir is not None and self._pass_evaluation_number:
if MATCAL_WORKDIR_STR in working_dir:
eval_number = int(working_dir.split(MATCAL_WORKDIR_STR+".")[-1])
else:
logger.debug(f"No \"{MATCAL_WORKDIR_STR}\". Evaluation number not identified.")
return eval_number
def _convert_to_data(self, results):
converter = matcal_data_reader_factory.create(self._is_field_simulation(),
self._field_coordinates)
results = converter(results)
return results
def _is_field_simulation(self):
return self._field_coordinates != None
def _python_function(self, **run_variables):
return self._model.python_function(**run_variables)
@property
def _results_file_path(self):
return None
@property
def results_filename(self):
return self._archive_name
class ProbeDataReaderCreator(ObjectCreator):
def __call__(self, *args, **kwargs):
return convert_dictionary_to_data
class DataReaderFactory(DefaultObjectFactory):
pass
matcal_data_reader_factory = DataReaderFactory(ProbeDataReaderCreator())