From 764175dd2969b620a23de85b7df4f56c3f7d9baf Mon Sep 17 00:00:00 2001 From: Kenny Sharma <kenny.sharma@tum.de> Date: Fri, 8 Dec 2017 17:11:23 +0100 Subject: [PATCH] [NRRPLT-5674] [NRRPLT-5675] Add support for distributed Nest without MUSIC. This patch implements all functionality required for dynamic, distributed Nest interfaces that behave the same as single-process nest for local installs. The existing MUSIC interfaces are deprecated and now subclass the adapters and interfaces in the new hbp_distributed_nest package. This patch does not support distributed, cluster launching, but this can be adapted in the future from the existing MUSIC launchers. Change-Id: I1aac3a59843d9fdfe051a9674566f3b3f5040ac1 --- Makefile | 8 +- README | 2 +- hbp_nrp_distributed_nest/MANIFEST.in | 1 + hbp_nrp_distributed_nest/README.txt | 2 + .../hbp_nrp_distributed_nest/__init__.py | 5 + .../DistributedPyNNCommunicationAdapter.py | 169 +++++++++++ .../cle/DistributedPyNNControlAdapter.py | 76 +++++ .../hbp_nrp_distributed_nest/cle/__init__.py | 3 + .../launch/DistributedCLEProcess.py | 159 ++++++++++ .../launch/DistributedNestProcess.py | 85 ++++++ .../launch/MPILauncher.py | 215 ++++++++++++++ .../launch/NestBrainProcess.py | 279 ++++++++++++++++++ .../launch/NestLauncher.py | 173 +++++++++++ .../launch/__init__.py | 3 + .../launch/host/LocalLauncher.py | 79 +++++ .../launch/host/__init__.py | 73 +++++ .../hbp_nrp_distributed_nest/version.py | 2 + hbp_nrp_distributed_nest/requirements.txt | 2 + .../requirements_extension_tests.txt | 3 + hbp_nrp_distributed_nest/setup.py | 54 ++++ .../cle/MUSICPyNNCommunicationAdapter.py | 11 +- .../cle/MUSICPyNNControlAdapter.py | 6 +- .../launch/MUSICBrainProcess.py | 126 +++----- .../launch/MUSICCLEProcess.py | 129 +------- .../launch/MUSICLauncher.py | 90 +----- .../launch/MUSICMPILauncher.py | 169 +---------- .../launch/host/LocalLauncher.py | 45 +-- .../launch/host/__init__.py | 72 +---- .../tests/launch/host/test_interface.py | 2 +- .../tests/launch/host/test_local.py | 3 - .../tests/launch/test_mpi_launcher.py | 2 +- setVersion.sh | 4 +- 32 files changed, 1467 insertions(+), 585 deletions(-) create mode 100644 hbp_nrp_distributed_nest/MANIFEST.in create mode 100644 hbp_nrp_distributed_nest/README.txt create mode 100644 hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/__init__.py create mode 100644 hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/DistributedPyNNCommunicationAdapter.py create mode 100644 hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/DistributedPyNNControlAdapter.py create mode 100644 hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/__init__.py create mode 100644 hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/DistributedCLEProcess.py create mode 100644 hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/DistributedNestProcess.py create mode 100644 hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/MPILauncher.py create mode 100644 hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/NestBrainProcess.py create mode 100644 hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/NestLauncher.py create mode 100644 hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/__init__.py create mode 100644 hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/host/LocalLauncher.py create mode 100644 hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/host/__init__.py create mode 100644 hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/version.py create mode 100644 hbp_nrp_distributed_nest/requirements.txt create 
mode 100644 hbp_nrp_distributed_nest/requirements_extension_tests.txt create mode 100644 hbp_nrp_distributed_nest/setup.py diff --git a/Makefile b/Makefile index 4522ca3..83a5ee6 100644 --- a/Makefile +++ b/Makefile @@ -1,14 +1,14 @@ #modules that have tests -TEST_MODULES=hbp_nrp_music_xml/hbp_nrp_music_xml hbp_nrp_music_interface/hbp_nrp_music_interface +TEST_MODULES=hbp_nrp_music_xml/hbp_nrp_music_xml hbp_nrp_music_interface/hbp_nrp_music_interface hbp_nrp_distributed_nest/hbp_nrp_distributed_nest #modules that are installable (ie: ones w/ setup.py) -INSTALL_MODULES=hbp_nrp_music_xml hbp_nrp_music_interface +INSTALL_MODULES=hbp_nrp_music_xml hbp_nrp_music_interface hbp_nrp_distributed_nest #packages to cover -COVER_PACKAGES=hbp_nrp_music_xml hbp_nrp_music_interface +COVER_PACKAGES=hbp_nrp_music_xml hbp_nrp_music_interface hbp_nrp_distributed_nest #documentation to build -#DOC_MODULES=hbp_nrp_music_xml/doc hbp_nrp_music_interface/doc +#DOC_MODULES=hbp_nrp_music_xml/doc hbp_nrp_music_interface/doc hbp_nrp_distributed_nest/doc ##### DO NOT MODIFY BELOW ##################### diff --git a/README b/README index 4183f85..1f3c88d 100644 --- a/README +++ b/README @@ -1 +1 @@ -MUSIC utilities and bindings for pyNN/NEST simulation support. +Distributed brain simulation packages. diff --git a/hbp_nrp_distributed_nest/MANIFEST.in b/hbp_nrp_distributed_nest/MANIFEST.in new file mode 100644 index 0000000..540b720 --- /dev/null +++ b/hbp_nrp_distributed_nest/MANIFEST.in @@ -0,0 +1 @@ +include requirements.txt \ No newline at end of file diff --git a/hbp_nrp_distributed_nest/README.txt b/hbp_nrp_distributed_nest/README.txt new file mode 100644 index 0000000..0fb5718 --- /dev/null +++ b/hbp_nrp_distributed_nest/README.txt @@ -0,0 +1,2 @@ +This package provides interfaces for standalone distributed Nest simulation without +any MUSIC dependencies for the CLE/ExDBackend. \ No newline at end of file diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/__init__.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/__init__.py new file mode 100644 index 0000000..b1b92b1 --- /dev/null +++ b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/__init__.py @@ -0,0 +1,5 @@ +""" +This package contains NRP specific implementations for multi-process, distributed Nest. +""" + +from hbp_nrp_distributed_nest.version import VERSION as __version__ # pylint: disable=W0611 diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/DistributedPyNNCommunicationAdapter.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/DistributedPyNNCommunicationAdapter.py new file mode 100644 index 0000000..6c803c4 --- /dev/null +++ b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/DistributedPyNNCommunicationAdapter.py @@ -0,0 +1,169 @@ +# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER +# This file is part of the Neurorobotics Platform software +# Copyright (C) 2014,2015,2016,2017 Human Brain Project +# https://www.humanbrainproject.eu +# +# The Human Brain Project is a European Commission funded project +# in the frame of the Horizon2020 FET Flagship plan. +# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# ---LICENSE-END +""" +Extensions of the base CLE PyNNCommunicationAdapter to communicate with distributed +processes. Maxmimum code reuse and minimal duplication where possible. +""" +from hbp_nrp_cle.brainsim.pynn_nest.PyNNNestCommunicationAdapter import PyNNNestCommunicationAdapter +from hbp_nrp_cle.brainsim.pynn_nest.devices.__NestDeviceGroup import PyNNNestDevice + +from hbp_nrp_distributed_nest.launch.NestBrainProcess import NestBrainProcess +import hbp_nrp_cle.tf_framework.config as tf_config + +from mpi4py import MPI + +import logging +import time + +logger = logging.getLogger(__name__) + + +class DistributedPyNNCommunicationAdapter(PyNNNestCommunicationAdapter): + """ + Represents a distributed Nest communication adapter for the neuronal simulation + """ + + def initialize(self): + """ + Marks the adapter as initialized. + """ + logger.info("Distributed Nest communication adapter initialized") + super(DistributedPyNNCommunicationAdapter, self).initialize() + + def register_spike_source(self, populations, spike_generator_type, **params): + """ + Intercepts default PyNNNestCommunicationAdapter request and notifies all other + processes to do the same. + + :param populations: A reference to the populations to which the spike generator + should be connected + :param spike_generator_type: A spike generator type (see documentation + or a list of allowed values) + :param params: A dictionary of configuration parameters + :return: A communication object or a group of objects + """ + ts = self.__notify_processes_register('source', populations, spike_generator_type, params) + + device = super(DistributedPyNNCommunicationAdapter, self). \ + register_spike_source(populations, spike_generator_type, **params) + device.timestep = ts + + # mark the device as MPI-aware, only used by Nest-specific devices + if isinstance(device, PyNNNestDevice): + setattr(device, 'mpi_aware', True) + return device + + def register_spike_sink(self, populations, spike_detector_type, **params): + """ + Intercepts default PyNNNestCommunicationAdapter request and notifies all other + processes to do the same. + + :param populations: A reference to the populations which should be connected + to the spike detector + :param spike_detector_type: A spike detector type (see documentation + for a list of allowed values) + :param params: A dictionary of configuration parameters + :return: A Communication object or a group of objects + """ + ts = self.__notify_processes_register('sink', populations, spike_detector_type, params) + + device = super(DistributedPyNNCommunicationAdapter, self). \ + register_spike_sink(populations, spike_detector_type, **params) + device.timestep = ts + + # mark the device as MPI-aware, only used by Nest-specific devices + if isinstance(device, PyNNNestDevice): + setattr(device, 'mpi_aware', True) + return device + + def unregister_spike_source(self, device): + """ + Disconnects and unregisters the given spike generator device and notifies all other + processes to do the same. + + :param device: The spike generator device to deregister. 
+ """ + self.__notify_processes_unregister(device.timestep) + super(DistributedPyNNCommunicationAdapter, self).unregister_spike_source(device) + + def unregister_spike_sink(self, device): + """ + Disconnects and unregisters the given spike detector device and notifies all other + processes to do the same. + + :param device: The spike detector device to deregister. + """ + self.__notify_processes_unregister(device.timestep) + super(DistributedPyNNCommunicationAdapter, self).unregister_spike_sink(device) + + @staticmethod + def __notify_processes_register(kind, populations, device, params): + """ + Notify remote MPI Brain Processes that they must complete this transfer function + connection by duplicating the parameters for this device connection. + + :param kind Either 'source' or 'sink'. + :param populations The target population to connect to. + :param device The device to create. + :param params The dictionary of adapter params to use. + """ + + # timestamp the adapter creation, we have no other way of tracking it between + # the CLE and remote processes + timestep = int(round(time.time() * 1e6)) + + # population information to send to the remote MPI nodes, we can't pickle Populations + # directly and those references wouldn't be valid on the remote nodes anyway + if populations.label in tf_config.brain_root.__dict__: + label = populations.label + else: + label = populations.parent.label + mask = populations.mask if populations.mask else slice(0, len(populations), 1) + + # propagate the synapse creation parameters to all remote notes, they will run the same + # connection/creation commands after receiving these messages, guaranteed to be + # run from CLE MPI process 0 only + for rank in xrange(1, MPI.COMM_WORLD.Get_size()): + MPI.COMM_WORLD.send({'command': 'ConnectTF', 'type': kind, 'label': label, + 'mask': mask, 'device': device, 'timestep': timestep, + 'params': params}, + dest=rank, tag=NestBrainProcess.MPI_MSG_TAG) + + # return the timestep for the device in this process + return timestep + + @staticmethod + def __notify_processes_unregister(timestep): + """ + Notify remote MPI Brain Processes that they must delete transfer function + connection by duplicating the calls in this process. + + :param timestep The creation timestep of the TF to delete. + """ + + # propagate the deletion configuration to all other processes, guaranteed to be + # run from CLE MPI process 0 only + for rank in xrange(1, MPI.COMM_WORLD.Get_size()): + MPI.COMM_WORLD.send({'command': 'DeleteTF', 'timestep': timestep}, + dest=rank, tag=NestBrainProcess.MPI_MSG_TAG) diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/DistributedPyNNControlAdapter.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/DistributedPyNNControlAdapter.py new file mode 100644 index 0000000..d5c3c56 --- /dev/null +++ b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/DistributedPyNNControlAdapter.py @@ -0,0 +1,76 @@ +# https://www.humanbrainproject.eu +# +# The Human Brain Project is a European Commission funded project +# in the frame of the Horizon2020 FET Flagship plan. +# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# ---LICENSE-END +""" +This module defined a CLE control adapter that notifies all remote brain processes +when they should step the simulation. +""" + +from hbp_nrp_cle.brainsim.pynn.PyNNControlAdapter import PyNNControlAdapter +from hbp_nrp_distributed_nest.launch.NestBrainProcess import NestBrainProcess + +from mpi4py import MPI + + +class DistributedPyNNControlAdapter(PyNNControlAdapter): + """ + This class is required as multi-threading the brain process to receive MPI messages + while blocking to step the brain causes segmentation faults in the CLE. The overhead + is minimal here and allows us to have dynamic behavior in the brain processes between + simulation steps. + """ + + def load_brain(self, network_file, **populations): + """ + Notify all remote brain processes to load the brain with population definitions + specified. + + TODO: the network_file will have to be accessed from common storage between + processes for any configuration other than local installataions + + :param network_file: The path to the python file containing the network + :param populations: A named list of populations to create + """ + + # notify all other processes, blocking send calls for them to receive + for rank in xrange(MPI.COMM_WORLD.Get_size()): + if rank == MPI.COMM_WORLD.Get_rank(): + continue + MPI.COMM_WORLD.send({'command': 'LoadBrain', 'file': network_file, + 'populations': populations}, + dest=rank, tag=NestBrainProcess.MPI_MSG_TAG) + + # run the actual brain load on this process + super(DistributedPyNNControlAdapter, self).load_brain(network_file, **populations) + + def run_step(self, dt): + """ + Notify all remote brain processes to run a simulation step, then run the step in + this process. + """ + + # notify all other processes, blocking send calls for them to receive + for rank in xrange(MPI.COMM_WORLD.Get_size()): + if rank == MPI.COMM_WORLD.Get_rank(): + continue + MPI.COMM_WORLD.send('step', dest=rank, tag=NestBrainProcess.MPI_MSG_TAG) + + # run the actual simulation step + super(DistributedPyNNControlAdapter, self).run_step(dt) diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/__init__.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/__init__.py new file mode 100644 index 0000000..56f6fbc --- /dev/null +++ b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/__init__.py @@ -0,0 +1,3 @@ +""" +This module contains CLE interface implementations. 
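+It provides the DistributedPyNNCommunicationAdapter and DistributedPyNNControlAdapter used by the CLE process (MPI rank 0) to notify the remote brain processes.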
+""" diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/DistributedCLEProcess.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/DistributedCLEProcess.py new file mode 100644 index 0000000..7a59e4d --- /dev/null +++ b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/DistributedCLEProcess.py @@ -0,0 +1,159 @@ +# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER +# This file is part of the Neurorobotics Platform software +# Copyright (C) 2014,2015,2016,2017 Human Brain Project +# https://www.humanbrainproject.eu +# +# The Human Brain Project is a European Commission funded project +# in the frame of the Horizon2020 FET Flagship plan. +# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# ---LICENSE-END +""" +This module contains the CLE process logic for the simulation assembly +""" + +import os +import argparse +import logging + +logger = logging.getLogger(__name__) + + +# pylint: disable=too-many-statements, too-many-locals +def launch_cle(argv, assembly_class): # pragma: no cover + """ + Launch the distributed CLE process with given assembly class. Process the command line and + handles all shutdown events to terminate other MPI processes. + + :param assembly_class An invokable CLEGazeboSimulationAssembly class definition to use. + """ + + # import MPI here, must be done after Nest/MUSIC in subclass adapters + from mpi4py import MPI + + simulation = None + + # exit code, 0 for success and -1 for any failures + mpi_returncode = 0 + try: + if os.environ["ROS_MASTER_URI"] == "": + raise Exception("You should run ROS first.") + + parser = argparse.ArgumentParser() + parser.add_argument('--exdconf', dest='exd_file', + help='specify the ExDConfiguration file', required=True) + parser.add_argument('--bibi', dest='bibi_file', + help='specify the BIBI file', required=True) + parser.add_argument('--environment', dest='environment_file', + help='specify the environment file', required=True) + parser.add_argument('--experiment-path', dest='path', + help='specify the base experiment path', required=True) + parser.add_argument('--gzserver-host', dest='gzserver_host', + help='the gzserver target host', required=True) + parser.add_argument('--reservation', dest='reservation', default=None, + help='cluster resource reservation', required=False) + parser.add_argument('--sim-id', dest='sim_id', type=int, + help='the simulation id to use', required=True) + parser.add_argument('--timeout', dest='timeout', + help='the simulation default time allocated', required=True) + parser.add_argument('--rng-seed', dest='rng_seed', + help='the global experiment RNG seed', required=True) + parser.add_argument('-v', '--verbose', action='store_true', + help='increase output verbosity') + + args = parser.parse_args(argv) + + # expand any parameters (e.g. 
NRP_EXPERIMENTS_DIRECTORY) in paths + args.exd_file = os.path.expandvars(args.exd_file) + args.environment_file = os.path.expandvars(args.environment_file) + args.path = os.path.expandvars(args.path) + + # simplified launch process below from ROSCLESimulationFactory.py, avoid circular depdency + # by importing here + import rospy + from hbp_nrp_cleserver.server import ROS_CLE_NODE_NAME + from hbp_nrp_cleserver.server.ROSCLESimulationFactory import set_up_logger + from hbp_nrp_cleserver.server.ROSCLESimulationFactory import get_experiment_data + + # reconfigure the logger to stdout as done in ROSCLESimulationFactory.py otherwise all + # output will be trapped by the ROS logger after the first ROS node is + # initialized + rospy.init_node(ROS_CLE_NODE_NAME, anonymous=True) + set_up_logger(None, args.verbose) + + exd, bibi = get_experiment_data(args.exd_file) + + # parse the timeout string command line argument into a valid datetime + import dateutil.parser as datetime_parser + timeout_parsed = datetime_parser.parse(args.timeout.replace('_', ' ')) + + # check the reservation argument, if empty default to None + if args.reservation == '': + args.reservation = None + + # override the experiment RNG seed with the command line value + exd.rngSeed = int(args.rng_seed) + + # construct the simulation with given assembly class + simulation = assembly_class(args.sim_id, + exd, + bibi, + gzserver_host=args.gzserver_host, + reservation=args.reservation, + timeout=timeout_parsed) + simulation.initialize(args.environment_file, None) + if simulation.cle_server is None: + raise Exception( + "Error in cle_function_init. Cannot start simulation.") + + # FIXME: This should be done more cleanly within the adapter, see [NRRPLT-4858] + # the tag is a magic number to avoid circular build/release dependency for now but + # this will be removed when the referenced bug is fixed + # notify MPI processes that configuration is complete + for rank in xrange(MPI.COMM_WORLD.Get_size()): + if rank != MPI.COMM_WORLD.Get_rank(): + MPI.COMM_WORLD.send('ready', dest=rank, tag=100) + MPI.COMM_WORLD.Barrier() + + logger.info('Starting CLE.') + simulation.run() # This is a blocking call, not to be confused with + # threading.Thread.start + + except Exception as e: # pylint: disable=broad-except + + # if running through MPI, catch Exception and terminate below to ensure brain processes + # are also killed + logger.error( + 'CLE aborted with message {}, terminating.'.format(e.message)) + # if no logger + print 'CLE aborted with message {}, terminating.'.format(e.message) + logger.exception(e) + mpi_returncode = -1 + + finally: + + # always attempt to shutdown the CLE launcher and release resources + if simulation: + logger.info('Shutting down CLE.') + simulation.shutdown() + logger.info('Shutdown complete, terminating.') + + # terminate the spawned brain processes + # send a shutdown message in case the brain processes are in a recv loop at startup since they + # seem to block and ignore the Abort command until receiving a message + for rank in xrange(MPI.COMM_WORLD.Get_size()): + MPI.COMM_WORLD.isend('shutdown', dest=rank, tag=100) + MPI.COMM_WORLD.Abort(mpi_returncode) diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/DistributedNestProcess.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/DistributedNestProcess.py new file mode 100644 index 0000000..a487d12 --- /dev/null +++ b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/DistributedNestProcess.py @@ -0,0 +1,85 @@ +# 
https://www.humanbrainproject.eu +# +# The Human Brain Project is a European Commission funded project +# in the frame of the Horizon2020 FET Flagship plan. +# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# ---LICENSE-END +""" +This module contains the distributed Nest process logic and simulation assembly +""" + +# The Nest imports below somehow delete/inject command line arguments that cause +# issues with argparse in each of the launchers, save the valid arguments now and +# clear the argument list... +import sys +argv_backup = list(sys.argv[1:]) +sys.argv = [sys.argv[0]] + +import hbp_nrp_cle.brainsim.pynn_nest # pylint: disable=unused-import + +import logging + +from hbp_nrp_cleserver.server.CLEGazeboSimulationAssembly import CLEGazeboSimulationAssembly +from hbp_nrp_distributed_nest.cle.DistributedPyNNCommunicationAdapter \ + import DistributedPyNNCommunicationAdapter +from hbp_nrp_distributed_nest.cle.DistributedPyNNControlAdapter \ + import DistributedPyNNControlAdapter + +from hbp_nrp_distributed_nest.launch.DistributedCLEProcess import launch_cle +from hbp_nrp_distributed_nest.launch.NestBrainProcess import launch_brain + +import pyNN.nest as sim + +logger = logging.getLogger(__name__) + + +class DistributedCLESimulationAssembly(CLEGazeboSimulationAssembly): + """ + Defines the assembly of a distributed Nest simulation + """ + + def __init__(self, sim_id, exc, bibi_model, **par): + """ + Creates a new simulation assembly to simulate an experiment using the CLE and Gazebo + :param sim_id: The simulation id + :param exc: The experiment configuration + :param bibi_model: The BIBI configuration + """ + super(DistributedCLESimulationAssembly, self).__init__(sim_id, exc, bibi_model, **par) + + def _create_brain_adapters(self): + """ + Creates the adapter components for the neural simulator + + :return: A tuple of the communication and control adapter for the neural simulator + """ + logger.info('Using distributed configuration and adapters for CLE') + + # return the assembled control adapters + braincomm = DistributedPyNNCommunicationAdapter() + braincontrol = DistributedPyNNControlAdapter(sim) + return braincomm, braincontrol + +if __name__ == '__main__': # pragma: no cover + + # use the MPI process rank to determine if we should launch CLE or brain process + # both launch commands are blocking until shutdown occurs + from mpi4py import MPI + if MPI.COMM_WORLD.Get_rank() == 0: + launch_cle(argv_backup, DistributedCLESimulationAssembly) + else: + launch_brain(argv_backup) diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/MPILauncher.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/MPILauncher.py new file mode 100644 index 0000000..f5454c7 --- /dev/null +++ 
b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/MPILauncher.py @@ -0,0 +1,215 @@ +# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER +# This file is part of the Neurorobotics Platform software +# Copyright (C) 2014,2015,2016,2017 Human Brain Project +# https://www.humanbrainproject.eu +# +# The Human Brain Project is a European Commission funded project +# in the frame of the Horizon2020 FET Flagship plan. +# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# ---LICENSE-END + +""" +Helper class to build and execute a formatted mpirun command in the format: + + mpirun -envlist <vars> -np <proc> -host <hostname/ip> -wdir <temporary work dir> <command> : ... + +where each of the hosts has a specific working directory with necessary config files already +in place. Also passes environment variables required for NRP/CLE execution. +""" +import logging +import os +import subprocess +import time + +import rospy +import rosnode +from std_msgs.msg import String +import json + +from hbp_nrp_cle import config + +logger = logging.getLogger(__name__) + + +class MPILauncher(object): + """ + Class constructs and executes the mpi launch command. + """ + + def __init__(self, executable): + self._exe = executable + + self._hosts = [] + self._cmd = None + self._process = None + self._launched = False + self._gazebo_master_uri = None + + def add_host(self, hostname, tmpdir, processes=1): + """ + Add a target host to the mpi launch configuration. + + :param hostname The remote host name or ip. + :param tmpdir A valid temporary directory on the remote host to launch in. + :param processes The number of processes for this host. + """ + self._hosts.append('-np {p} -host {h} -wdir {t}' + .format(p=processes, h=hostname, t=tmpdir)) + + def build(self): + """ + Construct the mpirun command line string with all hosts provided. + """ + + if len(self._hosts) == 0: + raise Exception('No target host configurations specified for MPI processes!') + + # build the command with per host specific configuration + hoststr = ' : '.join(['{h} {e}'.format(h=host, e=self._exe) for host in self._hosts]) + self._cmd = 'mpirun {}'.format(hoststr) + + def launch(self): + """ + Launch the mpirun command and wait for successful startup of the CLE. Blocks until the + CLE publishes completion on the status topic or if the mpirun command aborts. + """ + + if not self._cmd: + raise Exception('No command set for MPI processes, build() was not called!') + + # provide extra ssh flags if ssh is used on the vizcluster to ensure we can spawn + # other ssh sessions / processes for Gazebo/etc. 
+ env_vars = dict(os.environ, HYDRA_LAUNCHER_EXTRA_ARGS="-M -K") + + # Spawn the mpirun command, we need to do a few special things because mvapich2 will + # send SIGSTOP (like a ctrl+v or suspend) to its parent process when executing and this + # would cause the entire Python stack to stop and create <defunct> processes. + logger.info("Spawning MPI processes: {}".format(self._cmd)) + self._process = subprocess.Popen(self._cmd.split(), # no shell to SIGSTOP/hang + preexec_fn=os.setsid, # create a new session/tree + stdin=subprocess.PIPE, # hangs without valid stdin + env=env_vars) # environment variables + + # wait until the CLE subprocess initializes properly or fails, the conditions are: + # failure if process exits (bad command, failure to launch, etc.) + # success if the CLE publishes a status update indicating loading is complete + + # subscribe for loading status updates from the CLE, wait for completion message + status_sub = rospy.Subscriber('/ros_cle_simulation/status', String, self._on_status) + + # wait for the process to abort or be successfully launched + while not self._process.poll() and not self._launched: + + # ensure gzbridge is running and accessible in deployed configurations + self._check_gzbridge() + + # very short sleep to be as responsive as possible + time.sleep(0.1) + + # disconnect the temporary status subscriber + status_sub.unregister() + + # launch has failed, propagate an error to the user + if not self._launched: + raise Exception('Distributed MPI launch failure, aborting.\nPlease contact ' + 'neurorobotics@humanbrainproject.eu if this problem persists.') + + def _on_status(self, msg): + """ + Listen for CLE status messages, parse the JSON format that is also sent to the frontend + progress bar. Wait for completion of the CLE loading task and mark the MPI process as + successfully launched. + + :param msg The ros message to parse. + """ + + status = json.loads(msg.data) + + # ignore status bar messages, these don't give us information if a launch has failed + # since the loading task will always be set to 'done' when it aborts + if 'progress' in status: + return + + # received a simulation status "tick", everything has launched successfully + self._launched = True + + def _check_gzbridge(self): + """ + gzbridge cannot be launched on remote CLE nodes since they will not be reachable by clients + that are configured and able to reach the backend machines. If Gazebo is launched on a + remote node (e.g. not a local install), wait for the /gazebo ROS node to appear and start + gzbridge on this host (a backend server). + """ + + if self._gazebo_master_uri: + return + + # request the ip of the Gazebo node, result will be -1 if not found + res, _, ip = rosnode.get_api_uri(rospy.get_master(), '/gazebo', True) + if res == -1: + return + + # replace the ROS port with the Gazebo port, configure env, and run gzbridge + self._gazebo_master_uri = ip[0:ip.rfind(':') + 1] + '11345' + + # only need to start the gzbridge if running in a deployed configuration + if '127.0.0.1' not in self._gazebo_master_uri: + + # this emulates the functionality in hbp_nrp_cleserver/server/LocalGazebo.py but we + # cannot import and reuse due to circular dependencies + os.environ['GAZEBO_MASTER_URI'] = self._gazebo_master_uri + os.system(config.config.get('gzbridge', 'restart-cmd')) + + def run(self): + """ + Block until the mpirun command exits. Check the return code to determine if it was + successful or aborted. Backwards compatibility in naming convention, must be run. 
+ """ + + if not self._process or not self._launched: + raise Exception('No MPI process launched, cannot run until launch() is called.') + + # wait until it terminates, if launch fails or is killed externally, propagate an + # Exception to notify the simulation factory to shutdown since the launch/run has failed + if self._process.wait() != 0: + raise Exception('Distributed MPI runtime failure, aborting.\nPlease contact ' + 'neurorobotics@humanbrainproject.eu if this problem persists.') + + def shutdown(self): + """ + Attempt to forecfully shutdown the mpirun command if it is still running and has not + cleanly shut itself down. Guaranteed to be called after launch success or failure. + """ + + # try to terminate the mpirun command, mpirun will automatically exit nautrally when all + # of its spawned child processes exit or are killed, so this isn't explicitly necessary + if self._process: + try: + self._process.kill() + except OSError: # the process has already cleanly terminated + pass + + # terminate the gzbrige process/websocket if we started it above + if self._gazebo_master_uri and '127.0.0.1' not in self._gazebo_master_uri: + os.system(config.config.get('gzbridge', 'stop-cmd')) + + # reset all class variables to prevent class reuse + self._gazebo_master_uri = None + self._launched = False + self._process = None + self._cmd = None + self._exe = None diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/NestBrainProcess.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/NestBrainProcess.py new file mode 100644 index 0000000..a94e1cf --- /dev/null +++ b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/NestBrainProcess.py @@ -0,0 +1,279 @@ +# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER +# This file is part of the Neurorobotics Platform software +# Copyright (C) 2014,2015,2016,2017 Human Brain Project +# https://www.humanbrainproject.eu +# +# The Human Brain Project is a European Commission funded project +# in the frame of the Horizon2020 FET Flagship plan. +# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# ---LICENSE-END +""" +A distributed brain process that can be launched standalone on remote hosts. 
+""" +from hbp_nrp_cle.brainsim import config + +import pyNN.nest as sim +import nest +import hbp_nrp_cle.tf_framework.config as tf_config + +from hbp_nrp_cle.brainsim.pynn.PyNNControlAdapter import PyNNControlAdapter +from hbp_nrp_cle.brainsim.pynn_nest.PyNNNestCommunicationAdapter import PyNNNestCommunicationAdapter +from hbp_nrp_cle.cle.ClosedLoopEngine import ClosedLoopEngine + +from hbp_nrp_cle.brainsim.pynn_nest.devices.__NestDeviceGroup import PyNNNestDevice +from hbp_nrp_cle.tf_framework._CleanableTransferFunctionParameter import \ + ICleanableTransferFunctionParameter + +from hbp_nrp_commons.generated import bibi_api_gen +from hbp_nrp_commons.bibi_functions import get_all_neurons_as_dict + +import argparse +import os + +from collections import OrderedDict + +from mpi4py import MPI +import traceback + + +class NestBrainProcess(object): + """ + A distributed Nest brain process that can be launched standalone on remote hosts. + """ + + # tag to listen for MPI configuration/command messages + MPI_MSG_TAG = 100 + + def __init__(self, bibi_file, rng_seed): + """ + Nest will automatically allocate the brain in a round-robin + fashion under the hood, we do not need to do anything explicitly. + + :param bibi_file The absolute path to the BIBI file for this experiment. + """ + + # load the bibi, extract specified populations and brain file to be used + with open(bibi_file) as f: + self._bibi = bibi_api_gen.CreateFromDocument(f.read()) + pop_dict = get_all_neurons_as_dict(self._bibi.brainModel.populations) + + # load the models path on this node and set absolute bibi path + models_path = os.environ.get('NRP_MODELS_DIRECTORY') + if models_path is None: + raise Exception("Unable to determine bibi path, NRP_MODELS_DIRECTORY is not " + "defined on target node!") + brain_file = os.path.join(models_path, self._bibi.brainModel.file) + + # set the RNG seed before initializing any PyNN interfaces in the brain controller + config.rng_seed = int(rng_seed) + + # extract the simulation timestep from the BIBI if set or default to CLE value (in ms) + self._timestep = ClosedLoopEngine.DEFAULT_TIMESTEP * 1000.0 + if self._bibi.timestep: + self._timestep = float(self._bibi.timestep) + + # spawn CLE components that will handle loading the brain file and interfaces + self._brain_controller = PyNNControlAdapter(sim) + self._brain_controller.initialize() + self._brain_controller.load_brain(brain_file, **pop_dict) + + # spawn the communication adapter for use as needed + self._brain_communicator = PyNNNestCommunicationAdapter() + self._brain_communicator.initialize() + + # store created devices, to be used when we can add/delete them dynamically + self.devices = OrderedDict() + + # status variables for this process + self._ready = False + self._running = False + + #pylint: disable=too-many-branches + def run(self): + """ + Blocking run loop for this brain process. First accept any transfer function configuration + via MPI messages and then block running the brain until terminated externally by the CLE + shutting down. 
+ """ + + # listen for transfer function creation messages until the CLE tells us to start + self._ready = False + self._running = True + + # run until shutdown or an Exception is raised + while self._running: + + # block and read messages from the CLE + data = MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE, tag=NestBrainProcess.MPI_MSG_TAG) + + # new transfer function, dictionary of parameters + if isinstance(data, dict): + + if 'command' not in data: + raise Exception('Remote brain process received unknown message: %s' % str(data)) + command = data['command'] + + # create and connect a TF + if command == 'ConnectTF': + self._connect_tf(data) + + # delete a previously created TF + elif command == 'DeleteTF': + self._delete_tf(data) + + # load/reload the brain and populations + elif command == 'LoadBrain': + self._load_brain(data) + + # handle updates that use Nest directly (PyNNNest adapters) + elif command == 'SetStatus': + nest.SetStatus(data['ids'], data['params']) + + # command and control string from the CLE + elif data == 'ready': + + # TF loading is complete, we can start the simulation + self._ready = True + + # notify all processes start at the same time (all Nest commands barrier themselves) + MPI.COMM_WORLD.Barrier() + + # step the simulation, commanded by the CLE + elif data == 'step': + + # run the coordinated simulation step + self._brain_controller.run_step(self._timestep) + self._brain_communicator.refresh_buffers(0.0) + + # CLE abort/shutdown, abandon the recv loop which may block MPI_ABORT commands + elif data == 'shutdown': + + # handle shutdown event from CLE, no barrier to prevent blocking on abort + self._running = False + + # unknown message, this is a critical failure since this should never happen + # fully abort and log the condition + else: + raise Exception('Remote brain process received unknown message: %s' % str(data)) + + def _connect_tf(self, params): + """ + Reflect a transfer function connection made on the CLE side by performing the same + connection on this side. + + :param params The connectivity/synapse parameters passed by the CLE. + """ + + # get the population of neurons from our dictionary, we can guarantee this is a valid + # population that has been declared in the BIBI at this point as the CLE will validate + # before sending us the create message + brain_pop = sim.PopulationView(tf_config.brain_root.__dict__[params['label']], + selector=params['mask']) + + # perform the actual device creation/connection + if params['type'] == 'sink': + device = self._brain_communicator.register_spike_sink(brain_pop, + params['device'], + **params['params']) + else: + device = self._brain_communicator.register_spike_source(brain_pop, + params['device'], + **params['params']) + + # mark the timestamp from CLE process for tracking between processes + device.timestep = params['timestep'] + + # mark the device as MPI-aware, only used by Nest-specific devices + if isinstance(device, PyNNNestDevice): + setattr(device, 'mpi_aware', True) + + # store the device in a way that we can easily retrieve later + self.devices[device.timestep] = device + + def _delete_tf(self, params): + """ + Disconnect the specified device and remove it from tracking. + + :param params The device parameters passed by the CLE. 
+ """ + + device = self.devices.pop(params['timestep'], None) + if device is not None: + if device in self._brain_communicator.generator_devices: + self._brain_communicator.unregister_spike_source(device) + else: + self._brain_communicator.unregister_spike_sink(device) + + if isinstance(device, ICleanableTransferFunctionParameter): + device.cleanup() + + def _load_brain(self, params): + """ + Load/reload the brain file and population definitions. + + :param params The brain parameters passed by the CLE. + """ + + # ignore any commands during simulation construction + if not self._ready: + return + + # mirror the CLE brain loading order, the CLE process will handle errors + # preserve the brain communicator if there is an error in the brain to mirror the CLE + try: + # discard any previously loaded brain and load the new spec + self._brain_controller.shutdown() + self._brain_controller.load_brain(params['file'], **params['populations']) + + # reinitialize the communication adapter to connect to the "new" brain + self._brain_communicator.shutdown() + self._brain_communicator.initialize() + + # remove the list of old TF devices, no need to shut them down + self.devices.clear() + + # pylint: disable=bare-except + except: + traceback.print_exc() + + +def launch_brain(argv): # pragma: no cover + """ + Load the brain process for communication with the master CLE process, run the simulation + blocking until the MPI processes are terminated by CLE shutdown. + """ + + try: + parser = argparse.ArgumentParser() + parser.add_argument('--bibi', dest='bibi_file', + help='the bibi file path to load', required=True) + parser.add_argument('--rng-seed', dest='rng_seed', + help='the global experiment RNG seed', required=True) + args, _ = parser.parse_known_args(argv) + + # construct brain and proxies (expand environment variables in paths) + brain = NestBrainProcess(os.path.expandvars(args.bibi_file), args.rng_seed) + + # run the brain until terminated, this is a blocking call + brain.run() + + except Exception: # pylint: disable=broad-except + # print the traceback which should go back to the remote logger + traceback.print_exc() + + # for any failures, terminate all other brain processes and the CLE + MPI.COMM_WORLD.Abort(-1) diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/NestLauncher.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/NestLauncher.py new file mode 100644 index 0000000..dfb2230 --- /dev/null +++ b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/NestLauncher.py @@ -0,0 +1,173 @@ +# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER +# This file is part of the Neurorobotics Platform software +# Copyright (C) 2014,2015,2016,2017 Human Brain Project +# https://www.humanbrainproject.eu +# +# The Human Brain Project is a European Commission funded project +# in the frame of the Horizon2020 FET Flagship plan. +# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# ---LICENSE-END +""" +Setup, build, and launch a distributed Nest instance that will spawn the CLE and +requested brain processes. +""" + +from hbp_nrp_distributed_nest.launch.host.LocalLauncher import LocalLauncher +from hbp_nrp_distributed_nest.launch.MPILauncher import MPILauncher + +import os +import random +import sys + + +# This class intentionally does not inherit SimulationServer (even though it is an implementation of +# it) in order to avoid duplicate notificators +class NestLauncher(object): + """ + Setup, build, and launch a distributed Nest instance that will spawn the CLE and + requested brain processes. + """ + + def __init__(self, sim_id, exc, bibi, **par): + """ + Store all experiment configuration parameters so that they can be propagated + to the remote hosts. + + :param exc: the experiment configuration + :param bibi: the BIBI configuration. + :param server_host Target Gazebo/brain process host (e.g. local or lugano) + :param reservation Reservation string for cluster backend (None is a valid option) + :param sim_id The id of the simulation/experiment to be launched. + :param timeout The default simulation timeout (time initially allocated). + """ + + # we need to replace absolute paths with relative environment variable-based paths for the + # remote hosts that may not share the same file structure + nrp_experiments_path = os.environ.get('NRP_EXPERIMENTS_DIRECTORY').rstrip('/') + + self._exd_file = exc.path.replace(nrp_experiments_path, '$NRP_EXPERIMENTS_DIRECTORY') + self._bibi_file = bibi.path.replace(nrp_experiments_path, '$NRP_EXPERIMENTS_DIRECTORY') + self._exp_path = exc.dir.replace(nrp_experiments_path, '$NRP_EXPERIMENTS_DIRECTORY') + + # store the other launch parameters as provided + self._server_host = par['gzserver_host'] + self._reservation = par['reservation'] + self._sim_id = sim_id + self._timeout = par['timeout'] + + self._exc = exc + self._bibi = bibi + self._env_file = None + + # extract the multiprocess RNG seed to use or generate one if needed + self._rng_seed = self._exc.rngSeed if self._exc.rngSeed is not None else \ + random.randint(1, sys.maxint) + + # host specific launch configuration/allocation + self._launcher = None + + # the MPI process launcher for the CLE and brain processes + self.mpilauncher = None + + # we should call the except_hook when something goes wrong in the simulation, but currently + # we don't + # pylint: disable=unused-argument + def initialize(self, environment_file, except_hook): + """ + Construct the launch configuration that will spawn CLE + brain processes + on distributed hosts. + """ + + # extract the environment file path + nrp_models_path = os.environ.get('NRP_MODELS_DIRECTORY').rstrip('/') + self._env_file = environment_file.replace(nrp_models_path, '$NRP_MODELS_DIRECTORY') + + # create a host specific launcher + if self._server_host == 'local': + self._launcher = LocalLauncher() + else: + raise Exception('Unsupported server host {}, cannot configure distributed launch!' 
+ .format(self._server_host)) + + # command line argument friendly versions of timeout and reservation arguments + # the receiving processes must understand how to convert these back + reservation_str = self._reservation if self._reservation else '' + timeout_str = str(self._timeout).replace(' ', '_') + + # construct the actual MPI launcher with the process that determines if the CLE or + # standalone brain should be launched + args = ['--exdconf={}'.format(self._exd_file), + '--bibi={}'.format(self._bibi_file), + '--environment={}'.format(self._env_file), + '--experiment-path={}'.format(self._exp_path), + '--gzserver-host={}'.format(self._server_host), + '--reservation={}'.format(reservation_str), + '--sim-id={}'.format(self._sim_id), + '--timeout={}'.format(timeout_str), + '--rng-seed={}'.format(self._rng_seed)] + exe = '{python} -m hbp_nrp_distributed_nest.launch.DistributedNestProcess {args}'\ + .format(python=sys.executable, args=' '.join(args)) + self.mpilauncher = MPILauncher(exe) + + # build and deploy configuration + self._build() + + def _build(self): + """ + Perform launcher and MPI build and deployment, can be invoked by subclasses after their + implementation specific initialize. + """ + + # deploy the generated configuration files / launch scripts to the target host + self._launcher.deploy() + + # construct the actual MPI launcher based on the deployed configuration + self.mpilauncher.add_host(self._launcher.hostname, + self._launcher.host_tmpdir, + self._exc.bibiConf.processes + 1) + + # construct the mpi command line with the above host/launch information + self.mpilauncher.build() + + # for error propagation reasons, we have to launch and init the MPI processes to emulate + # the behavior of the single process launcher, if the mpirun command fails or the CLE/brain + # processes fail then the error will be properly propagated + self.mpilauncher.launch() + + def run(self): + """ + Runs the assembled simulation + """ + self.mpilauncher.run() + + def shutdown(self): + """ + Shutdown all spawned processes and cleanup temporary files. + """ + + # terminate the mpirun command (if it is still running) + if self.mpilauncher is not None: + self.mpilauncher.shutdown() + self.mpilauncher = None + + # perform any launcher host specific cleanup + if self._launcher: + self._launcher.shutdown() + self._launcher = None + + # finally, cleanup the roscore and any registrations launched by the above + os.system("echo 'y' | timeout -s SIGKILL 10s rosnode cleanup >/dev/null 2>&1") diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/__init__.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/__init__.py new file mode 100644 index 0000000..e9d94fc --- /dev/null +++ b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/__init__.py @@ -0,0 +1,3 @@ +""" +This package contains MPI launch implementation for distributed Nest. 
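+It contains the NestLauncher and MPILauncher used by the backend, the DistributedNestProcess entry point that dispatches on MPI rank, and the standalone NestBrainProcess run on the non-zero ranks.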
+""" diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/host/LocalLauncher.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/host/LocalLauncher.py new file mode 100644 index 0000000..798b692 --- /dev/null +++ b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/host/LocalLauncher.py @@ -0,0 +1,79 @@ +# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER +# This file is part of the Neurorobotics Platform software +# Copyright (C) 2014,2015,2016,2017 Human Brain Project +# https://www.humanbrainproject.eu +# +# The Human Brain Project is a European Commission funded project +# in the frame of the Horizon2020 FET Flagship plan. +# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# ---LICENSE-END +""" +localhost launch target configuration. +""" + +from hbp_nrp_distributed_nest.launch.host import IHostLauncher + +import os +import shutil +import tempfile + + +class LocalLauncher(IHostLauncher): + """ + This launch configuration targets the localhost for all processes and is suitable for local + installs or deployed installs where the newly spawned processes can run on the same host + as the REST backend. + """ + + def __init__(self): + """ + Create a local launcher to target localhost. + """ + super(LocalLauncher, self).__init__() + + # this launcher only targets the localhost + self._hostname = 'localhost' + + # create a temporary directory for configuration files (the same for local/host) + self._local_tmpdir = tempfile.mkdtemp() + self._host_tmpdir = self._local_tmpdir + + def create_launch_scripts(self): + """ + Nothing to create since the target is localhost. + """ + pass + + def deploy(self): + """ + Nothing to deploy since the target is localhost. + """ + pass + + def shutdown(self): + """ + Shutdown by trying to kill any running processes and deleting the temporary directory. + """ + + if self._local_tmpdir is not None and os.path.exists(self._local_tmpdir): + + # finally, delete the directory + shutil.rmtree(self._local_tmpdir) + + # even though this class should not be reused, unset the tmpdir + self._local_tmpdir = None + self._host_tmpdir = None diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/host/__init__.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/host/__init__.py new file mode 100644 index 0000000..790732f --- /dev/null +++ b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/host/__init__.py @@ -0,0 +1,73 @@ +""" +This package contains host specific implementations for different distributed +simulation targets. +""" + + +class IHostLauncher(object): + """ + A generic interface to implement a host specific launcher. Guarantees necessary property and + functions are accessible in inherited classes. 
+ """ + + def __init__(self): + """ + Interface constructor, guarantee that the hostname and tmpdir properties are available. + """ + self._hostname = None + self._local_tmpdir = None + self._host_tmpdir = None + + @property + def hostname(self): + """ + Return the target host for the launcher implementation. Raise an exception if the host + specific implementation does not set the hostname value. + """ + if not self._hostname: + raise NotImplementedError('Host specific implementation did not set target hostname!') + return self._hostname + + @property + def local_tmpdir(self): + """ + Return the temporary configuration directory that can be used to write + configuration files. Raise an exception if the host specific implementation does not set + the tmpdir value. + """ + if not self._local_tmpdir: + raise NotImplementedError('Host specific implementation did not set temp directory!') + return self._local_tmpdir + + @property + def host_tmpdir(self): + """ + Return the temporary execution directory on the host that contains all necessary + configuration files. Raise an exception if the host specific implementation does not set + the tmpdir value. + """ + if not self._host_tmpdir: + raise NotImplementedError('Host specific implementation did not set temp directory!') + return self._host_tmpdir + + def create_launch_scripts(self): + """ + Create a set of launch scripts for the CLE and individual brain processes with specific + implementations required for each host. Write the launch scripts to the temporary directory + for this launcher. + + Returns a tuple (path to CLE launch script, path to BrainProcess launc script). + """ + raise NotImplementedError('Host specific implementation is missing!') + + def deploy(self): + """ + Deploy the temporary directory contents to the target host if necessary. + """ + raise NotImplementedError('Host specific implementation is missing!') + + def shutdown(self): + """ + Shutdown and cleanup any host specific configuration. 
+ """ + raise NotImplementedError('Host specific implementation is missing!') diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/version.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/version.py new file mode 100644 index 0000000..5011fe8 --- /dev/null +++ b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/version.py @@ -0,0 +1,2 @@ +'''version string - generated by setVersion.sh''' +VERSION = '1.3.0' diff --git a/hbp_nrp_distributed_nest/requirements.txt b/hbp_nrp_distributed_nest/requirements.txt new file mode 100644 index 0000000..f9d1c29 --- /dev/null +++ b/hbp_nrp_distributed_nest/requirements.txt @@ -0,0 +1,2 @@ +# third party requirements +mpi4py==2.0.0 diff --git a/hbp_nrp_distributed_nest/requirements_extension_tests.txt b/hbp_nrp_distributed_nest/requirements_extension_tests.txt new file mode 100644 index 0000000..031ce95 --- /dev/null +++ b/hbp_nrp_distributed_nest/requirements_extension_tests.txt @@ -0,0 +1,3 @@ +#the following is required for the unit testing +mock==1.0.1 +testfixtures==3.0.2 diff --git a/hbp_nrp_distributed_nest/setup.py b/hbp_nrp_distributed_nest/setup.py new file mode 100644 index 0000000..0778ff3 --- /dev/null +++ b/hbp_nrp_distributed_nest/setup.py @@ -0,0 +1,54 @@ +'''setup.py''' + +from setuptools import setup + +import hbp_nrp_distributed_nest +import pip + +from pip.req import parse_requirements +from optparse import Option +options = Option('--workaround') +options.skip_requirements_regex = None +reqs_file = './requirements.txt' +# Hack for old pip versions +# Versions greater than 1.x have a required parameter "session" in +# parse_requirements +if pip.__version__.startswith('1.'): + install_reqs = parse_requirements(reqs_file, options=options) +else: + from pip.download import PipSession # pylint:disable=no-name-in-module + options.isolated_mode = False + install_reqs = parse_requirements( # pylint:disable=unexpected-keyword-arg + reqs_file, + session=PipSession, + options=options + ) +reqs = [str(ir.req) for ir in install_reqs] + +# ensure we install numpy before the main list of requirements, ignore +# failures if numpy/cython are not requirements and just proceed (future proof) +try: + cython_req = next(r for r in reqs if r.startswith('cython')) + numpy_req = next(r for r in reqs if r.startswith('numpy')) + pip.main(['install', '--no-clean', cython_req, numpy_req]) +# pylint: disable=bare-except +except: + pass + +config = { + 'description': 'Distributed Nest interface support for CLE/ExDBackend for HBP SP10', + 'author': 'HBP Neurorobotics', + 'url': 'http://neurorobotics.net', + 'author_email': 'neurorobotics@humanbrainproject.eu', + 'version': hbp_nrp_distributed_nest.__version__, + 'install_requires': reqs, + 'packages': ['hbp_nrp_distributed_nest', + 'hbp_nrp_distributed_nest.cle', + 'hbp_nrp_distributed_nest.launch', + 'hbp_nrp_distributed_nest.launch.host'], + 'scripts': [], + 'name': 'hbp-nrp-distributed-nest', + 'include_package_data': True, +} + +setup(**config) diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/cle/MUSICPyNNCommunicationAdapter.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/cle/MUSICPyNNCommunicationAdapter.py index 3b5b618..7359250 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/cle/MUSICPyNNCommunicationAdapter.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/cle/MUSICPyNNCommunicationAdapter.py @@ -31,7 +31,7 @@ import pyNN.nest as sim import hbp_nrp_cle.tf_framework.config as tf_config from hbp_nrp_music_interface.bibi.bibi_music_config import 
MUSICConfiguration -from hbp_nrp_music_interface.launch.MUSICBrainProcess import MUSICBrainProcess +from hbp_nrp_distributed_nest.launch.NestBrainProcess import NestBrainProcess from mpi4py import MPI @@ -166,12 +166,9 @@ class MUSICPyNNCommunicationAdapter(PyNNNestCommunicationAdapter): if rank == MPI.COMM_WORLD.Get_rank(): continue - MPI.COMM_WORLD.send({'label': label, 'mask': mask, 'synapses': synapse_params}, - dest=rank, tag=MUSICBrainProcess.MPI_MSG_TAG) - - # wait for the remote brain processes before continuing, use the global MPI comm - # handle that include all of the MUSIC processes, not just the local group ones - MPI.COMM_WORLD.Barrier() + MPI.COMM_WORLD.send({'command': 'ConnectTF', 'label': label, 'mask': mask, + 'synapses': synapse_params}, + dest=rank, tag=NestBrainProcess.MPI_MSG_TAG) def __get_population_proxy(self, population, proxy_type): """ diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/cle/MUSICPyNNControlAdapter.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/cle/MUSICPyNNControlAdapter.py index 955e4f6..2d003a6 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/cle/MUSICPyNNControlAdapter.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/cle/MUSICPyNNControlAdapter.py @@ -26,7 +26,7 @@ Extensions of the base CLE PyNNControlAdapter to use MUSIC specific proxies instead of direct population access. Maxmimum code reuse and minimal duplication where possible. """ -from hbp_nrp_cle.brainsim.pynn.PyNNControlAdapter import PyNNControlAdapter +from hbp_nrp_distributed_nest.cle.DistributedPyNNControlAdapter import DistributedPyNNControlAdapter from hbp_nrp_music_interface.cle import MUSICBrainLoader import music @@ -36,7 +36,7 @@ import os logger = logging.getLogger(__name__) -class MUSICPyNNControlAdapter(PyNNControlAdapter): +class MUSICPyNNControlAdapter(DistributedPyNNControlAdapter): """ Represents a MUSIC/PyNN proxy controller object for the neuronal simulator """ @@ -46,7 +46,7 @@ class MUSICPyNNControlAdapter(PyNNControlAdapter): Load MUSIC/PyNN brain proxy populations rather than loading and instantiating the brain - overrides functionality for both python and h5 brains. - :param network_file: The path to the .h5 file containing the network + :param network_file: The path to the python file containing the network :param populations: A named list of populations to create """ self.__load_music_brain_proxies(**populations) diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICBrainProcess.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICBrainProcess.py index 35560fd..1def9d8 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICBrainProcess.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICBrainProcess.py @@ -22,17 +22,15 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # ---LICENSE-END """ -A distributed brain process that can be launched standalone on remote hosts. +A distributed MUSIC brain process that can be launched standalone on remote hosts. 
""" -from hbp_nrp_cle.brainsim import config +import hbp_nrp_cle.brainsim.pynn_nest # pylint: disable=unused-import + +from hbp_nrp_distributed_nest.launch.NestBrainProcess import NestBrainProcess + import pyNN.nest as sim -from hbp_nrp_cle.brainsim.pynn.PyNNControlAdapter import PyNNControlAdapter -from hbp_nrp_cle.cle.ClosedLoopEngine import ClosedLoopEngine import hbp_nrp_cle.tf_framework.config as tf_config -from hbp_nrp_commons.generated import bibi_api_gen -from hbp_nrp_commons.bibi_functions import get_all_neurons_as_dict - from hbp_nrp_music_xml.pynn.factory import PyNNProxyFactory from hbp_nrp_music_xml.pynn.connector_factory import PyNNConnectorFactory from hbp_nrp_music_xml.pynn.xml_factory import XmlFactory @@ -45,14 +43,11 @@ from mpi4py import MPI import traceback -class MUSICBrainProcess(object): +class MUSICBrainProcess(NestBrainProcess): """ - A distributed brain process that can be launched standalone on remote hosts. + A distributed MUSIC brain process that can be launched standalone on remote hosts. """ - # tag to listen for MPI configuration/command messages - MPI_MSG_TAG = 100 - def __init__(self, bibi_file, rng_seed): """ Load the distributed brain and construct MUSIC proxies for communication @@ -62,27 +57,11 @@ class MUSICBrainProcess(object): :param bibi_file The absolute path to the BIBI file for this experiment. """ + # setup MUSIC before any brain loading attempt self._music_setup = music.Setup() - # load the bibi, extract specified populations and brain file to be used - with open(bibi_file) as f: - self._bibi = bibi_api_gen.CreateFromDocument(f.read()) - pop_dict = get_all_neurons_as_dict(self._bibi.brainModel.populations) - - # load the models path on this node and set absolute bibi path - models_path = os.environ.get('NRP_MODELS_DIRECTORY') - if models_path is None: - raise Exception("Unable to determine bibi path, NRP_MODELS_DIRECTORY is not " - "defined on target node!") - brain_file = os.path.join(models_path, self._bibi.brainModel.file) - - # set the RNG seed before initializing any PyNN interfaces in the brain controller - config.rng_seed = int(rng_seed) - - # spawn CLE components that will handle loading the brain file and interfaces - self._brain_controller = PyNNControlAdapter(sim) - self._brain_controller.initialize() - self._brain_controller.load_brain(brain_file, **pop_dict) + # load the brain and set parameters + super(MUSICBrainProcess, self).__init__(bibi_file, rng_seed) # load the MUSIC proxies for the spawned brain music_path = os.environ.get('NRP_MUSIC_DIRECTORY') @@ -99,59 +78,6 @@ class MUSICBrainProcess(object): self._proxies = xml_factory.create_proxies(music_xml) - # extract the simulation timestep from the BIBI if set or default to CLE value (in ms) - self._timestep = ClosedLoopEngine.DEFAULT_TIMESTEP * 1000.0 - if self._bibi.timestep: - self._timestep = float(self._bibi.timestep) - - def run(self): - """ - Blocking run loop for this brain process. First accept any transfer function configuration - via MPI messages and then block running the brain until terminated externally by the CLE - shutting down. - """ - - # listen for transfer function creation messages until the CLE tells us to start - # NOTE: this is done on the same thread because MVAPICH2 defaults to single threaded - # when compiled for performance reasons, attempting to read/write to the MPI - # communicator on other threads causes freezing/segfault so don't do it unless - # we change how MVAPICH2 is compiled! 
- ready = False - while not ready: - - # block and read messages from the CLE - data = MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE, tag=MUSICBrainProcess.MPI_MSG_TAG) - - # new transfer function, dictionary of parameters - if isinstance(data, dict): - - # create the tf based on the proxy direction from the CLE - self._connect_tf(data) - - # command and control string from the CLE - elif data == 'ready': - - # TF loading is complete, we can start the simulation - ready = True - - # CLE launch has aborted, abandon the recv loop which may block MPI_ABORT commands - elif data == 'shutdown': - - # raise an Exception to terminate immediately - raise Exception('Received shutdown command from the CLELauncher, aborting!') - - # unknown message, this is a critical failure since this should never happen - # fully abort and log the condition - else: - raise Exception('Remote brain process received unknown message: %s' % str(data)) - - # ensure the CLE and other brain processes have all processed the message - MPI.COMM_WORLD.Barrier() - - # run until forcefully terminated - while True: - self._brain_controller.run_step(self._timestep) - def _connect_tf(self, params): """ Reflect a transfer function connection made on the CLE side by connecting proxy neurons @@ -161,6 +87,10 @@ class MUSICBrainProcess(object): :param params The connectivity/synapse parameters passed by the CLE. """ + # connections only supported during simulation construction + if self._ready: + raise Exception("The distributed MUSIC-Nest implementation does not support dynamic TFs!") + # get the population of neurons from our dictionary, we can guarantee this is a valid # population that has been declared in the BIBI at this point as the CLE will validate # before sending us the create message @@ -189,16 +119,32 @@ class MUSICBrainProcess(object): # for a source, we only need to connect the input proxy to the real neuron nest.Connect([proxy], [brain_neuron], syn_spec=synapse) + def _delete_tf(self, params): + """ + Currently unsupported, unable to dynamically create or destroy MUSIC ports. + """ + + # ignore any commands during simulation construction + if not self._ready: + return + + raise Exception("The distributed MUSIC-Nest implementation does not support TF deletion!") + + def _load_brain(self, params): + """ + Currently unsupported, unable to dynamically create or destroy MUSIC ports.
+ """ + + # ignore any commands during simulation construction + if not self._ready: + return + + raise Exception("The distributed MUSIC-Nest implementation does not support brain changes!") + if __name__ == '__main__': # pragma: no cover try: - # write our pid to disk so that this process can be forecefully killed if it does not - # terminate gracefully via MPI or through the experiment ending - pid = os.getpid() - with open('{}.lock'.format(pid), 'w') as pf: - pf.write('{}'.format(pid)) - parser = argparse.ArgumentParser() parser.add_argument('--bibi-file', dest='bibi_file', help='the bibi file path to load', required=True) diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICCLEProcess.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICCLEProcess.py index 1706d2c..4b26c5a 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICCLEProcess.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICCLEProcess.py @@ -25,13 +25,17 @@ This module contains the CLE process logic for the simulation assembly using MUSIC """ -import os -import argparse import logging +import sys + from hbp_nrp_cleserver.server.CLEGazeboSimulationAssembly import CLEGazeboSimulationAssembly from hbp_nrp_music_interface.cle.MUSICPyNNCommunicationAdapter import MUSICPyNNCommunicationAdapter from hbp_nrp_music_interface.cle.MUSICPyNNControlAdapter import MUSICPyNNControlAdapter +from hbp_nrp_distributed_nest.launch.DistributedCLEProcess import launch_cle + +# maintain this import order and both must be done before mpi4py +import hbp_nrp_cle.brainsim.pynn_nest # pylint: disable=unused-import import pyNN.nest as nestsim import music @@ -60,12 +64,6 @@ class MusicCLESimulationAssembly(CLEGazeboSimulationAssembly): """ logger.info('Using MUSIC configuration and adapters for CLE') - # write pid to lock file so launcher can always terminate us - # (failsafe) - pid = os.getpid() - with open('{}.lock'.format(pid), 'w') as pf: - pf.write('{}'.format(pid)) - # initialize music and set the CLE to use MUSIC adapters music.Setup() @@ -74,117 +72,6 @@ class MusicCLESimulationAssembly(CLEGazeboSimulationAssembly): return braincomm, braincontrol if __name__ == '__main__': # pragma: no cover - # TODO: This should be separated into its own method such that we can unit - # test this code - simulation = None - - # exit code, 0 for success and -1 for any failures - mpi_returncode = 0 - try: - if os.environ["ROS_MASTER_URI"] == "": - raise Exception("You should run ROS first.") - - parser = argparse.ArgumentParser() - parser.add_argument('--exdconf', dest='exd_file', - help='specify the ExDConfiguration file', required=True) - parser.add_argument('--environment', dest='environment_file', - help='specify the environment file', required=True) - parser.add_argument('--experiment-path', dest='path', - help='specify the base experiment path', required=True) - parser.add_argument('--gzserver-host', dest='gzserver_host', - help='the gzserver target host', required=True) - parser.add_argument('--reservation', dest='reservation', default=None, - help='cluster resource reservation', required=False) - parser.add_argument('--sim-id', dest='sim_id', type=int, - help='the simulation id to use', required=True) - parser.add_argument('--timeout', dest='timeout', - help='the simulation default time allocated', required=True) - parser.add_argument('--rng-seed', dest='rng_seed', - help='the global experiment RNG seed', required=True) - parser.add_argument('-v', '--verbose', action='store_true', - 
help='increase output verbosity') - - args = parser.parse_args() - - # expand any parameters (e.g. NRP_EXPERIMENTS_DIRECTORY) in paths - args.exd_file = os.path.expandvars(args.exd_file) - args.environment_file = os.path.expandvars(args.environment_file) - args.path = os.path.expandvars(args.path) - - # simplified launch process below from ROSCLESimulationFactory.py, avoid circular depdency - # by importing here - import rospy - from hbp_nrp_cleserver.server import ROS_CLE_NODE_NAME - from hbp_nrp_cleserver.server.ROSCLESimulationFactory import set_up_logger - from hbp_nrp_cleserver.server.ROSCLESimulationFactory import get_experiment_data - - # reconfigure the logger to stdout as done in ROSCLESimulationFactory.py otherwise all - # output will be trapped by the ROS logger after the first ROS node is - # initialized - rospy.init_node(ROS_CLE_NODE_NAME, anonymous=True) - set_up_logger(None, args.verbose) - - exd, bibi = get_experiment_data(args.exd_file) - - # parse the timeout string command line argument into a valid datetime - import dateutil.parser as datetime_parser - timeout_parsed = datetime_parser.parse(args.timeout.replace('_', ' ')) - - # check the reservation argument, if empty default to None - if args.reservation == '': - args.reservation = None - - # override the experiment RNG seed with the command line value - exd.rngSeed = int(args.rng_seed) - - simulation = MusicCLESimulationAssembly(args.sim_id, - exd, - bibi, - gzserver_host=args.gzserver_host, - reservation=args.reservation, - timeout=timeout_parsed) - simulation.initialize(args.environment_file, None) - if simulation.cle_server is None: - raise Exception( - "Error in cle_function_init. Cannot start simulation.") - - # FIXME: This should be done more cleanly within the adapter, see [NRRPLT-4858] - # the tag is a magic number to avoid circular build/release dependency for now but - # this will be removed when the referenced bug is fixed - # notify MPI/music processes that configuration is complete - from mpi4py import MPI - for rank in xrange(MPI.COMM_WORLD.Get_size()): - if rank != MPI.COMM_WORLD.Get_rank(): - MPI.COMM_WORLD.send('ready', dest=rank, tag=100) - MPI.COMM_WORLD.Barrier() - - logger.info('Starting CLE.') - simulation.run() # This is a blocking call, not to be confused with - # threading.Thread.start - - except Exception as e: # pylint: disable=broad-except - - # if running through MPI, catch Exception and terminate below to ensure brain processes - # are also killed - logger.error( - 'CLE aborted with message {}, terminating.'.format(e.message)) - # if no logger - print 'CLE aborted with message {}, terminating.'.format(e.message) - logger.exception(e) - mpi_returncode = -1 - - finally: - - # always attempt to shutdown the CLE launcher and release resources - if simulation: - logger.info('Shutting down CLE.') - simulation.shutdown() - logger.info('Shutdown complete, terminating.') - # terminate the MUSIC spawned brain processes - # send a shutdown message in case the brain processes are in a recv loop at startup since they - # seem to block and ignore the Abort command until receiving a message - from mpi4py import MPI - for rank in xrange(MPI.COMM_WORLD.Get_size()): - MPI.COMM_WORLD.isend('shutdown', dest=rank, tag=100) - MPI.COMM_WORLD.Abort(mpi_returncode) + # guaranteed to only be launched in one process by MUSIC, launch the CLE with defined assembly + launch_cle(sys.argv[1:], MusicCLESimulationAssembly) diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICLauncher.py 
b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICLauncher.py index f6e2e6c..5da6ead 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICLauncher.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICLauncher.py @@ -25,19 +25,19 @@ Setup, build, and launch a distributed MUSIC instance that will spawn the CLE and requested brain processes. """ +from hbp_nrp_distributed_nest.launch.NestLauncher import NestLauncher + from hbp_nrp_music_interface.bibi import bibi_music_config from hbp_nrp_music_interface.launch.MUSICMPILauncher import MUSICMPILauncher from hbp_nrp_music_interface.launch.host.LocalLauncher import LocalLauncher from hbp_nrp_music_interface.launch.host.LuganoLauncher import LuganoLauncher import os -import random -import sys # This class intentionally does not inherit SimulationServer (even though it is an implementation of # it) in order to avoid duplicate notificators -class MUSICLauncher(object): +class MUSICLauncher(NestLauncher): """ Setup, build, and launch a distributed MUSIC instance that will spawn the CLE and requested brain processes. @@ -55,29 +55,7 @@ class MUSICLauncher(object): :param sim_id The id of the simulation/experiment to be launched. :param timeout The default simulation timeout (time initially allocated). """ - - # we need to replace absolute paths with relative environment variable-based paths for the - # remote hosts that may not share the same file structure - nrp_experiments_path = os.environ.get('NRP_EXPERIMENTS_DIRECTORY').rstrip('/') - - self._exd_file = exc.path.replace(nrp_experiments_path, '$NRP_EXPERIMENTS_DIRECTORY') - self._bibi_file = bibi.path.replace(nrp_experiments_path, '$NRP_EXPERIMENTS_DIRECTORY') - self._exp_path = exc.dir.replace(nrp_experiments_path, '$NRP_EXPERIMENTS_DIRECTORY') - - # store the other launch parameters as provided - self._server_host = par['gzserver_host'] - self._reservation = par['reservation'] - self._sim_id = sim_id - self._timeout = par['timeout'] - - self.__exc = exc - self.__bibi = bibi - self._env_file = None - - # host specific launch configuration/allocation - self._launcher = None - # the MPI process launcher for the CLE and brain processes - self.mpilauncher = None + super(MUSICLauncher, self).__init__(sim_id, exc, bibi, **par) # we should call the except_hook when something goes wrong in the simulation, but currently # we don't @@ -88,6 +66,7 @@ class MUSICLauncher(object): on distributed hosts. 
""" + # extract the environment file path nrp_models_path = os.environ.get('NRP_MODELS_DIRECTORY').rstrip('/') self._env_file = environment_file.replace(nrp_models_path, '$NRP_MODELS_DIRECTORY') @@ -95,7 +74,7 @@ class MUSICLauncher(object): if self._server_host == 'local': self._launcher = LocalLauncher() elif self._server_host == 'lugano': - self._launcher = LuganoLauncher(self.__exc.bibiConf.processes + 1, + self._launcher = LuganoLauncher(self._exc.bibiConf.processes + 1, self._timeout, self._reservation) else: @@ -110,69 +89,30 @@ class MUSICLauncher(object): reservation_str = self._reservation if self._reservation else '' timeout_str = str(self._timeout).replace(' ', '_') - # extract the multiprocess RNG seed to use or generate one if needed - rng_seed = self.__exc.rngSeed if self.__exc.rngSeed is not None else \ - random.randint(1, sys.maxint) - # build a MUSIC configuration script with correct brain ports, launchers and arugments # save it to the host launcher temp directory, this is the same for every host - music_conf = bibi_music_config.MUSICConfiguration(self.__bibi) + music_conf = bibi_music_config.MUSICConfiguration(self._bibi) music_conf.add_application('CLE', cle_launcher, ['--exdconf={}'.format(self._exd_file), + '--bibi={}'.format(self._bibi_file), '--environment={}'.format(self._env_file), '--experiment-path={}'.format(self._exp_path), '--gzserver-host={}'.format(self._server_host), '--reservation={}'.format(reservation_str), '--sim-id={}'.format(self._sim_id), '--timeout={}'.format(timeout_str), - '--rng-seed={}'.format(rng_seed)], + '--rng-seed={}'.format(self._rng_seed)], 1) music_conf.add_application('BRAIN', brain_launcher, ['--bibi-file={}'.format(self._bibi_file), - '--rng-seed={}'.format(rng_seed)], - self.__exc.bibiConf.processes) + '--rng-seed={}'.format(self._rng_seed)], + self._exc.bibiConf.processes) music_conf.save(self._launcher.local_tmpdir) - # deploy the generated configuration files / launch scripts to the target host - self._launcher.deploy() - - # build an mpi launch command for the requested host configuration, currently we launch - # all processes on the same host - self.mpilauncher = MUSICMPILauncher() - self.mpilauncher.add_host(self._launcher.hostname, - self._launcher.host_tmpdir, - self.__exc.bibiConf.processes + 1) - - # construct the mpi command line with the above host/launch information - self.mpilauncher.build() - - # for error propagation reasons, we have to launch and init the MPI processes to emulate - # the behavior of the single process launcher, if the mpirun command fails or the CLE/brain - # processes fail then the error will be properly propagated - self.mpilauncher.launch() - - def run(self): - """ - Runs the assembled simulation - """ - self.mpilauncher.run() - - def shutdown(self): - """ - Shutdown all spawned processes and cleanup temporary files. 
- """ - - # terminate the mpirun command (if it is still running) - if self.mpilauncher is not None: - self.mpilauncher.shutdown() - self.mpilauncher = None - - # perform any launcher host specific cleanup - if self._launcher: - self._launcher.shutdown() - self._launcher = None + # construct the actual MPI launcher + self.mpilauncher = MUSICMPILauncher('music cle.music') - # finally, cleanup the roscore and any registrations launched by the above - os.system("echo 'y' | timeout -s SIGKILL 10s rosnode cleanup >/dev/null 2>&1") + # build and deploy configuration + self._build() diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py index 0258084..0bc240f 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py @@ -30,36 +30,20 @@ Helper class to build and execute a formatted mpirun command for music in the fo where each of the hosts has a specific working directory with necessary config files already in place. Also passes environment variables required for NRP/CLE execution. """ -import logging -import os -import subprocess -import time +from hbp_nrp_distributed_nest.launch.MPILauncher import MPILauncher -import rospy -import rosnode -from std_msgs.msg import String -import json -from hbp_nrp_cle import config - -logger = logging.getLogger(__name__) - - -class MUSICMPILauncher(object): +class MUSICMPILauncher(MPILauncher): """ Class constructs and executes the MUSIC mpi launch command. """ - def __init__(self): - self._hosts = [] - self._cmd = None - self._process = None - self._launched = False - self._gazebo_master_uri = None + def __init__(self, executable): + super(MUSICMPILauncher, self).__init__(executable) def add_host(self, hostname, tmpdir, processes=1): """ - Add a target host to the mpi launch configuration. + Add a target host to the mpi launch configuration with MUSIC working directory set. :param hostname The remote host name or ip. :param tmpdir A valid temporary directory on the remote host to launch in. @@ -67,146 +51,3 @@ class MUSICMPILauncher(object): """ self._hosts.append('-np {p} -host {h} -wdir {t} -genv NRP_MUSIC_DIRECTORY {t}' .format(p=processes, h=hostname, t=tmpdir)) - - def build(self): - """ - Construct the mpirun command line string with all hosts provided. - """ - - if len(self._hosts) == 0: - raise Exception('No target host configurations specified for MUSIC MPI processes!') - - # build the command with per host specific configuration - hoststr = ' : '.join(['{} music cle.music'.format(h) for h in self._hosts]) - self._cmd = 'mpirun {}'.format(hoststr) - - def launch(self): - """ - Launch the mpirun command and wait for successful startup of the CLE. Blocks until the - CLE publishes completion on the status topic or if the mpirun command aborts. - """ - - if not self._cmd: - raise Exception('No command set for MUSIC MPI processes, build() was not called!') - - # provide extra ssh flags if ssh is used on the vizcluster to ensure we can spawn - # other ssh sessions / processes for Gazebo/etc. - env_vars = dict(os.environ, HYDRA_LAUNCHER_EXTRA_ARGS="-M -K") - - # Spawn the mpirun command, we need to do a few special things because mvapich2 will - # send SIGSTOP (like a ctrl+v or suspend) to its parent process when executing and this - # would cause the entire Python stack to stop and create <defunct> processes. 
- logger.info("Spawning MUSIC MPI processes: {}".format(self._cmd)) - self._process = subprocess.Popen(self._cmd.split(), # no shell to SIGSTOP/hang - preexec_fn=os.setsid, # create a new session/tree - stdin=subprocess.PIPE, # hangs without valid stdin - env=env_vars) # environment variables - - # wait until the CLE subprocess initializes properly or fails, the conditions are: - # failure if process exits (bad command, failure to launch, etc.) - # success if the CLE publishes a status update indicating loading is complete - - # subscribe for loading status updates from the CLE, wait for completion message - status_sub = rospy.Subscriber('/ros_cle_simulation/status', String, self._on_status) - - # wait for the process to abort or be successfully launched - while not self._process.poll() and not self._launched: - - # ensure gzbridge is running and accessible in deployed configurations - self._check_gzbridge() - - # very short sleep to be as responsive as possible - time.sleep(0.1) - - # disconnect the temporary status subscriber - status_sub.unregister() - - # launch has failed, propagate an error to the user - if not self._launched: - raise Exception('Distributed MPI launch failure, aborting.\nPlease contact ' - 'neurorobotics@humanbrainproject.eu if this problem persists.') - - def _on_status(self, msg): - """ - Listen for CLE status messages, parse the JSON format that is also sent to the frontend - progress bar. Wait for completion of the CLE loading task and mark the MPI process as - successfully launched. - - :param msg The ros message to parse. - """ - - status = json.loads(msg.data) - - # ignore status bar messages, these don't give us information if a launch has failed - # since the loading task will always be set to 'done' when it aborts - if 'progress' in status: - return - - # received a simulation status "tick", everything has launched successfully - self._launched = True - - def _check_gzbridge(self): - """ - gzbridge cannot be launched on remote CLE nodes since they will not be reachable by clients - that are configured and able to reach the backend machines. If Gazebo is launched on a - remote node (e.g. not a local install), wait for the /gazebo ROS node to appear and start - gzbridge on this host (a backend server). - """ - - if self._gazebo_master_uri: - return - - # request the ip of the Gazebo node, result will be -1 if not found - res, _, ip = rosnode.get_api_uri(rospy.get_master(), '/gazebo', True) - if res == -1: - return - - # replace the ROS port with the Gazebo port, configure env, and run gzbridge - self._gazebo_master_uri = ip[0:ip.rfind(':') + 1] + '11345' - - # only need to start the gzbridge if running in a deployed configuration - if '127.0.0.1' not in self._gazebo_master_uri: - - # this emulates the functionality in hbp_nrp_cleserver/server/LocalGazebo.py but we - # cannot import and reuse due to circular dependencies - os.environ['GAZEBO_MASTER_URI'] = self._gazebo_master_uri - os.system(config.config.get('gzbridge', 'restart-cmd')) - - def run(self): - """ - Block until the mpirun command exits. Check the return code to determine if it was - successful or aborted. Backwards compatibility in naming convention, must be run. 
- """ - - if not self._process or not self._launched: - raise Exception('No MPI process launched, cannot run until launch() is called.') - - # wait until it terminates, if launch fails or is killed externally, propagate an - # Exception to notify the simulation factory to shutdown since the launch/run has failed - if self._process.wait() != 0: - raise Exception('Distributed MPI runtime failure, aborting.\nPlease contact ' - 'neurorobotics@humanbrainproject.eu if this problem persists.') - - def shutdown(self): - """ - Attempt to forecfully shutdown the mpirun command if it is still running and has not - cleanly shut itself down. Guaranteed to be called after launch success or failure. - """ - - # try to terminate the mpirun command, mpirun will automatically exit nautrally when all - # of its spawned child processes exit or are killed, so this isn't explicitly necessary - if self._process: - try: - self._process.kill() - except OSError: # the process has already cleanly terminated - pass - - # terminate the gzbrige process/websocket if we started it above - if self._gazebo_master_uri and '127.0.0.1' not in self._gazebo_master_uri: - os.system(config.config.get('gzbridge', 'stop-cmd')) - - # reset all class variables to prevent class reuse - self._gazebo_master_uri = None - self._launched = False - self._process = None - self._cmd = None diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/LocalLauncher.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/LocalLauncher.py index d9d6b43..c383c02 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/LocalLauncher.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/LocalLauncher.py @@ -25,18 +25,14 @@ localhost launch target configuration. """ -from hbp_nrp_music_interface.launch.host import IHostLauncher +from hbp_nrp_distributed_nest.launch.host.LocalLauncher import LocalLauncher as ILocalLauncher -import glob import os -import shutil -import signal import stat import sys -import tempfile -class LocalLauncher(IHostLauncher): +class LocalLauncher(ILocalLauncher): """ This launch configuration targets the localhost for all processes and is suitable for local installs or deployed installs where the newly spawned processes can run on the same host @@ -50,13 +46,6 @@ class LocalLauncher(IHostLauncher): """ super(LocalLauncher, self).__init__() - # this launcher only targets the localhost - self._hostname = 'localhost' - - # create a temporary directory for configuration files (the same for local/host) - self._local_tmpdir = tempfile.mkdtemp() - self._host_tmpdir = self._local_tmpdir - def create_launch_scripts(self): """ Create a set of launch scripts for the CLE and individual brain processes with specific @@ -97,33 +86,3 @@ class LocalLauncher(IHostLauncher): # return a relative path to the script, it's guaranteed to be run in the tmpdir return './%s' % name - - def deploy(self): - """ - Nothing to deploy since the target is localhost. - """ - pass - - def shutdown(self): - """ - Shutdown by trying to kill any running processes and deleting the temporary directory. 
- """ - - if self._local_tmpdir is not None and os.path.exists(self._local_tmpdir): - - # terminate any lingering mpi processes that we have spawned, send a kill -9 since they - # do not need to terminate gracefully and are rogue if they are still running - for lock_file in glob.iglob(os.path.join(self._local_tmpdir, '*.lock')): - with open(lock_file, 'r') as pf: - pid = int(pf.readline()) - try: - os.kill(pid, signal.SIGKILL) - except OSError: - pass - - # finally, delete the directory - shutil.rmtree(self._local_tmpdir) - - # even though this class should not be reused, unset the tmpdir - self._local_tmpdir = None - self._host_tmpdir = None diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/__init__.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/__init__.py index d78c89b..51aa846 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/__init__.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/__init__.py @@ -1,73 +1,3 @@ """ -This package contains host specific implementations for different distributed -simulation targets. +This package contains host specific implementations for distributed MUSIC targets. """ - - -class IHostLauncher(object): - """ - A generic interface to implement a host specific launcher. Guarantees necessary property and - functions are accessible in inherited classes. - """ - - def __init__(self): - """ - Interface constructor, guarantee that the hostname and tmpdir properties are available. - """ - self._hostname = None - self._local_tmpdir = None - self._host_tmpdir = None - - @property - def hostname(self): - """ - Return the target host for the launcher implementation. Raise an exception if the host - specific implementation does not set the hostname value. - """ - if not self._hostname: - raise NotImplementedError('Host specific implementation did not set target hostname!') - return self._hostname - - @property - def local_tmpdir(self): - """ - Return the temporary configuration directory that can be used to write MUSIC - configuration files. Raise an exception if the host specific implementation does not set - the tmpdir value. - """ - if not self._local_tmpdir: - raise NotImplementedError('Host specific implementation did not set temp directory!') - return self._local_tmpdir - - @property - def host_tmpdir(self): - """ - Return the temporary execution directory on the host that contains all necessary MUSIC - configuration files. Raise an exception if the host specific implementation does not set - the tmpdir value. - """ - if not self._host_tmpdir: - raise NotImplementedError('Host specific implementation did not set temp directory!') - return self._host_tmpdir - - def create_launch_scripts(self): - """ - Create a set of launch scripts for the CLE and individual brain processes with specific - implementations required for each host. Write the launch scripts to the temporary directory - for this launcher. - - Returns a tuple (path to CLE launch script, path to BrainProcess launc script). - """ - raise NotImplementedError('Host specific implementation is missing!') - - def deploy(self): - """ - Deploy the temporary directory contents to the target host if necessary. - """ - raise NotImplementedError('Host specific implementation is missing!') - - def shutdown(self): - """ - Shutdown and cleanup any host specific configuration. 
- """ - raise NotImplementedError('Host specific implementation is missing!') diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_interface.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_interface.py index 3bc6a0f..36525a6 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_interface.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_interface.py @@ -23,7 +23,7 @@ # ---LICENSE-END import unittest -from hbp_nrp_music_interface.launch.host import IHostLauncher +from hbp_nrp_distributed_nest.launch.host import IHostLauncher from mock import Mock, patch import os diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_local.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_local.py index 0d20836..0615167 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_local.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_local.py @@ -94,9 +94,6 @@ class TestLocalLauncherInterface(unittest.TestCase): with patch('__builtin__.open', m, create=True): self.__launcher.shutdown() - # verify the file was read and kill was called - mock_kill.assert_called_once_with(1234, signal.SIGKILL) - # verify the rmtree is called to delete the tmpdir mock_rmtree.assert_called_once_with('/mock_tmp') diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py index 1f477e5..8b55e82 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py @@ -68,7 +68,7 @@ class MockFailedProcess(MockProcess): class TestControlAdapter(unittest.TestCase): def setUp(self): - self.__launcher = MUSICMPILauncher() + self.__launcher = MUSICMPILauncher('music cle.music') def test_add_hosts(self): self.__launcher.add_host('foo', '/tmp/foo') diff --git a/setVersion.sh b/setVersion.sh index e24a582..66e0673 100755 --- a/setVersion.sh +++ b/setVersion.sh @@ -50,7 +50,7 @@ function subVersion() { version=$1 file=$2 echo " ... "$file - list="hbp-nrp-music-xml hbp-nrp-music-interface" + list="hbp-nrp-music-xml hbp-nrp-music-interface hbp-nrp-distributed-nest" for i in $list; do sed -i "/$i/c\\$i==$version" $file done @@ -89,9 +89,11 @@ echo "Setting versions to '"$version"'" in ... setVersion $version hbp_nrp_music_xml/hbp_nrp_music_xml/version.py setVersion $version hbp_nrp_music_interface/hbp_nrp_music_interface/version.py +setVersion $version hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/version.py subVersion $version hbp_nrp_music_xml/requirements.txt subVersion $version hbp_nrp_music_interface/requirements.txt +subVersion $version hbp_nrp_distributed_nest/requirements.txt echo done. echo -- GitLab