Commit 6d9d9bfc authored by Hossain Mahmud

[NRRPLT-6813] test stuffs for daint

parent bed1f2b2
# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# ---LICENSE-END
"""
Helper class to build and execute a formatted srun command
"""
import logging
import os
import subprocess
import time
import rospy
import rosnode
from std_msgs.msg import String
import json
from hbp_nrp_cle import config
logger = logging.getLogger(__name__)
class DaintLauncher(object):
"""
Constructs and executes the MPI command in the Daint environment
"""
def __init__(self, executable):
self._exe = executable
self._cmd = None
self._process = None
self._launched = False
self._gazebo_master_uri = None
# TODO: change NestLauncher and remove this method
def add_host(self, hostname, tmpdir, processes=1):
"""
Ignore add_host in daint. Slurm (srun) manages this internally
:param hostname: ignored
:param tmpdir: ignored
:param processes: ignored
"""
logger.info("Igoring add_host for {} in {} for {} processes".format(
hostname, tmpdir, processes))
pass
def build(self):
"""
Construct the srun command line string with all hosts provided.
"""
# build the command with per host specific configuration
_shifter = '/apps/daint/system/opt/shifter/18.06.0/bin/shifter run --mpi ' \
'--writable-volatile=/home2/bbpnrsoa/nginx ' \
'--writable-volatile=/home2/bbpnrsoa/.ros ' \
'--writable-volatile=/home2/bbpnrsoa/.gazebo ' \
'--writable-volatile=/home2/bbpnrsoa/.nano ' \
'--writable-volatile=/home2/bbpnrsoa/.config ' \
'--writable-volatile=/home2/bbpnrsoa/.sdformat ' \
'' \
'--writable-volatile=/home2/bbpnrsoa/.local/var ' \
'--writable-volatile=/home2/bbpnrsoa/.local/etc/nginx ' \
'' \
'--writable-volatile=/var/run ' \
'--writable-volatile=/var/log ' \
'' \
'--writable-volatile=/home2/bbpnrsoa/nrp/src/ExDBackend ' \
'--writable-volatile=/home2/bbpnrsoa/nrp/src/BrainSimulation ' \
'' \
'--mount=type=bind,source=$HOME,destination=$HOME ' \
'index.docker.io/hbpneurorobotics/nrp:sruntest'
self._cmd = 'srun -N2 -n2 {shifter} {command}'.format(shifter=_shifter, command=self._exe)
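# For illustration only, the assembled command resembles:
#   srun -N2 -n2 /apps/daint/system/opt/shifter/18.06.0/bin/shifter run --mpi \
#       <writable-volatile/mount flags as above> index.docker.io/hbpneurorobotics/nrp:sruntest <executable>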
def launch(self):
"""
Launch the srun command and wait for successful startup of the CLE. Blocks until the
CLE publishes completion on the status topic or if the srun command aborts.
"""
if not self._cmd:
raise Exception('No command set for MPI processes, build() was not called!')
# provide extra ssh flags if ssh is used on the vizcluster to ensure we can spawn
# other ssh sessions / processes for Gazebo/etc.
env_vars = dict(os.environ, HYDRA_LAUNCHER_EXTRA_ARGS="-M -K")
# Spawn the srun command, we need to do a few special things because mvapich2 will
send SIGSTOP (like a ctrl+z or suspend) to its parent process when executing and this
# would cause the entire Python stack to stop and create <defunct> processes.
logger.info("Spawning MPI processes: {}".format(self._cmd))
self._process = subprocess.Popen(self._cmd.split(), # no shell to SIGSTOP/hang
preexec_fn=os.setsid, # create a new session/tree
stdin=subprocess.PIPE, # hangs without valid stdin
env=env_vars) # environment variables
# wait until the CLE subprocess initializes properly or fails, the conditions are:
# failure if process exits (bad command, failure to launch, etc.)
# success if the CLE publishes a status update indicating loading is complete
# subscribe for loading status updates from the CLE, wait for completion message
status_sub = rospy.Subscriber('/ros_cle_simulation/status', String, self._on_status)
# wait for the process to abort or be successfully launched
while self._process.poll() is None and not self._launched:
# ensure gzbridge is running and accessible in deployed configurations
self._check_gzbridge()
# very short sleep to be as responsive as possible
time.sleep(0.1)
# disconnect the temporary status subscriber
status_sub.unregister()
# launch has failed, propagate an error to the user
if not self._launched:
raise Exception('Distributed MPI launch failure, aborting.\nPlease contact '
'neurorobotics@humanbrainproject.eu if this problem persists.')
def _on_status(self, msg):
"""
Listen for CLE status messages, parse the JSON format that is also sent to the frontend
progress bar. Wait for completion of the CLE loading task and mark the MPI process as
successfully launched.
:param msg: The ROS message to parse.
"""
status = json.loads(msg.data)
# ignore status bar messages, these don't give us information if a launch has failed
# since the loading task will always be set to 'done' when it aborts
if 'progress' in status:
return
# received a simulation status "tick", everything has launched successfully
self._launched = True
def _check_gzbridge(self):
"""
gzbridge cannot be launched on remote CLE nodes since they will not be reachable by clients
that are configured and able to reach the backend machines. If Gazebo is launched on a
remote node (e.g. not a local install), wait for the /gazebo ROS node to appear and start
gzbridge on this host (a backend server).
"""
if self._gazebo_master_uri:
return
# request the ip of the Gazebo node, result will be -1 if not found
res, _, ip = rosnode.get_api_uri(rospy.get_master(), '/gazebo', True)
if res == -1:
return
# replace the ROS port with the Gazebo port, configure env, and run gzbridge
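# e.g. a (hypothetical) node API URI 'http://nid01234:41235' becomes
# 'http://nid01234:11345', the default Gazebo master port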
self._gazebo_master_uri = ip[0:ip.rfind(':') + 1] + '11345'
# only need to start the gzbridge if running in a deployed configuration
if '127.0.0.1' not in self._gazebo_master_uri:
# this emulates the functionality in hbp_nrp_cleserver/server/LocalGazebo.py but we
# cannot import and reuse due to circular dependencies
os.environ['GAZEBO_MASTER_URI'] = self._gazebo_master_uri
os.system(config.config.get('gzbridge', 'restart-cmd'))
def run(self):
"""
Block until the srun command exits. Check the return code to determine if it was
successful or aborted. The method is named 'run' for backwards compatibility.
"""
if not self._process or not self._launched:
raise Exception('No MPI process launched, cannot run until launch() is called.')
# wait until it terminates, if launch fails or is killed externally, propagate an
# Exception to notify the simulation factory to shutdown since the launch/run has failed
if self._process.wait() != 0:
raise Exception('Distributed MPI runtime failure, aborting.\nPlease contact '
'neurorobotics@humanbrainproject.eu if this problem persists.')
def shutdown(self):
"""
Attempt to forcefully shut down the srun command if it is still running and has not
cleanly shut itself down. Guaranteed to be called after launch success or failure.
"""
# try to terminate the srun command, srun will exit naturally when all
# of its spawned child processes exit or are killed, so this isn't explicitly necessary
if self._process:
try:
self._process.kill()
except OSError: # the process has already cleanly terminated
pass
# terminate the gzbridge process/websocket if we started it above
if self._gazebo_master_uri and '127.0.0.1' not in self._gazebo_master_uri:
os.system(config.config.get('gzbridge', 'stop-cmd'))
# reset all class variables to prevent class reuse
self._gazebo_master_uri = None
self._launched = False
self._process = None
self._cmd = None
self._exe = None
@@ -47,7 +47,6 @@ def launch_cle(argv, assembly_class): # pragma: no cover
simulation = None
# exit code, 0 for success and -1 for any failures
mpi_returncode = 0
try:
if os.environ["ROS_MASTER_URI"] == "":
raise Exception("You should run ROS first.")
@@ -78,7 +77,7 @@ def launch_cle(argv, assembly_class): # pragma: no cover
import rospy
from hbp_nrp_cleserver.server import ROS_CLE_NODE_NAME
from hbp_nrp_cleserver.server.ROSCLESimulationFactory import set_up_logger
from hbp_nrp_cleserver.sim_config.SimConfig import SimConfig
from hbp_nrp_commons.sim_config.SimConfig import SimConfig
# reconfigure the logger to stdout as done in ROSCLESimulationFactory.py otherwise all
# output will be trapped by the ROS logger after the first ROS node is initialized
@@ -131,7 +130,6 @@ def launch_cle(argv, assembly_class): # pragma: no cover
# if no logger
print 'CLE aborted with message {}, terminating.'.format(e.message)
logger.exception(e)
mpi_returncode = -1
finally:
@@ -146,4 +144,4 @@ def launch_cle(argv, assembly_class): # pragma: no cover
# seem to block and ignore the Abort command until receiving a message
for rank in xrange(MPI.COMM_WORLD.Get_size()):
MPI.COMM_WORLD.isend('shutdown', dest=rank, tag=100)
MPI.COMM_WORLD.Abort(mpi_returncode)
MPI.COMM_WORLD.Abort(-1)
@@ -101,7 +101,7 @@ def launch_brain(argv): # pragma: no cover
help='the simulation id to use', required=True)
args, _ = parser.parse_known_args(argv)
from hbp_nrp_cleserver.sim_config.SimConfig import SimConfig
from hbp_nrp_commons.sim_config.SimConfig import SimConfig
sim_config = SimConfig(args.exd_file,
sim_id=args.sim_id,
rng_seed=int(args.rng_seed))
@@ -117,8 +117,8 @@ def launch_brain(argv): # pragma: no cover
traceback.print_exc()
print str(ex)
# for any failures, terminate all other brain processes and the CLE
MPI.COMM_WORLD.Abort(-1)
# for any failures, terminate all other brain processes and the CLE
MPI.COMM_WORLD.Abort(-1)
if __name__ == '__main__': # pragma: no cover
@@ -128,6 +128,11 @@ if __name__ == '__main__': # pragma: no cover
from mpi4py import MPI
if MPI.COMM_WORLD.Get_rank() == 0:
print '================ MPI RANK ================ ' + str(MPI.COMM_WORLD.Get_rank())
launch_cle(argv_backup, DistributedCLESimulationAssembly)
else:
# import pydevd
# pydevd.settrace('localhost', port=50003, stdoutToServer=True, stderrToServer=True)
print '================ MPI RANK ================ ' + str(MPI.COMM_WORLD.Get_rank())
launch_brain(argv_backup)
@@ -80,7 +80,7 @@ class MPILauncher(object):
# build the command with per host specific configuration
hoststr = ' : '.join(['{h} {e}'.format(h=host, e=self._exe) for host in self._hosts])
self._cmd = 'mpirun -launcher ssh {}'.format(hoststr)
self._cmd = 'mpirun {}'.format(hoststr)
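# Illustrative result, assuming self._hosts holds plain hostnames 'node-a' and 'node-b':
#   mpirun node-a <executable> : node-b <executable>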
def launch(self):
"""
......
@@ -28,18 +28,23 @@ requested brain processes.
from hbp_nrp_distributed_nest.launch.host.LocalLauncher import LocalLauncher
from hbp_nrp_distributed_nest.launch.MPILauncher import MPILauncher
from hbp_nrp_distributed_nest.launch.DaintLauncher import DaintLauncher
import os
import random
import sys
import logging
logger = logging.getLogger(__name__)
# This class intentionally does not inherit SimulationServer
# (even though it is an implementation of it) in order to avoid duplicate notificators
class NestLauncher(object):
"""
Setup, build, and launch a distributed Nest instance that will spawn the CLE and
requested brain processes.
NOTE: This class intentionally does not inherit SimulationServer
(even though it is an implementation of it) in order to avoid duplicate notificators
"""
def __init__(self, sim_config):
@@ -92,7 +97,7 @@ class NestLauncher(object):
# construct the actual MPI launcher with the process that determines if the CLE or
# standalone brain should be launched
# TODO: Find way to send simconfig object directly to the DistributedNestProcess
args = ['--exdconf={}'.format(self._sim_config.exc_abs_path),
args = ['--exdconf={}'.format(os.path.realpath(self._sim_config.exc_abs_path)),
'--gzserver-host={}'.format(self._sim_config.gzserver_host),
'--reservation={}'.format(reservation_str),
'--sim-id={}'.format(self._sim_config._sim_id),
@@ -101,7 +106,11 @@ class NestLauncher(object):
exe = '{python} -m hbp_nrp_distributed_nest.launch.DistributedNestProcess {args}'\
.format(python=sys.executable, args=' '.join(args))
self.mpilauncher = MPILauncher(exe)
logger.info("Initializing MPI launcher")
launcher = MPILauncher
self.mpilauncher = launcher(exe)
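# Illustrative resulting command (hypothetical values):
#   <python> -m hbp_nrp_distributed_nest.launch.DistributedNestProcess --exdconf=/path/to/experiment.exc
#   --gzserver-host=local --reservation=None --sim-id=0 ...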
# build and deploy configuration
self._build()
@@ -118,8 +127,7 @@ class NestLauncher(object):
# construct the actual MPI launcher based on the deployed configuration
self.mpilauncher.add_host(self._launcher.hostname,
self._launcher.host_tmpdir,
self._sim_config.num_brain_processes+ 1)
#2)
self._sim_config.num_brain_processes + 1)
# construct the mpi command line with the above host/launch information
self.mpilauncher.build()
@@ -127,6 +135,7 @@ class NestLauncher(object):
# for error propagation reasons, we have to launch and init the MPI processes to emulate
# the behavior of the single process launcher, if the mpirun command fails or the CLE/brain
# processes fail then the error will be properly propagated
logger.info("Launching MPI")
self.mpilauncher.launch()
def run(self):
......
#!/bin/bash
# This script is designed for local usage.
# Ignore generated files.
export IGNORE_LINT="platform_venv|hbp_nrp_music_xml/hbp_nrp_music_xml/schema/generated"
# Ignore generated files
# export IGNORE_LINT="platform_venv|hbp_nrp_music_xml/hbp_nrp_music_xml/schema/generated"
# Ignore generated and music related files
export IGNORE_LINT='platform_venv|hbp_nrp_music_xml|hbp_nrp_music_interface|hbp_nrp_music_xml/hbp_nrp_music_xml/schema/generated|migrations|nest'
export VIRTUAL_ENV=$NRP_VIRTUAL_ENV
# This script only runs static code analysis, the tests can be run separately using run_tests.sh
......