Commit 64a5d0b2 authored by Hossain Mahmud, committed by Manos Angelidis

Merged in multinode-daint-jp66 (pull request #14)


[NRPJP-66] parameterized daint launcher

* [NRRPLT-0000] hotfix for daint launch command

* [NRPJP-66] parameterized daint launcher

* [NRPJP-66] moved to sarus + printing output from srun subprocess

* [NRPJP-66] Temporary amendment for "ImportError: libmpicxx.so.12: cannot open shared object file: No such file or directory" bug

* [NRPJP-66] mpi process prints hostname

Approved-by: Eloy Retamino <retamino@ugr.es>
Approved-by: Michael Zechmair <michael.zechmair@in.tum.de>
parent 3f319935
@@ -39,7 +39,6 @@ from hbp_nrp_cle import config
logger = logging.getLogger(__name__)
class DaintLauncher(object):
"""
Class constructs and executes the mpi in daint environment
@@ -52,6 +51,7 @@ class DaintLauncher(object):
self._process = None
self._launched = False
self._gazebo_master_uri = None
self._status_sub = None
# TODO: change NestLauncher and remove this method
def add_host(self, hostname, tmpdir, processes=1):
@@ -69,29 +69,56 @@ class DaintLauncher(object):
"""
Construct the srun command line string with all hosts provided.
"""
import socket
def get_ip():
"""
:return: Returns public interface's IP address
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# it doesn't even have to be reachable
s.connect(('10.255.255.255', 1))
ip = s.getsockname()[0]
# pylint: disable=bare-except
except:
# look up on the local host
ip = '127.0.0.1'
finally:
s.close()
return ip
# Reconstruct Daint nodes' $HOME
# WARNING: both $HOME and $USER have been overridden in the image
# daint_user = os.popen('id -un').read().rstrip().lstrip()
daint_user = subprocess.check_output(['id', '-un']).strip()
daint_home = os.path.join('/users', daint_user)
daint_shared_dir = os.environ.get('DAINT_SHARED_DIRECTORY', daint_home)
# Clear log files
os.popen('echo > {shared_dir}/srun.log'.format(shared_dir=daint_shared_dir))
os.popen('echo > {shared_dir}/mpi.log'.format(shared_dir=daint_shared_dir))
# build the command with per host specific configuration
_shifter = '/apps/daint/system/opt/shifter/18.06.0/bin/shifter run --mpi ' \
'--writable-volatile=/home2/bbpnrsoa/nginx ' \
'--writable-volatile=/home2/bbpnrsoa/.ros ' \
'--writable-volatile=/home2/bbpnrsoa/.gazebo ' \
'--writable-volatile=/home2/bbpnrsoa/.nano ' \
'--writable-volatile=/home2/bbpnrsoa/.config ' \
'--writable-volatile=/home2/bbpnrsoa/.sdformat ' \
'' \
'--writable-volatile=/home2/bbpnrsoa/.local/var ' \
'--writable-volatile=/home2/bbpnrsoa/.local/etc/nginx ' \
'' \
'--writable-volatile=/var/run ' \
'--writable-volatile=/var/log ' \
'' \
'--writable-volatile=/home2/bbpnrsoa/nrp/src/ExDBackend' \
'--writable-volatile=/home2/bbpnrsoa/nrp/src/BrainSimulation' \
'' \
'--mount=type=bind,source=$HOME,destination=$HOME' \
'index.docker.io/hbpneurorobotics/nrp:sruntest'
self._cmd = 'srun -N2 -n2 {shifter} {command}'.format(shifter=_shifter, command=self._exe)
_sarus = '{binary} --verbose run --mpi ' \
'--mount=type=bind,source={user_home},destination={user_home} ' \
'--mount=source={log_dir},dst={log_dir},type=bind ' \
'' \
'hbpneurorobotics/nrp:daint ' \
'{hbp}/BrainSimulation/mpi.sh {log_dir} {ros_master} {mpi_command}'.format(
binary='/apps/daint/system/opt/sarus/1.0.1/bin/sarus',
home=os.environ.get('HOME'), # /home2/bbpnrsoa
hbp=os.environ.get('HBP'), # /home2/bbpnrsoa/nrp/src
user_home=daint_home, # /users/<cscs_username>
log_dir=daint_shared_dir, # e.g. daint_home or /scratch/snx3000/<cscs_username>
ros_master=get_ip(),
mpi_command=self._exe)
self._cmd = 'srun -v -C gpu -N 2 -n 2 ' \
'--chdir {wdir} --jobid {jobid} {sarus}'.format(
wdir=daint_shared_dir,
jobid=os.environ.get('SLURM_JOB_ID'),
sarus=_sarus)
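# For illustration only (all concrete values below are assumptions, not taken from a real run),
# the assembled command looks roughly like:
#   srun -v -C gpu -N 2 -n 2 --chdir /scratch/snx3000/<cscs_username> --jobid <SLURM_JOB_ID> \
#     /apps/daint/system/opt/sarus/1.0.1/bin/sarus --verbose run --mpi \
#     --mount=type=bind,source=/users/<cscs_username>,destination=/users/<cscs_username> \
#     --mount=source=/scratch/snx3000/<cscs_username>,dst=/scratch/snx3000/<cscs_username>,type=bind \
#     hbpneurorobotics/nrp:daint \
#     /home2/bbpnrsoa/nrp/src/BrainSimulation/mpi.sh /scratch/snx3000/<cscs_username> <cle_host_ip> <mpi_command>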
def launch(self):
"""
@@ -113,31 +140,62 @@ class DaintLauncher(object):
self._process = subprocess.Popen(self._cmd.split(), # no shell to SIGSTOP/hang
preexec_fn=os.setsid, # create a new session/tree
stdin=subprocess.PIPE, # hangs without valid stdin
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env_vars) # environment variables
# subscribe for loading status updates from the CLE, wait for completion message
self._status_sub = rospy.Subscriber('/ros_cle_simulation/status', String, self._on_status)
import time
start = time.time()
# wait until the CLE subprocess initializes properly or fails, the conditions are:
# failure if process exits (bad command, failure to launch, etc.)
# success if the CLE publishes a status update indicating loading is complete
# subscribe for loading status updates from the CLE, wait for completion message
status_sub = rospy.Subscriber('/ros_cle_simulation/status', String, self._on_status)
# wait for the process to abort or be successfully launched
# wait for srun and the sarus container to start; time out after 120 seconds
# _launched becomes True once the ROSCLEServer is initialized and publishes a
# message on /ros_cle_simulation/status
n = 0
while self._process.poll() is None and not self._launched and n < 120:
n += 1
time.sleep(1)
# launch has failed, propagate an error to the user
if not self._launched:
o = self._process.stdout.read()
e = self._process.stderr.read()
print 'PROCESS END: {}'.format(n)
if o:
print '--------- STDOUT -------'
print o
print '--------- STDOUT -------'
if e:
print '--------- ERROR -------'
print e
print '--------- ERROR -------'
raise Exception(
'ABORTING: Distributed launch failure. '
'Timed out while waiting for /ros_cle_simulation/status service to come alive. '
'Please contact neurorobotics@humanbrainproject.eu if this problem persists.'
)
start = time.time()
# wait for the gzserver script to execute or timeout in 60 seconds
while not self._gazebo_master_uri and time.time() - start < 60:
# ensure gzbridge is running and accessible in deployed configurations
self._check_gzbridge()
time.sleep(1)
# very short sleep to be as responsive as possible
time.sleep(0.1)
# disconnect the temporary status subscriber
status_sub.unregister()
# launch has failed, propagate an error to the user
if not self._launched:
raise Exception('Distributed MPI launch failure, aborting.\nPlease contact '
'neurorobotics@humanbrainproject.eu if this problem persists.')
# gzserver launch has failed, propagate an error to the user
if not self._gazebo_master_uri:
raise Exception(
'ABORTING: Distributed launch failure.\n'
'Timed out while waiting for /gazebo node to come alive. '
'Please contact neurorobotics@humanbrainproject.eu if this problem persists.'
)
def _on_status(self, msg):
"""
@@ -149,6 +207,7 @@ class DaintLauncher(object):
"""
status = json.loads(msg.data)
logger.info(str(status))
# ignore status bar messages, these don't give us information if a launch has failed
# since the loading task will always be set to 'done' when it aborts
@@ -156,8 +215,12 @@ class DaintLauncher(object):
return
# received a simulation status "tick", everything has launched successfully
logger.info("===== MARKING LAUNCHED to be TRUE =====")
self._launched = True
# disconnect the status subscriber
self._status_sub.unregister()
def _check_gzbridge(self):
"""
gzbridge cannot be launched on remote CLE nodes since they will not be reachable by clients
@@ -171,6 +234,8 @@ class DaintLauncher(object):
# request the ip of the Gazebo node, result will be -1 if not found
res, _, ip = rosnode.get_api_uri(rospy.get_master(), '/gazebo', True)
logger.info("===== gzbridge /gazebo ip from rospy {}".format(str(ip)))
if res == -1:
return
@@ -183,6 +248,8 @@ class DaintLauncher(object):
# this emulates the functionality in hbp_nrp_cleserver/server/LocalGazebo.py but we
# cannot import and reuse due to circular dependencies
os.environ['GAZEBO_MASTER_URI'] = self._gazebo_master_uri
logger.info("===== GAZEBO_MASTER_URI var: {}".format(self._gazebo_master_uri))
logger.info("===== GAZEBO_MASTER_URI env: {}".format(os.environ['GAZEBO_MASTER_URI']))
os.system(config.config.get('gzbridge', 'restart-cmd'))
def run(self):
@@ -194,6 +261,14 @@ class DaintLauncher(object):
if not self._process or not self._launched:
raise Exception('No MPI process launched, cannot run until launch() is called.')
import time
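# diagnostic relay of the srun/sarus subprocess output (added along with the move to
# sarus, see the commit message); note that read() on the pipes blocks until the
# subprocess closes them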
while self._process.poll() is None:
logger.info("===== STDOUT from subprocess.Popen =====\n{}".format(
str(self._process.stdout.read())))
logger.info("===== STDERR from subprocess.Popen =====\n{}".format(
str(self._process.stderr.read())))
time.sleep(3)
# wait until it terminates; if the launch fails or is killed externally, propagate an
# Exception to notify the simulation factory to shut down, since the launch/run has failed
if self._process.wait() != 0:
@@ -224,3 +299,4 @@ class DaintLauncher(object):
self._process = None
self._cmd = None
self._exe = None
@@ -138,7 +138,7 @@ class NestBrainProcess(object):
elif data == 'step':
# run the coordinated simulation step
#print "[MPI] ===================== step ======================="
# print "[MPI] ===================== step ======================="
self._brain_controller.run_step(self._timestep * 1000.0) # msec
self._brain_communicator.refresh_buffers(0.0)
......
@@ -106,7 +106,7 @@ class NestLauncher(object):
'--token={}'.format(self._sim_config._token),
'--experiment_id={}'.format(self._sim_config.experiment_id)]
exe = '{python} -m hbp_nrp_distributed_nest.launch.main {args}'\
exe = '{python} -u -m hbp_nrp_distributed_nest.launch.main {args}'\
.format(python=sys.executable, args=' '.join(args))
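# the '-u' flag runs the interpreter unbuffered so output from the distributed
# processes is flushed immediately and ends up in the srun/mpi logs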
logger.info("Initializing MPI launcher")
......
@@ -51,8 +51,10 @@ if __name__ == '__main__': # pragma: no cover
from mpi4py import MPI
rank = MPI.COMM_WORLD.Get_rank()
import socket
hostname = socket.gethostname()
print '[ MPI ] ========== nest rank={} ========'.format(nest.Rank())
print '[ MPI ] ========== nest rank={}; hostname={} ========'.format(nest.Rank(), hostname)
# use the MPI process rank to determine if we should launch CLE or brain process
# both launch commands are blocking until shutdown occurs
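# A minimal sketch of that dispatch, for illustration only (the function names and the
# exact rank assignment here are assumptions, not the actual API):
#   if rank == 0:
#       launch_cle()             # e.g. the CLE server could run on rank 0
#   else:
#       launch_brain_process()   # the remaining ranks run NEST brain processes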
......
mpi.sh 0 → 100755
#!/usr/bin/env bash
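# Usage (as invoked by the DaintLauncher sarus command):
#   mpi.sh <shared_log_dir> <ros_master_ip> <mpi command ...>
# $1 is a directory shared across the allocated nodes (receives srun.log and mpi.log),
# $2 is the IP of the host running the ROS master (the CLE backend), and the remaining
# arguments form the MPI command executed at the end of this script.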
logdir=$1
exec 1> $logdir/srun.log 2>&1
shift
echo "[mpi.sh] Running mpi.sh script. Logs in: $logdir"
cat /home2/bbpnrsoa/nrp/src/BrainSimulation/mpi.sh
echo -n "[mpi.sh] CWD: "
pwd
# Set CLE environment
export LC_ALL=C
unset LANGUAGE
. /home2/bbpnrsoa/nrp/src/user-scripts/nrp_variables
echo $PYTHONPATH
export ROS_MASTER_URI=http://$1:11311
echo "[mpi.sh] ROS_MASTER_URI: $ROS_MASTER_URI"
shift
export ROS_IP=$(hostname -I | cut -d " " -f 1)
echo "[mpi.sh] ROS_IP: $ROS_IP"
export GAZEBO_MASTER_URI=http://$(hostname -I | cut -d " " -f 1):11345
echo "[mpi.sh] GAZEBO_MASTER_URI : $GAZEBO_MASTER_URI"
rm /usr/lib/libmpi.so.12
rm /usr/lib/libmpi.so.12.0.2
ln -s /usr/lib/libmpi.so.12.0.5 /usr/lib/libmpi.so.12
export LD_LIBRARY_PATH=/usr/lib:$LD_LIBRARY_PATH
echo "[mpi.sh] Executing the MPI command: $@"
"$@" >> "$logdir/mpi.log" 2>&1
\ No newline at end of file