Skip to content
Snippets Groups Projects
Commit 4782ac39 authored by Kenny Sharma's avatar Kenny Sharma
Browse files

[NRRPLT-4809] Use mvapich2 mpirun syntax.

We are transitioning from OpenMPI to mvapich2, which has different
syntax for mpirun. This minor change updates the syntax and will ensure
that both local and deployed installations use mvapich2.

The mpirun (mpiexec.hydra) behavior is now a bit different from the
OpenMPI behavior. When executed, it sends a SIGSTOP to the parent process
tree and suspends everything (the bash terminal and our Python processes),
so it is now necessary to spawn it with preexec_fn=os.setsid to
start a new session for the subprocess that is separated from the
main process tree.

This patch also removes the subprocess.wait() call which, according
to the documentation, can cause buffering/memory issues. In its place,
poll() is used, with live/faster forwarding of the process's output to
the main log.

Change-Id: I3a9e255986ac6f39a01b7a1a801e8ab24a14abac
parent 46ebaf6d
No related branches found
No related tags found
No related merge requests found
......@@ -2,7 +2,7 @@
"""
Helper class to build and execute a formatted mpirun command for music in the format:
mpirun -X <vars> -np <proc> --host <hostname/ip> --wdir <temporary work dir> <command> : <repeat>
mpirun -envlist <vars> -np <proc> -host <hostname/ip> -wdir <temporary work dir> <command> : ...
where each of the hosts has a specific working directory with necessary config files already
in place. Also passes environment variables required for NRP/CLE execution.
......@@ -32,7 +32,7 @@ class MUSICMPILauncher(object):
:param tmpdir A valid temporary directory on the remote host to launch in.
:param processes The number of processes for this host.
"""
self._hosts.append('--np {} --host {} --wdir {}'.format(processes, hostname, tmpdir))
self._hosts.append('-np {} -host {} -wdir {}'.format(processes, hostname, tmpdir))
def build(self):
"""
......@@ -48,29 +48,37 @@ class MUSICMPILauncher(object):
# issue with the local installation at FZI, so keep localhost as is for now until
# we can explore the issue during SLURM integration)
ros_master_uri = os.environ.get("ROS_MASTER_URI")
envs = ['NRP_MODELS_DIRECTORY',
'PYTHONPATH',
'ROS_MASTER_URI={}'.format(ros_master_uri)]
# TODO: executable will be script that sets up environment if hosts > 1
executable = "music cle.music"
# build the command with per host specific configuration
envstr = ' '.join(['--x {}'.format(v) for v in envs])
envstr = '-envlist NRP_MODELS_DIRECTORY -env ROS_MASTER_URI {}'.format(ros_master_uri)
hoststr = ' : '.join(['{} {} {}'.format(envstr, h, executable) for h in self._hosts])
self._cmd = 'mpirun {}'.format(hoststr)
def run(self):
"""
Execute the mpirun command and block until it terminates (meaning all child processes
should also be terminated if everything was successful.
should also be terminated if everything was successful).
"""
if not self._cmd:
raise Exception('No command set for MUSIC MPI processes, build() was not called!')
# Spawn the mpirun command, we need to do a few special things because mvapich2 will
# send SIGSTOP (like a ctrl+z or suspend) to its parent process when executing and this
# would cause the entire Python stack to stop and create <defunct> processes.
logger.info("Spawning MUSIC MPI processes: {}".format(self._cmd))
self._process = subprocess.Popen([self._cmd], shell=True)
self._process = subprocess.Popen(self._cmd.split(), # no shell to SIGSTOP/hang
preexec_fn=os.setsid, # create a new session/tree
stdin=subprocess.PIPE) # allow input to the process
# for some reason we have to send a newline to the new session after setsid
self._process.stdin.write('\n\n')
self._process.stdin.flush()
# wait until it terminates
self._process.wait()
def shutdown(self):
......
......@@ -6,8 +6,20 @@ from mock import Mock, patch
import os
class MockProcessIO(object):
def write(self, message):
pass
def flush(self):
pass
class MockProcess(object):
def __init__(self):
self.stdin=MockProcessIO()
def wait(self):
pass
......@@ -24,8 +36,8 @@ class TestControlAdapter(unittest.TestCase):
self.__launcher.add_host('foo', '/tmp/foo')
self.__launcher.add_host('bar', '/tmp/bar', 3)
self.assertEqual(self.__launcher._hosts,
['--np 1 --host foo --wdir /tmp/foo',
'--np 3 --host bar --wdir /tmp/bar'])
['-np 1 -host foo -wdir /tmp/foo',
'-np 3 -host bar -wdir /tmp/bar'])
@patch('os.environ.get', new=Mock(return_value='foo'))
def test_build(self):
......@@ -36,10 +48,10 @@ class TestControlAdapter(unittest.TestCase):
self.__launcher.add_host('bar', '/tmp/bar', 3)
self.__launcher.build()
self.assertEqual(self.__launcher._cmd,
'mpirun --x NRP_MODELS_DIRECTORY --x PYTHONPATH --x ROS_MASTER_URI=foo ' +
'--np 1 --host foo --wdir /tmp/foo music cle.music : ' +
'--x NRP_MODELS_DIRECTORY --x PYTHONPATH --x ROS_MASTER_URI=foo ' +
'--np 3 --host bar --wdir /tmp/bar music cle.music')
'mpirun -envlist NRP_MODELS_DIRECTORY -env ROS_MASTER_URI foo ' +
'-np 1 -host foo -wdir /tmp/foo music cle.music : ' +
'-envlist NRP_MODELS_DIRECTORY -env ROS_MASTER_URI foo ' +
'-np 3 -host bar -wdir /tmp/bar music cle.music')
@patch('subprocess.Popen', new=Mock(return_value=MockProcess()))
def test_run(self):
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment