From 973e4e564bc7a6db14ae3a3cd1ef4e38e8bd015e Mon Sep 17 00:00:00 2001 From: Kenny Sharma <kenny.sharma@tum.de> Date: Mon, 20 Mar 2017 08:00:55 +0100 Subject: [PATCH] [NRRPLT-3902][NRRPLT-4058][NRRPLT-4840] Vizcluster support, stability, and test patch. This patch addresses 3 broad issues: [NRRPLT-3902] - provides the ability to run simulations on vizcluster nodes - parameterize absolute paths with appropriate NRP_ environment variable that can be properly expanded on the target host - provides generic interfaces that can be extended for different hosts [NRRPLT-4058] - refactors the init/run mechanisms to more closely mirror the single process version - this enables reporting of launch failures all the way through CLE initialization (which includes things like Gazebo launch failures, resource allocation failures, Exceptions) - checks the runtime return code to determine if the mpirun command was abnormally terminated and properly reports the error - propagates all errors appropriately back to the main pipeline [NRRPLT-4840] - increases test coverage of the new components back up to ~90% - the reamining coverage is primarily for workaround/temporary code that is difficult to unit test without running the platform and will be removed in future commits Change-Id: Ib33a1d35b26aeee6da10c95f30bb9be9243479dc --- .../launch/MUSICBrainProcess.py | 20 ++- .../launch/MUSICLauncher.py | 152 ++++++++---------- .../launch/MUSICMPILauncher.py | 90 ++++++++--- .../launch/host/LocalLauncher.py | 106 ++++++++++++ .../launch/host/LuganoLauncher.py | 142 ++++++++++++++++ .../launch/host/__init__.py | 73 +++++++++ .../tests/launch/host/__init__.py | 4 + .../tests/launch/host/test_interface.py | 42 +++++ .../tests/launch/host/test_local.py | 85 ++++++++++ .../tests/launch/host/test_lugano.py | 105 ++++++++++++ .../tests/launch/test_mpi_launcher.py | 77 +++++++-- .../tests/launch/test_music_launcher.py | 72 +++++++++ hbp_nrp_music_interface/setup.py | 3 +- 13 files changed, 848 insertions(+), 123 deletions(-) create mode 100644 hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/LocalLauncher.py create mode 100644 hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/LuganoLauncher.py create mode 100644 hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/__init__.py create mode 100644 hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/__init__.py create mode 100644 hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_interface.py create mode 100644 hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_local.py create mode 100644 hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_lugano.py create mode 100644 hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_music_launcher.py diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICBrainProcess.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICBrainProcess.py index f225f12..d3be1aa 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICBrainProcess.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICBrainProcess.py @@ -100,11 +100,17 @@ class MUSICBrainProcess(object): self._connect_tf(data) # command and control string from the CLE - elif isinstance(data, str) and data == 'ready': + elif data == 'ready': # TF loading is complete, we can start the simulation ready = True + # CLE launch has aborted, abandon the recv loop which may block MPI_ABORT commands + elif data == 'shutdown': + + # raise an Exception to terminate immediately + raise Exception('Received shutdown command from the CLELauncher, aborting!') + # unknown message, this is a critical failure since this should never happen # fully abort and log the condition else: @@ -155,7 +161,7 @@ class MUSICBrainProcess(object): nest.Connect([proxy], [brain_neuron], syn_spec=synapse) -if __name__ == '__main__': +if __name__ == '__main__': # pragma: no cover try: # write our pid to disk so that this process can be forecefully killed if it does not @@ -166,18 +172,18 @@ if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--bibi-file', dest='bibi_file', - help='the absolute bibi file path to load', required=True) + help='the bibi file path to load', required=True) args = parser.parse_args() - # construct brain and proxies - brain = MUSICBrainProcess(args.bibi_file) + # construct brain and proxies (expand environment variables in paths) + brain = MUSICBrainProcess(os.path.expandvars(args.bibi_file)) # run the brain until terminated, this is a blocking call brain.run() - except Exception: # pylint: disable=W0703 + except Exception: # pylint: disable=broad-except # print the traceback which should go back to the remote logger traceback.print_exc() # for any failures, terminate all other brain processes and the CLE - MPI.COMM_WORLD.Abort() + MPI.COMM_WORLD.Abort(-1) diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICLauncher.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICLauncher.py index 7c81d9f..f508830 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICLauncher.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICLauncher.py @@ -4,14 +4,10 @@ requested brain processes. """ from hbp_nrp_music_interface.bibi import bibi_music_config from hbp_nrp_music_interface.launch.MUSICMPILauncher import MUSICMPILauncher +from hbp_nrp_music_interface.launch.host.LocalLauncher import LocalLauncher +from hbp_nrp_music_interface.launch.host.LuganoLauncher import LuganoLauncher import os -import tempfile -import stat -import shutil -import glob -import signal -import sys class MUSICLauncher(object): @@ -37,18 +33,27 @@ class MUSICLauncher(object): :param timeout The default simulation timeout (time initially allocated). """ - self._exd_file = exd_file - self._bibi_file = bibi_file - self._env_file = env_file - self._exp_path = exp_path + # we need to replace absolute paths with relative environment variable-based paths for the + # remote hosts that may not share the same file structure + nrp_models_path = os.environ.get('NRP_MODELS_DIRECTORY').rstrip('/') + nrp_experiments_path = os.environ.get('NRP_EXPERIMENTS_DIRECTORY').rstrip('/') + + self._exd_file = exd_file.replace(nrp_experiments_path, '$NRP_EXPERIMENTS_DIRECTORY') + self._bibi_file = bibi_file.replace(nrp_experiments_path, '$NRP_EXPERIMENTS_DIRECTORY') + self._env_file = env_file.replace(nrp_models_path, '$NRP_MODELS_DIRECTORY') + self._exp_path = exp_path.replace(nrp_experiments_path, '$NRP_EXPERIMENTS_DIRECTORY') + + # store the other launch parameters as provided self._server_host = server_host - self._reservation = reservation if reservation else '' + self._reservation = reservation self._sim_id = sim_id self._timeout = timeout + # host specific launch configuration/allocation + self._launcher = None + + # the MPI process launcher for the CLE and brain processes self.cle_server = None - self._config = None - self._tmpdir = None def init(self, bibi, brain_processes): """ @@ -59,73 +64,60 @@ class MUSICLauncher(object): :param brain_processes The number of brain processses (does not include CLE process). """ - # create .sh launcher scripts for the CLE and brain processes - cle_launcher = self._create_launch_script('music_cle_launcher.sh', - 'hbp_nrp_cleserver.server.CLELauncher') - brain_launcher = self._create_launch_script('music_brain_launcher.sh', - 'hbp_nrp_music_interface.launch.' + - 'MUSICBrainProcess') - - # build a MUSIC configuration script with correct brain ports, launchers and arugments - self._config = bibi_music_config.MUSICConfiguration(bibi) - self._config.add_application('CLE', cle_launcher, - ['--exdconf={}'.format(self._exd_file), - '--environment={}'.format(self._env_file), - '--experiment-path={}'.format(self._exp_path), - '--gzserver-host={}'.format(self._server_host), - '--reservation={}'.format(self._reservation), - '--sim-id={}'.format(self._sim_id), - '--timeout={}'.format(str(self._timeout).replace(' ', '_')), - '--music'], - 1) - self._config.add_application('BRAIN', brain_launcher, - ['--bibi-file={}'.format(self._bibi_file)], - brain_processes) - self._config.save(self._tmpdir) - - # build an mpi launch command for the requested host configuration - self.cle_server = MUSICMPILauncher() + # create a host specific launcher if self._server_host == 'local': - # all processes (CLE + brain processes on localhost) - self.cle_server.add_host('localhost', self._tmpdir, brain_processes + 1) - + self._launcher = LocalLauncher() elif self._server_host == 'lugano': - - # TODO: SLURM allocate processes - 1 nodes (since CLE runs on localhost) - # copy all files (.music, .xml, .sh) over to remote hosts - # run .sh on remote hosts because we have to load modules/etc in mpi context - # keep this identical to a lcocal install for now (ignore comment below) - self.cle_server.add_host('localhost', self._tmpdir, brain_processes + 1) + self._launcher = LuganoLauncher(brain_processes + 1, self._timeout, self._reservation) else: raise Exception('Unknown server host {}, cannot configure and launch MUSIC!' .format(self._server_host)) - # construct the mpi command line with the above host/launch information - self.cle_server.build() - - def _create_launch_script(self, name, module): - """ - Create an executable script in the working temporary folder to launch the specified - Python module. These will be used by the MUSIC runtime on each of the hosts since there - are some quirks in launching "python -m <module> with <args>" directly throug MUSIC. + # create launch scripts for the CLE and brain processes + cle_launcher, brain_launcher = self._launcher.create_launch_scripts() - :param name The name for the launch script to create. - :param module The python module to launch with this script. - :return The absolute path to this launch script. - """ + # command line argument friendly versions of timeout and reservation arguments + # the receiving processes must understand how to convert these back + reservation_str = self._reservation if self._reservation else '' + timeout_str = str(self._timeout).replace(' ', '_') - if not self._tmpdir: - self._tmpdir = tempfile.mkdtemp() - path = os.path.join(self._tmpdir, name) + # build a MUSIC configuration script with correct brain ports, launchers and arugments + # save it to the host launcher temp directory, this is the same for every host + music_conf = bibi_music_config.MUSICConfiguration(bibi) + music_conf.add_application('CLE', + cle_launcher, + ['--exdconf={}'.format(self._exd_file), + '--environment={}'.format(self._env_file), + '--experiment-path={}'.format(self._exp_path), + '--gzserver-host={}'.format(self._server_host), + '--reservation={}'.format(reservation_str), + '--sim-id={}'.format(self._sim_id), + '--timeout={}'.format(timeout_str), + '--music'], + 1) + music_conf.add_application('BRAIN', + brain_launcher, + ['--bibi-file={}'.format(self._bibi_file)], + brain_processes) + music_conf.save(self._launcher.local_tmpdir) + + # deploy the generated configuration files / launch scripts to the target host + self._launcher.deploy() + + # build an mpi launch command for the requested host configuration, currently we launch + # all processes on the same host + self.cle_server = MUSICMPILauncher() + self.cle_server.add_host(self._launcher.hostname, + self._launcher.host_tmpdir, + brain_processes + 1) - # use the absolute path of the Python interpreter for our current process, this current - # process will not be executed through uwsgi, so this will be the correct interpreter - with open(path, 'w') as f: - f.write('#!/bin/bash\n') - f.write('{python} -m {module}\n'.format(python=sys.executable, module=module)) + # construct the mpi command line with the above host/launch information + self.cle_server.build() - os.chmod(path, stat.S_IRWXU) - return path + # for error propagation reasons, we have to launch and init the MPI processes to emulate + # the behavior of the single process launcher, if the mpirun command fails or the CLE/brain + # processes fail then the error will be properly propagated + self.cle_server.launch() def shutdown(self): """ @@ -137,19 +129,7 @@ class MUSICLauncher(object): self.cle_server.shutdown() self.cle_server = None - # terminate any lingering mpi processes that we have spawned, send a kill -9 since they do - # not need to terminate gracefully and are rogue if they are still running at this point - for lock_file in glob.iglob(os.path.join(self._tmpdir, '*.lock')): - with open(lock_file, 'r') as pf: - pid = int(pf.readline()) - try: - os.kill(pid, signal.SIGKILL) - except OSError: - pass - - # cleanup the localhost temporary directory - if self._tmpdir is not None and os.path.exists(self._tmpdir): - shutil.rmtree(self._tmpdir) - self._tmpdir = None - - # TODO: SLURM cleanup perform the above two tasks on remote hosts + # perform any launcher host specific cleanup + if self._launcher: + self._launcher.shutdown() + self._launcher = None diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py index eaf0dcf..355ae4c 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py @@ -7,9 +7,14 @@ Helper class to build and execute a formatted mpirun command for music in the fo where each of the hosts has a specific working directory with necessary config files already in place. Also passes environment variables required for NRP/CLE execution. """ +import logging import os import subprocess -import logging +import time + +import rospy +from std_msgs.msg import String +import json logger = logging.getLogger(__name__) @@ -23,6 +28,7 @@ class MUSICMPILauncher(object): self._hosts = [] self._cmd = None self._process = None + self._launched = False def add_host(self, hostname, tmpdir, processes=1): """ @@ -42,44 +48,89 @@ class MUSICMPILauncher(object): if len(self._hosts) == 0: raise Exception('No target host configurations specified for MUSIC MPI processes!') - # build a list of environment variables to set on the remote host for NRP - # TODO: when running on multiple hosts, it will be important to replace the 'localhost' - # in ROS_MASTER_URI with a fully accesible ip address (note: this was an - # issue with the local installation at FZI, so keep localhost as is for now until - # we can explore the issue during SLURM integration) - ros_master_uri = os.environ.get("ROS_MASTER_URI") - - # TODO: executable will be script that sets up environment if hosts > 1 - executable = "music cle.music" - # build the command with per host specific configuration - envstr = '-envlist NRP_MODELS_DIRECTORY -env ROS_MASTER_URI {}'.format(ros_master_uri) - hoststr = ' : '.join(['{} {} {}'.format(envstr, h, executable) for h in self._hosts]) + hoststr = ' : '.join(['{} music cle.music'.format(h) for h in self._hosts]) self._cmd = 'mpirun {}'.format(hoststr) - def run(self): + def launch(self): """ - Execute the mpirun command and block until it terminates (meaning all child processes - should also be terminated if everything was successful). + Launch the mpirun command and wait for successful startup of the CLE. Blocks until the + CLE publishes completion on the status topic or if the mpirun command aborts. """ if not self._cmd: raise Exception('No command set for MUSIC MPI processes, build() was not called!') + # provide extra ssh flags if ssh is used on the vizcluster to ensure we can spawn + # other ssh sessions / processes for Gazebo/etc. + env_vars = dict(os.environ, HYDRA_LAUNCHER_EXTRA_ARGS="-M -K") + # Spawn the mpirun command, we need to do a few special things because mvapich2 will # send SIGSTOP (like a ctrl+v or suspend) to its parent process when executing and this # would cause the entire Python stack to stop and create <defunct> processes. logger.info("Spawning MUSIC MPI processes: {}".format(self._cmd)) self._process = subprocess.Popen(self._cmd.split(), # no shell to SIGSTOP/hang preexec_fn=os.setsid, # create a new session/tree - stdin=subprocess.PIPE) # allow input to the process + stdin=subprocess.PIPE, # allow input to the process + env=env_vars) # environment variables # for some reason we have to send a newline to the new session after setsid self._process.stdin.write('\n\n') self._process.stdin.flush() - # wait until it terminates - self._process.wait() + # wait until the CLE subprocess initializes properly or fails, the conditions are: + # failure if process exits (bad command, failure to launch, etc.) + # success if the CLE publishes a status update indicating loading is complete + + # subscribe for loading status updates from the CLE, wait for completion message + status_sub = rospy.Subscriber('/ros_cle_simulation/status', String, self._on_status) + + # wait for one of the terminating conditions + while not self._process.poll() and not self._launched: + time.sleep(1) + + # disconnect the temporary status subscriber + status_sub.unregister() + + # launch has failed, propagate an error to the user + if not self._launched: + raise Exception('Distributed MPI launch failure, aborting.\nPlease contact ' + 'neurorobotics@humanbrainproject.eu if this problem persists.') + + def _on_status(self, msg): + """ + Listen for CLE status messages, parse the JSON format that is also sent to the frontend + progress bar. Wait for completion of the CLE loading task and mark the MPI process as + successfully launched. + + :param msg The ros message to parse. + """ + + status = json.loads(msg.data) + + # we may get a few extra status messages before unregistering, ignore them + if 'progress' not in status: + return + + # check for task completion + if status['progress']['task'] == 'Neurorobotics Closed Loop Engine': + if 'done' in status['progress'] and status['progress']['done']: + self._launched = True + + def run(self): + """ + Block until the mpirun command exits. Check the return code to determine if it was + successful or aborted. Backwards compatibility in naming convention, must be run. + """ + + if not self._process or not self._launched: + raise Exception('No MPI process launched, cannot run until launch() is called.') + + # wait until it terminates, if launch fails or is killed externally, propagate an + # Exception to notify the simulation factory to shutdown since the launch/run has failed + if self._process.wait() != 0: + raise Exception('Distributed MPI runtime failure, aborting.\nPlease contact ' + 'neurorobotics@humanbrainproject.eu if this problem persists.') def shutdown(self): """ @@ -96,5 +147,6 @@ class MUSICMPILauncher(object): except OSError: # the process has already cleanly terminated pass + self._launched = False self._process = None self._cmd = None diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/LocalLauncher.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/LocalLauncher.py new file mode 100644 index 0000000..5bce4b6 --- /dev/null +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/LocalLauncher.py @@ -0,0 +1,106 @@ +""" +localhost launch target configuration. +""" + +from hbp_nrp_music_interface.launch.host import IHostLauncher + +import glob +import os +import shutil +import signal +import stat +import sys +import tempfile + + +class LocalLauncher(IHostLauncher): + """ + This launch configuration targets the localhost for all processes and is suitable for local + installs or deployed installs where the newly spawned processes can run on the same host + as the REST backend. + """ + + def __init__(self): + """ + Create a local launcher to target localhost and create a local temporary directory to + write MUSIC configuration files. + """ + super(LocalLauncher, self).__init__() + + # this launcher only targets the localhost + self._hostname = 'localhost' + + # create a temporary directory for configuration files (the same for local/host) + self._local_tmpdir = tempfile.mkdtemp() + self._host_tmpdir = self._local_tmpdir + + def create_launch_scripts(self): + """ + Create a set of launch scripts for the CLE and individual brain processes with specific + implementations required for each host. Write the launch scripts to the temporary directory + for this launcher. + + Returns a tuple (path to CLE launch script, path to BrainProcess launc script). + """ + # create .sh launcher scripts for the CLE and brain processes + cle_launcher = self._create_launch_script('music_cle_launcher.sh', + 'hbp_nrp_cleserver.server.CLELauncher') + brain_launcher = self._create_launch_script('music_brain_launcher.sh', + 'hbp_nrp_music_interface.launch.' + + 'MUSICBrainProcess') + return (cle_launcher, brain_launcher) + + def _create_launch_script(self, name, module): + """ + Create an executable script in the working temporary folder to launch the specified + Python module. These will be used by the MUSIC runtime on each of the hosts since there + are some quirks in launching "python -m <module> with <args>" directly throug MUSIC. + + :param name The name for the launch script to create. + :param module The python module to launch with this script. + :return The absolute path to this launch script. + """ + + # absolute path to script in tmpdir + path = os.path.join(self._local_tmpdir, name) + + # use the absolute path of the Python interpreter for our current process, this current + # process will not be executed through uwsgi, so this will be the correct interpreter + with open(path, 'w') as f: + f.write('#!/bin/bash\n') + f.write('{python} -m {module}\n'.format(python=sys.executable, module=module)) + + os.chmod(path, stat.S_IRWXU) + + # return a relative path to the script, it's guaranteed to be run in the tmpdir + return './%s' % name + + def deploy(self): + """ + Nothing to deploy since the target is localhost. + """ + pass + + def shutdown(self): + """ + Shutdown by trying to kill any running processes and deleting the temporary directory. + """ + + if self._local_tmpdir is not None and os.path.exists(self._local_tmpdir): + + # terminate any lingering mpi processes that we have spawned, send a kill -9 since they + # do not need to terminate gracefully and are rogue if they are still running + for lock_file in glob.iglob(os.path.join(self._local_tmpdir, '*.lock')): + with open(lock_file, 'r') as pf: + pid = int(pf.readline()) + try: + os.kill(pid, signal.SIGKILL) + except OSError: + pass + + # finally, delete the directory + shutil.rmtree(self._local_tmpdir) + + # even though this class should not be reused, unset the tmpdir + self._local_tmpdir = None + self._host_tmpdir = None diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/LuganoLauncher.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/LuganoLauncher.py new file mode 100644 index 0000000..3153bc1 --- /dev/null +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/LuganoLauncher.py @@ -0,0 +1,142 @@ +""" +Lugano vizcluster launch target configuration. +""" + +from hbp_nrp_music_interface.launch.host.LocalLauncher import LocalLauncher + +from hbp_nrp_commons.cluster.LuganoVizCluster import LuganoVizCluster + +from hbp_nrp_cle.cle import config + +import netifaces +import os +import stat + + +class LuganoLauncher(LuganoVizCluster, LocalLauncher): + """ + This launch configuration targets the Lugano vizcluster and handles brain/CLE process + allocation and distribution. This is executed before the CLE is launched, so launching will + immediately terminate if there are not enough cluster resources. + """ + + def __init__(self, processes, timeout, reservation): + """ + Immediately attempt to allocate cluster resources for the brain processes. If this fails + or causes Exceptions, it will be propagated up to the user appropriately. + + :param processes The total number of processes (brain + CLE) to reserve. + :param timeout The simulation timeout required by the vizcluster launcher. + :param reservation Resource reservation string to access reserved nodes. + """ + + # ensure both constructors are called, raise Exception if allocation fails + try: + super(LuganoLauncher, self).__init__(processes, 0, timeout.tzinfo, reservation) + self._allocate_job(reuse_nodes=True) # multiple brains can run on the same node + finally: + LocalLauncher.__init__(self) + + # override hostname with allocated node, remote tmp does not exist until written + self._hostname = self._node + self._host_tmpdir = None + + def _create_launch_script(self, name, module): + """ + Create an executable script in the working temporary folder to launch the specified + Python module. These will be used by the MUSIC runtime on each of the hosts since there + are some quirks in launching "python -m <module> with <args>" directly throug MUSIC. + + Handle specific vizcluster configuration (modules and environment). + + :param name The name for the launch script to create. + :param module The python module to launch with this script. + :return The absolute path to this launch script on the remote host. + """ + + # absolute path to script in local tmpdir + path = os.path.join(self._local_tmpdir, name) + + # determine if we are running on a dev or staging environment + environment = os.environ.get('ENVIRONMENT') + + # set the ROS master uri to use an actual IP instead of localhost + ifaddress = netifaces.ifaddresses(config.config.get('network', 'main-interface')) + local_ip = ifaddress[netifaces.AF_INET][0]['addr'] + ros_master_uri = os.environ.get("ROS_MASTER_URI").replace('localhost', local_ip) + + # create a launch script that configures the vizcluster environment properly + with open(path, 'w') as f: + f.write('#!/bin/bash\n') + + # set the terminal type to ensure we get the same behavior as from a backend VM + # this is also required in the Docker images + f.write('export TERM=linux\n') + + # environment variable configuration + f.write('source /opt/rh/python27/enable\n') + f.write('export ENVIRONMENT=%s\n' % environment) + if environment == 'staging': + with open(os.environ.get('NRP_VARIABLES_PATH')) as f: + content = f.readlines() + for version in [x.strip() for x in content if '_VERSION=' in x]: + f.write('%s\n' % version) + + # load the environment modules based on the above configuration + proj_path = '/gpfs/bbp.cscs.ch/project/proj30/neurorobotics/%s/' % environment + f.write('source %s/server-scripts/nrp-services-modules.sh\n' % proj_path) + + # set paths to models/experiments directory on gpfs + f.write('export NRP_MODELS_DIRECTORY=%s/models\n' % proj_path) + f.write('export NRP_EXPERIMENTS_DIRECTORY=%s/experiments\n' % proj_path) + + # set the PYTHONPATH to add NRP modules on gpfs + venv_path = '%s/platform_venv/lib/python2.7/site-packages' % proj_path + f.write('export PYTHONPATH=%s:$PYTHONPATH\n' % venv_path) + + # configure ROS and source the ros_venv before launching + f.write('export ROS_MASTER_URI=%s\n' % ros_master_uri) + f.write('source $ROS_PYTHON_VENV/bin/activate\n') + + # actually launch the module + f.write('python -m {module}\n'.format(module=module)) + + os.chmod(path, stat.S_IRWXU) + + # return a relative path to the script, it's guaranteed to be run in the tmpdir + return './%s' % name + + def deploy(self): + """ + Copy all configuration files to a temp directory on the remote host. The remote directory + is created here, so use it to set the launcher interface host tmpdir value. + """ + for f in os.listdir(self._local_tmpdir): + self._copy_to_remote(os.path.join(self._local_tmpdir, f)) + self._host_tmpdir = self._tmp_dir + + def shutdown(self): + """ + Shutdown by trying to kill any running processes and deleting the temporary directory on the + remote allocated node and localhost. Both are guaranteed to exist by the constructor. + """ + + # try to terminate any processes running on the allocated node, they should already be + # terminated before we get here, catch any Exceptions and ensure we cleanup below + if self._host_tmpdir: + try: + # terminate any processes, remote directory will be deleted by parent cleanup + clean_process = self._spawn_ssh_node() + clean_process.sendline('for L in %s/*.lock ; do kill -9 `basename $L .lock`; done' % + self._host_tmpdir) + clean_process.terminate() + except Exception: # pylint: disable=broad-except + pass + finally: + self._host_tmpdir = None + + # delete the remote temp directory and deallocate the node + LuganoVizCluster.stop(self) + + # cleanup any local processes and delete the remote temp directory + LocalLauncher.shutdown(self) diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/__init__.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/__init__.py new file mode 100644 index 0000000..d78c89b --- /dev/null +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/host/__init__.py @@ -0,0 +1,73 @@ +""" +This package contains host specific implementations for different distributed +simulation targets. +""" + + +class IHostLauncher(object): + """ + A generic interface to implement a host specific launcher. Guarantees necessary property and + functions are accessible in inherited classes. + """ + + def __init__(self): + """ + Interface constructor, guarantee that the hostname and tmpdir properties are available. + """ + self._hostname = None + self._local_tmpdir = None + self._host_tmpdir = None + + @property + def hostname(self): + """ + Return the target host for the launcher implementation. Raise an exception if the host + specific implementation does not set the hostname value. + """ + if not self._hostname: + raise NotImplementedError('Host specific implementation did not set target hostname!') + return self._hostname + + @property + def local_tmpdir(self): + """ + Return the temporary configuration directory that can be used to write MUSIC + configuration files. Raise an exception if the host specific implementation does not set + the tmpdir value. + """ + if not self._local_tmpdir: + raise NotImplementedError('Host specific implementation did not set temp directory!') + return self._local_tmpdir + + @property + def host_tmpdir(self): + """ + Return the temporary execution directory on the host that contains all necessary MUSIC + configuration files. Raise an exception if the host specific implementation does not set + the tmpdir value. + """ + if not self._host_tmpdir: + raise NotImplementedError('Host specific implementation did not set temp directory!') + return self._host_tmpdir + + def create_launch_scripts(self): + """ + Create a set of launch scripts for the CLE and individual brain processes with specific + implementations required for each host. Write the launch scripts to the temporary directory + for this launcher. + + Returns a tuple (path to CLE launch script, path to BrainProcess launc script). + """ + raise NotImplementedError('Host specific implementation is missing!') + + def deploy(self): + """ + Deploy the temporary directory contents to the target host if necessary. + """ + raise NotImplementedError('Host specific implementation is missing!') + + def shutdown(self): + """ + Shutdown and cleanup any host specific configuration. + """ + raise NotImplementedError('Host specific implementation is missing!') diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/__init__.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/__init__.py new file mode 100644 index 0000000..d6a3227 --- /dev/null +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/__init__.py @@ -0,0 +1,4 @@ +""" +This package contains all tests for the MUSIC interface launch.host package for host specific +launch configurations. +""" diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_interface.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_interface.py new file mode 100644 index 0000000..639f9fe --- /dev/null +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_interface.py @@ -0,0 +1,42 @@ +import unittest + +from hbp_nrp_music_interface.launch.host import IHostLauncher + +from mock import Mock, patch +import os + + +class TestHostLauncherInterface(unittest.TestCase): + + def setUp(self): + self.__launcher = IHostLauncher() + + def test_initial_values(self): + self.assertEqual(self.__launcher._hostname, None) + self.assertEqual(self.__launcher._local_tmpdir, None) + self.assertEqual(self.__launcher._host_tmpdir, None) + + def test_properties(self): + + # default values raise implementation errors + self.assertRaises(NotImplementedError, getattr, self.__launcher, 'hostname') + self.assertRaises(NotImplementedError, getattr, self.__launcher, 'local_tmpdir') + self.assertRaises(NotImplementedError, getattr, self.__launcher, 'host_tmpdir') + + # set some values to simulate an implementation + self.__launcher._hostname = 'foo' + self.__launcher._local_tmpdir = 'local' + self.__launcher._host_tmpdir = 'host' + self.assertEqual(self.__launcher.hostname, 'foo') + self.assertEqual(self.__launcher.local_tmpdir, 'local') + self.assertEqual(self.__launcher.host_tmpdir, 'host') + + def test_functions(self): + + # all functions raise implementation errors in the interface + self.assertRaises(NotImplementedError, self.__launcher.create_launch_scripts) + self.assertRaises(NotImplementedError, self.__launcher.deploy) + self.assertRaises(NotImplementedError, self.__launcher.shutdown) + +if __name__ == "__main__": + unittest.main() diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_local.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_local.py new file mode 100644 index 0000000..61458ab --- /dev/null +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_local.py @@ -0,0 +1,85 @@ +import unittest + +from hbp_nrp_music_interface.launch.host.LocalLauncher import LocalLauncher + +from mock import Mock, patch, mock_open, call +import os +import sys +import stat +import signal + + +class TestLocalLauncherInterface(unittest.TestCase): + + def setUp(self): + with patch('tempfile.mkdtemp') as mkdtemp_mock: + mkdtemp_mock.return_value = '/mock_tmp' + self.__launcher = LocalLauncher() + + def test_properties(self): + + # implementation should set all property values + self.assertEqual(self.__launcher.hostname, 'localhost') + self.assertEqual(self.__launcher.local_tmpdir, '/mock_tmp') + self.assertEqual(self.__launcher.host_tmpdir, '/mock_tmp') + + def test_create_launch_scripts(self): + + # ensure that the calls to create the cle and brain launch scripts are made + def mock_creator(name, module): + return name + + with patch.object(self.__launcher, '_create_launch_script', mock_creator): + self.assertEqual(self.__launcher.create_launch_scripts(), + ('music_cle_launcher.sh', 'music_brain_launcher.sh')) + + @patch('os.chmod') + def test_internal_create_launch_script(self, mock_chmod): + + # mock the script open and write commands + m = mock_open() + with patch('__builtin__.open', m, create=True): + self.assertEqual(self.__launcher._create_launch_script('name.sh', 'module'), + './name.sh') + + # the open call + m.assert_called_once_with('/mock_tmp/name.sh', 'w') + + # the write calls for the actual script + handle = m() + self.assertEqual(handle.write.call_args_list, + [call('#!/bin/bash\n'), + call('{python} -m module\n'.format(python=sys.executable))]) + + # ensure the chmod was called to make the script executable + mock_chmod.assert_called_once_with('/mock_tmp/name.sh', stat.S_IRWXU) + + def test_deploy(self): + + # should just call pass, exception means failure + self.__launcher.deploy() + + @patch('os.path.exists', return_value=True) + @patch('glob.iglob', return_value=['1234.lock']) + @patch('os.kill') + @patch('shutil.rmtree') + def test_shutdown(self, mock_rmtree, mock_kill, mock_iglob, mock_exists): + + # mock the lockfile open/readline command + m = mock_open() + m.return_value.readline.return_value = '1234' + with patch('__builtin__.open', m, create=True): + self.__launcher.shutdown() + + # verify the file was read and kill was called + mock_kill.assert_called_once_with(1234, signal.SIGKILL) + + # verify the rmtree is called to delete the tmpdir + mock_rmtree.assert_called_once_with('/mock_tmp') + + # verify the tmp dirs are unset + self.assertEqual(self.__launcher._local_tmpdir, None) + self.assertEqual(self.__launcher._host_tmpdir, None) + +if __name__ == "__main__": + unittest.main() diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_lugano.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_lugano.py new file mode 100644 index 0000000..c0edb46 --- /dev/null +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/host/test_lugano.py @@ -0,0 +1,105 @@ +import unittest + +from hbp_nrp_music_interface.launch.host.LuganoLauncher import LuganoLauncher + +from mock import Mock, patch, mock_open, call +import os +import sys +import stat +import signal +import netifaces + + +class TestLuganoLauncherInterface(unittest.TestCase): + + def setUp(self): + + # mock the Lugano specific calls made in the constructor + def mock_init(self, processes, gpus, timeout, reservation): + self._node = 'mock_node' + + with patch('tempfile.mkdtemp', return_value='/mock_tmp') as mkdtemp_mock,\ + patch('hbp_nrp_commons.cluster.LuganoVizCluster.LuganoVizCluster.__init__', mock_init) as init_mock,\ + patch('hbp_nrp_commons.cluster.LuganoVizCluster.LuganoVizCluster._allocate_job') as allocate_mock: + + self.__launcher = LuganoLauncher(4, Mock(), None) + + def test_properties(self): + + # implementation should set all property values + self.assertEqual(self.__launcher.hostname, 'mock_node') + self.assertEqual(self.__launcher.local_tmpdir, '/mock_tmp') + + @patch('hbp_nrp_cle.cle.config.config.get') + @patch('netifaces.ifaddresses', return_value={netifaces.AF_INET: [{'addr': '1.2.3.4'}]}) + @patch('os.chmod') + def test_internal_create_launch_script(self, mock_chmod, mock_ifaddresses, mock_config): + + def mock_environ_get(key): + if key == 'ENVIRONMENT': + return 'dev' + elif key == 'ROS_MASTER_URI': + return 'http://localhost:11311' + + # mock the script open and write commands + m = mock_open() + with patch('__builtin__.open', m, create=True), patch('os.environ.get', mock_environ_get): + self.assertEqual(self.__launcher._create_launch_script('name.sh', 'module'), + './name.sh') + + # the open call + m.assert_called_once_with('/mock_tmp/name.sh', 'w') + + # the write calls for the actual script + handle = m() + self.assertEqual(handle.write.call_args_list, + [call('#!/bin/bash\n'), + call('export TERM=linux\n'), + call('source /opt/rh/python27/enable\n'), + call('export ENVIRONMENT=dev\n'), + call('source /gpfs/bbp.cscs.ch/project/proj30/neurorobotics/dev//server-scripts/nrp-services-modules.sh\n'), + call('export NRP_MODELS_DIRECTORY=/gpfs/bbp.cscs.ch/project/proj30/neurorobotics/dev//models\n'), + call('export NRP_EXPERIMENTS_DIRECTORY=/gpfs/bbp.cscs.ch/project/proj30/neurorobotics/dev//experiments\n'), + call('export PYTHONPATH=/gpfs/bbp.cscs.ch/project/proj30/neurorobotics/dev//platform_venv/lib/python2.7/site-packages:$PYTHONPATH\n'), + call('export ROS_MASTER_URI=http://1.2.3.4:11311\n'), + call('source $ROS_PYTHON_VENV/bin/activate\n'), + call('python -m module\n')]) + + # ensure the chmod was called to make the script executable + mock_chmod.assert_called_once_with('/mock_tmp/name.sh', stat.S_IRWXU) + + def test_deploy(self): + + # set the remote temp dir name that is created by the copy command + def mock_copy(self, source): + self._tmp_dir = '/mock_remote_tmp' + + with patch('hbp_nrp_commons.cluster.LuganoVizCluster.LuganoVizCluster._copy_to_remote', mock_copy),\ + patch('os.listdir', return_value=['foo']): + # should just call pass, exception means failure + self.__launcher.deploy() + + # make sure the remote tmp variable is set correctly + self.assertEqual(self.__launcher.host_tmpdir, '/mock_remote_tmp') + + @patch('hbp_nrp_commons.cluster.LuganoVizCluster.LuganoVizCluster._spawn_ssh_node') + @patch('hbp_nrp_commons.cluster.LuganoVizCluster.LuganoVizCluster.stop') + @patch('hbp_nrp_music_interface.launch.host.LocalLauncher.LocalLauncher.shutdown') + def test_shutdown(self, mock_shutdown, mock_stop, mock_spawn): + + mock_ssh = Mock() + mock_spawn.return_value = mock_ssh + + # ensure deploy has been called first even if the test hasn't run + self.__launcher._host_tmpdir = '/mock_remote_tmp' + self.__launcher.shutdown() + + # verify the parent class shutdowns are called + mock_stop.assert_called_once() + mock_shutdown.assert_called_once() + + # verify the tmp dirs are unset + self.assertEqual(self.__launcher._host_tmpdir, None) + +if __name__ == "__main__": + unittest.main() diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py index 3caecb0..1782c55 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py @@ -4,6 +4,9 @@ from hbp_nrp_music_interface.launch.MUSICMPILauncher import MUSICMPILauncher from mock import Mock, patch import os +import json + +from std_msgs.msg import String class MockProcessIO(object): @@ -17,16 +20,28 @@ class MockProcessIO(object): class MockProcess(object): - def __init__(self): + def __init__(self, cmd, preexec_fn, stdin, env): self.stdin=MockProcessIO() + def poll(self): + return None + def wait(self): - pass + return 0 def kill(self): raise OSError('testing error') +class MockFailedProcess(MockProcess): + + def poll(self): + return 255 + + def wait(self): + return 255 + + class TestControlAdapter(unittest.TestCase): def setUp(self): @@ -48,25 +63,67 @@ class TestControlAdapter(unittest.TestCase): self.__launcher.add_host('bar', '/tmp/bar', 3) self.__launcher.build() self.assertEqual(self.__launcher._cmd, - 'mpirun -envlist NRP_MODELS_DIRECTORY -env ROS_MASTER_URI foo ' + - '-np 1 -host foo -wdir /tmp/foo music cle.music : ' + - '-envlist NRP_MODELS_DIRECTORY -env ROS_MASTER_URI foo ' + + 'mpirun -np 1 -host foo -wdir /tmp/foo music cle.music : ' + '-np 3 -host bar -wdir /tmp/bar music cle.music') - @patch('subprocess.Popen', new=Mock(return_value=MockProcess())) - def test_run(self): - self.__launcher._hosts = [] + def test_on_status(self): + + # reset launched flag if another test was run prior + self.__launcher._launched = False + + # not a progress message, no update to launched + msg = String() + msg.data = json.dumps({'something': 'else'}) + self.__launcher._on_status(msg) + self.assertEqual(self.__launcher._launched, False) + + # progress completion message + msg.data = json.dumps({'progress': {'task': 'Neurorobotics Closed Loop Engine', 'done': True}}) + self.__launcher._on_status(msg) + self.assertEqual(self.__launcher._launched, True) + + @patch('rospy.Subscriber') + def test_launch(self, rospy_mock): + + # unset any variables from other tests self.__launcher._cmd = None - self.assertRaises(Exception, self.__launcher.run) + self.__launcher._launched = None + + # not initialized, cannot run without a command + self.assertRaises(Exception, self.__launcher.launch) + # initialize with a valid process self.__launcher._cmd = 'foo' + with patch('subprocess.Popen', MockProcess): + self.__launcher._launched = True + self.__launcher.launch() + + # initialize with an invalid process + with patch('subprocess.Popen', MockFailedProcess): + self.__launcher._launched = False + self.assertRaises(Exception, self.__launcher.launch) + + def test_run(self): + + # not initialized or launched, failure expected + self.__launcher._process = None + self.__launcher._launched = False + self.assertRaises(Exception, self.__launcher.run) + + # successfully launched and completed + self.__launcher._launched = True + self.__launcher._process = MockProcess(None, None, None, None) self.__launcher.run() + # failed process run + self.__launcher._process = MockFailedProcess(None, None, None, None) + self.assertRaises(Exception, self.__launcher.run) + def test_shutdown(self): self.__launcher._process = None self.__launcher.shutdown() - self.__launcher._process = MockProcess() + self.__launcher._process = MockProcess(None, None, None, None) self.__launcher.shutdown() if __name__ == "__main__": diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_music_launcher.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_music_launcher.py new file mode 100644 index 0000000..6148bfe --- /dev/null +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_music_launcher.py @@ -0,0 +1,72 @@ +import unittest + +from hbp_nrp_music_interface.launch.MUSICLauncher import MUSICLauncher + +from mock import Mock, patch +import os + + +class MockLocalLauncher(Mock): + + @property + def hostname(self): + return 'localhost' + + @property + def local_tmpdir(self): + return '/mock_tmpdir' + + @property + def host_tmpdir(self): + return '/mock_host_tmpdir' + + def create_launch_scripts(self): + return ('cle.sh', 'brain.sh') + + +class TestMUSICLauncher(unittest.TestCase): + + @patch('os.environ.get', return_value='') + def setUp(self, mock_environ_get): + self.__launcher = MUSICLauncher('exd_conf', 'bibi_file', 'env_file', '/exp_path', + 'local', None, 1, '1234 T 567') + + @patch('hbp_nrp_music_interface.launch.MUSICLauncher.LocalLauncher', MockLocalLauncher) + @patch('hbp_nrp_music_interface.launch.MUSICLauncher.bibi_music_config.MUSICConfiguration') + @patch('hbp_nrp_music_interface.launch.MUSICLauncher.MUSICMPILauncher') + def test_init(self, mock_mpi, mock_conf): + + # mock all of the local launcher functionality + self.__launcher.init('bibi', 10) + + # call to generate launch scripts + self.__launcher._launcher.deploy.assert_called_once() + + # assert that all of the MPI configuration/launch commands were called + self.__launcher.cle_server.add_host.assert_called_once_with('localhost', '/mock_host_tmpdir', 11) + self.__launcher.cle_server.build.assert_called_once() + self.__launcher.cle_server.launch.assert_called_once() + + def test_shutdown(self): + + # mock the cleserver and launcher + mock_cle_server = Mock() + mock_cle_server.shutdown = Mock() + mock_launcher = Mock() + mock_launcher.shutdown = Mock() + + # set values like in init() and shutdown + self.__launcher.cle_server = mock_cle_server + self.__launcher._launcher = mock_launcher + self.__launcher.shutdown() + + # make sure the mocked shutdowns are called + mock_cle_server.shutdown.assert_called_once() + mock_launcher.shutdown.assert_called_once() + + # verify the local variables are unset + self.assertEqual(self.__launcher.cle_server, None) + self.assertEqual(self.__launcher._launcher, None) + +if __name__ == "__main__": + unittest.main() diff --git a/hbp_nrp_music_interface/setup.py b/hbp_nrp_music_interface/setup.py index 97a1775..863ccb5 100644 --- a/hbp_nrp_music_interface/setup.py +++ b/hbp_nrp_music_interface/setup.py @@ -40,7 +40,8 @@ config = { 'packages': ['hbp_nrp_music_interface', 'hbp_nrp_music_interface.bibi', 'hbp_nrp_music_interface.cle', - 'hbp_nrp_music_interface.launch'], + 'hbp_nrp_music_interface.launch', + 'hbp_nrp_music_interface.launch.host'], 'scripts': [], 'name': 'hbp-nrp-music-interface', 'include_package_data': True, -- GitLab