From 4782ac39cc5954b5525870c0151b9886676bc386 Mon Sep 17 00:00:00 2001
From: Kenny Sharma <kenny.sharma@tum.de>
Date: Thu, 2 Mar 2017 13:55:01 +0100
Subject: [PATCH] [NRRPLT-4809] Use mvapich2 mpirun syntax.

We are transitioning from OpenMPI to mvapich2, which has different
syntax for mpirun. This minor change updates the syntax and will ensure
that both local and deployed installations are using mvapich2.

The mpirun (mpiexec.hydra) behavior is now a bit different from the
OpenMPI behavior. When executed, it sends a SIGSTOP to the parent
process tree and suspends everything (the bash terminal and our Python
processes), so it is now necessary to spawn it with
preexec_fn=os.setsid to start a new session for the subprocess that is
separated from the main process tree.

This patch also removes the subprocess.wait() call which, according
to the documentation, can cause buffering/memory issues. In its place
poll() is used, with live/faster forwarding of the process output to
the main log.

Change-Id: I3a9e255986ac6f39a01b7a1a801e8ab24a14abac
---
 .../launch/MUSICMPILauncher.py                | 24 ++++++++++++-------
 .../tests/launch/test_mpi_launcher.py         | 24 ++++++++++++++-----
 2 files changed, 34 insertions(+), 14 deletions(-)

diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py
index 180b865..eaf0dcf 100644
--- a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py
+++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py
@@ -2,7 +2,7 @@
 """
 Helper class to build and execute a formatted mpirun command for music in the format:
 
-  mpirun -X <vars> -np <proc> --host <hostname/ip> --wdir <temporary work dir> <command> : <repeat>
+  mpirun -envlist <vars> -np <proc> -host <hostname/ip> -wdir <temporary work dir> <command> : ...
 
 where each of the hosts has a specific working directory with necessary config files already
 in place. Also passes environment variables required for NRP/CLE execution.
@@ -32,7 +32,7 @@ class MUSICMPILauncher(object):
         :param tmpdir A valid temporary directory on the remote host to launch in.
         :param processes The number of processes for this host.
         """
-        self._hosts.append('--np {} --host {} --wdir {}'.format(processes, hostname, tmpdir))
+        self._hosts.append('-np {} -host {} -wdir {}'.format(processes, hostname, tmpdir))
 
     def build(self):
         """
@@ -48,29 +48,37 @@ class MUSICMPILauncher(object):
         #       issue with the local installation at FZI, so keep localhost as is for now until
         #       we can explore the issue during SLURM integration)
         ros_master_uri = os.environ.get("ROS_MASTER_URI")
-        envs = ['NRP_MODELS_DIRECTORY',
-                'PYTHONPATH',
-                'ROS_MASTER_URI={}'.format(ros_master_uri)]
 
         # TODO: executable will be script that sets up environment if hosts > 1
         executable = "music cle.music"
 
         # build the command with per host specific configuration
-        envstr = ' '.join(['--x {}'.format(v) for v in envs])
+        envstr = '-envlist NRP_MODELS_DIRECTORY -env ROS_MASTER_URI {}'.format(ros_master_uri)
         hoststr = ' : '.join(['{} {} {}'.format(envstr, h, executable) for h in self._hosts])
         self._cmd = 'mpirun {}'.format(hoststr)
 
     def run(self):
         """
         Execute the mpirun command and block until it terminates (meaning all child processes
-        should also be terminated if everything was successful.
+        should also be terminated if everything was successful).
         """
 
         if not self._cmd:
             raise Exception('No command set for MUSIC MPI processes, build() was not called!')
 
+        # Spawn the mpirun command, we need to do a few special things because mvapich2 will
+        # send SIGSTOP (like a ctrl+z or suspend) to its parent process when executing and this
+        # would cause the entire Python stack to stop and create <defunct> processes.
         logger.info("Spawning MUSIC MPI processes: {}".format(self._cmd))
-        self._process = subprocess.Popen([self._cmd], shell=True)
+        self._process = subprocess.Popen(self._cmd.split(),         # no shell to SIGSTOP/hang
+                                         preexec_fn=os.setsid,      # create a new session/tree
+                                         stdin=subprocess.PIPE)     # allow input to the process
+
+        # for some reason we have to send a newline to the new session after setsid
+        self._process.stdin.write('\n\n')
+        self._process.stdin.flush()
+
+        # wait until it terminates
         self._process.wait()
 
     def shutdown(self):
diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py
index 1d4afcd..3caecb0 100644
--- a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py
+++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py
@@ -6,8 +6,20 @@ from mock import Mock, patch
 import os
 
 
+class MockProcessIO(object):
+
+    def write(self, message):
+        pass
+
+    def flush(self):
+        pass
+
+
 class MockProcess(object):
 
+    def __init__(self):
+        self.stdin=MockProcessIO()
+
     def wait(self):
         pass
 
@@ -24,8 +36,8 @@ class TestControlAdapter(unittest.TestCase):
           self.__launcher.add_host('foo', '/tmp/foo')
           self.__launcher.add_host('bar', '/tmp/bar', 3)
           self.assertEqual(self.__launcher._hosts,
-                           ['--np 1 --host foo --wdir /tmp/foo',
-                            '--np 3 --host bar --wdir /tmp/bar'])
+                           ['-np 1 -host foo -wdir /tmp/foo',
+                            '-np 3 -host bar -wdir /tmp/bar'])
 
       @patch('os.environ.get', new=Mock(return_value='foo'))
       def test_build(self):
@@ -36,10 +48,10 @@ class TestControlAdapter(unittest.TestCase):
           self.__launcher.add_host('bar', '/tmp/bar', 3)
           self.__launcher.build()
           self.assertEqual(self.__launcher._cmd,
-                           'mpirun --x NRP_MODELS_DIRECTORY --x PYTHONPATH --x ROS_MASTER_URI=foo ' +
-                           '--np 1 --host foo --wdir /tmp/foo music cle.music : ' +
-                           '--x NRP_MODELS_DIRECTORY --x PYTHONPATH --x ROS_MASTER_URI=foo ' +
-                           '--np 3 --host bar --wdir /tmp/bar music cle.music')
+                           'mpirun -envlist NRP_MODELS_DIRECTORY -env ROS_MASTER_URI foo ' +
+                           '-np 1 -host foo -wdir /tmp/foo music cle.music : ' +
+                           '-envlist NRP_MODELS_DIRECTORY -env ROS_MASTER_URI foo ' +
+                           '-np 3 -host bar -wdir /tmp/bar music cle.music')
 
       @patch('subprocess.Popen', new=Mock(return_value=MockProcess()))
       def test_run(self):
-- 
GitLab