From e42c124558add52dd3217166299cd77244dca3dc Mon Sep 17 00:00:00 2001
From: Kenny Sharma <kenny.sharma@tum.de>
Date: Mon, 27 Mar 2017 08:26:07 +0200
Subject: [PATCH] [NRRPLT-3902] Various fixes after Lugano vizcluster testing.

This patch resolves the remaining issues with distributed Nest launching on the
Lugano vizcluster. It includes fixes to:

- pass the music working directory path as an environment variable rather than
  using MUSIC file configuration, as the CLE seems to change working directory
  during the launch of an expeirment and the proxy.xml file was inaccessible

- fix launch success/failure to wait for a non progress bar status message from
  the CLE, this is because the status bar messages are always marked as 'done'
  even in the event of a failure, so it was not a correct way to check success

- start gzbridge on the backend VM by waiting for Gazebo to be started on the
  remote nodes, this is because the vizcluster node cannot start gzbridge and
  would not be reachable by the frontend even if it could

Change-Id: Id182da74b6e01e748003bc8d7694fc83fe572336
---
 .../bibi/bibi_music_config.py                 |  3 +-
 .../cle/MUSICPyNNControlAdapter.py            | 15 ++--
 .../launch/MUSICBrainProcess.py               | 30 +++----
 .../launch/MUSICMPILauncher.py                | 81 +++++++++++++-----
 .../tests/bibi/config.music                   |  2 -
 .../tests/cle/test_control_adapter.py         |  4 +-
 .../tests/launch/test_mpi_launcher.py         | 82 ++++++++++++++-----
 7 files changed, 145 insertions(+), 72 deletions(-)

diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/bibi/bibi_music_config.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/bibi/bibi_music_config.py
index 9ab0371..8215827 100644
--- a/hbp_nrp_music_interface/hbp_nrp_music_interface/bibi/bibi_music_config.py
+++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/bibi/bibi_music_config.py
@@ -103,10 +103,9 @@ class MUSICConfiguration(object):
         # write the actual music launching script to cle.music
         music_file = os.path.join(path, 'cle.music')
         with open(music_file, 'w') as f:
-            header = {'MUSIC_XML': proxy_file}
             ports = [MusicConfigPort(p) for p in self.__conf_xml.port]
             applications = [self.__applications['CLE'], self.__applications['BRAIN']]
-            self.__conf_script = MusicConfigWriter(header, applications, ports)
+            self.__conf_script = MusicConfigWriter(None, applications, ports)
             self.__conf_script.write(f)
 
     @staticmethod # required to avoid "no self use" pylint error
diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/cle/MUSICPyNNControlAdapter.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/cle/MUSICPyNNControlAdapter.py
index 5ff70f9..ded51a7 100644
--- a/hbp_nrp_music_interface/hbp_nrp_music_interface/cle/MUSICPyNNControlAdapter.py
+++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/cle/MUSICPyNNControlAdapter.py
@@ -8,6 +8,7 @@ from hbp_nrp_music_interface.cle import MUSICBrainLoader
 
 import music
 import logging
+import os
 
 logger = logging.getLogger(__name__)
 
@@ -36,13 +37,9 @@ class MUSICPyNNControlAdapter(PyNNControlAdapter):
         :param populations: A named list of populations to create
         """
 
-        # initialize MUSIC and read the population specification xml file that
-        # is referenced in the configuration script used to launch MUSIC
+        # initialize MUSIC and load proxies from the MUSIC proxy xml
         logger.info("Loading MUSIC proxy brain devices.")
-        try:
-            music_setup = music.Setup()
-            music_xml = music_setup.config('MUSIC_XML')
-            MUSICBrainLoader.load_proxies_from_xml(music_xml, **populations)
-        except music.UndefinedConfig:
-            raise Exception("Unable to load MUSIC proxy population definitions from MUSIC "
-                            "configuration script. MUSIC_XML value was not properly set.")
+        music.Setup()
+        music_path = os.environ.get('NRP_MUSIC_DIRECTORY')
+        proxy_file = os.path.join(music_path, 'proxy.xml')
+        MUSICBrainLoader.load_proxies_from_xml(proxy_file, **populations)
diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICBrainProcess.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICBrainProcess.py
index d3be1aa..49a4196 100644
--- a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICBrainProcess.py
+++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICBrainProcess.py
@@ -57,23 +57,19 @@ class MUSICBrainProcess(object):
         self._brain_controller.load_brain(brain_file, **pop_dict)
 
         # load the MUSIC proxies for the spawned brain
-        try:
-            xml_file = self._music_setup.config('MUSIC_XML')
-            with open(xml_file, 'r') as f:
-                music_xml = f.read()
-
-            proxy_model_factory = PyNNProxyFactory(sim)
-            connector_factory = PyNNConnectorFactory(sim)
-            xml_factory = XmlFactory("BRAIN",
-                                     connector_factory,
-                                     proxy_model_factory,
-                                     tf_config.brain_root.__dict__)
-
-            self._proxies = xml_factory.create_proxies(music_xml)
-
-        except music.UndefinedConfig:
-            raise Exception("Unable to load MUSIC proxy population definitions from MUSIC "
-                            "configuration script. MUSIC_XML value was not properly set.")
+        music_path = os.environ.get('NRP_MUSIC_DIRECTORY')
+        proxy_file = os.path.join(music_path, 'proxy.xml')
+        with open(proxy_file, 'r') as f:
+            music_xml = f.read()
+
+        proxy_model_factory = PyNNProxyFactory(sim)
+        connector_factory = PyNNConnectorFactory(sim)
+        xml_factory = XmlFactory("BRAIN",
+                                 connector_factory,
+                                 proxy_model_factory,
+                                 tf_config.brain_root.__dict__)
+
+        self._proxies = xml_factory.create_proxies(music_xml)
 
     def run(self):
         """
diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py
index 355ae4c..876c191 100644
--- a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py
+++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py
@@ -13,9 +13,12 @@ import subprocess
 import time
 
 import rospy
+import rosnode
 from std_msgs.msg import String
 import json
 
+from hbp_nrp_cle import config
+
 logger = logging.getLogger(__name__)
 
 
@@ -29,6 +32,7 @@ class MUSICMPILauncher(object):
         self._cmd = None
         self._process = None
         self._launched = False
+        self._gazebo_master_uri = None
 
     def add_host(self, hostname, tmpdir, processes=1):
         """
@@ -38,7 +42,8 @@ class MUSICMPILauncher(object):
         :param tmpdir A valid temporary directory on the remote host to launch in.
         :param processes The number of processes for this host.
         """
-        self._hosts.append('-np {} -host {} -wdir {}'.format(processes, hostname, tmpdir))
+        self._hosts.append('-np {p} -host {h} -wdir {t} -genv NRP_MUSIC_DIRECTORY {t}'
+                           .format(p=processes, h=hostname, t=tmpdir))
 
     def build(self):
         """
@@ -71,13 +76,9 @@ class MUSICMPILauncher(object):
         logger.info("Spawning MUSIC MPI processes: {}".format(self._cmd))
         self._process = subprocess.Popen(self._cmd.split(),         # no shell to SIGSTOP/hang
                                          preexec_fn=os.setsid,      # create a new session/tree
-                                         stdin=subprocess.PIPE,     # allow input to the process
+                                         stdin=subprocess.PIPE,     # hangs without valid stdin
                                          env=env_vars)              # environment variables
 
-        # for some reason we have to send a newline to the new session after setsid
-        self._process.stdin.write('\n\n')
-        self._process.stdin.flush()
-
         # wait until the CLE subprocess initializes properly or fails, the conditions are:
         # failure if process exits (bad command, failure to launch, etc.)
         # success if the CLE publishes a status update indicating loading is complete
@@ -85,9 +86,14 @@ class MUSICMPILauncher(object):
         # subscribe for loading status updates from the CLE, wait for completion message
         status_sub = rospy.Subscriber('/ros_cle_simulation/status', String, self._on_status)
 
-        # wait for one of the terminating conditions
+        # wait for the process to abort or be successfully launched
         while not self._process.poll() and not self._launched:
-            time.sleep(1)
+
+            # ensure gzbridge is running and accessible in deployed configurations
+            self._check_gzbridge()
+
+            # very short sleep to be as responsive as possible
+            time.sleep(0.1)
 
         # disconnect the temporary status subscriber
         status_sub.unregister()
@@ -108,14 +114,40 @@ class MUSICMPILauncher(object):
 
         status = json.loads(msg.data)
 
-        # we may get a few extra status messages before unregistering, ignore them
-        if 'progress' not in status:
+        # ignore status bar messages, these don't give us information if a launch has failed
+        # since the loading task will always be set to 'done' when it aborts
+        if 'progress' in status:
+            return
+
+        # received a simulation status "tick", everything has launched successfully
+        self._launched = True
+
+    def _check_gzbridge(self):
+        """
+        gzbridge cannot be launched on remote CLE nodes since they will not be reachable by clients
+        that are configured and able to reach the backend machines. If Gazebo is launched on a
+        remote node (e.g. not a local install), wait for the /gazebo ROS node to appear and start
+        gzbridge on this host (a backend server).
+        """
+
+        if self._gazebo_master_uri:
+            return
+
+        # request the ip of the Gazebo node, result will be -1 if not found
+        res, _, ip = rosnode.get_api_uri(rospy.get_master(), '/gazebo', True)
+        if res == -1:
             return
 
-        # check for task completion
-        if status['progress']['task'] == 'Neurorobotics Closed Loop Engine':
-            if 'done' in status['progress'] and status['progress']['done']:
-                self._launched = True
+        # replace the ROS port with the Gazebo port, configure env, and run gzbridge
+        self._gazebo_master_uri = ip[0:ip.rfind(':') + 1] + '11345'
+
+        # only need to start the gzbridge if running in a deployed configuration
+        if '127.0.0.1' not in self._gazebo_master_uri:
+
+            # this emulates the functionality in hbp_nrp_cleserver/server/LocalGazebo.py but we
+            # cannot import and reuse due to circular dependencies
+            os.environ['GAZEBO_MASTER_URI'] = self._gazebo_master_uri
+            os.system(config.config.get('gzbridge', 'restart-cmd'))
 
     def run(self):
         """
@@ -135,18 +167,23 @@ class MUSICMPILauncher(object):
     def shutdown(self):
         """
         Attempt to forecfully shutdown the mpirun command if it is still running and has not
-        cleanly shut itself down.
+        cleanly shut itself down. Guaranteed to be called after launch success or failure.
         """
-        if not self._process:
-            return
 
         # try to terminate the mpirun command, mpirun will automatically exit nautrally when all
         # of its spawned child processes exit or are killed, so this isn't explicitly necessary
-        try:
-            self._process.kill()
-        except OSError: # the process has already cleanly terminated
-            pass
-
+        if self._process:
+            try:
+                self._process.kill()
+            except OSError: # the process has already cleanly terminated
+                pass
+
+        # terminate the gzbrige process/websocket if we started it above
+        if self._gazebo_master_uri and '127.0.0.1' not in self._gazebo_master_uri:
+            os.system(config.config.get('gzbridge', 'stop-cmd'))
+
+        # reset all class variables to prevent class reuse
+        self._gazebo_master_uri = None
         self._launched = False
         self._process = None
         self._cmd = None
diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/bibi/config.music b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/bibi/config.music
index f8955b5..c3235d0 100644
--- a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/bibi/config.music
+++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/bibi/config.music
@@ -1,5 +1,3 @@
-MUSIC_XML = "proxy.xml"
-
 [CLE]
 np = 4
 binary = cle_binary
diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/cle/test_control_adapter.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/cle/test_control_adapter.py
index 5d11728..16c5a8a 100644
--- a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/cle/test_control_adapter.py
+++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/cle/test_control_adapter.py
@@ -19,10 +19,12 @@ class TestControlAdapter(unittest.TestCase):
           self.assertRaises(Exception, mc.load_brain, 'foo.xml')
 
       @patch('music.Setup', name=MockedMUSICSetup())
+      @patch('os.environ.get', return_value='')
       @patch('hbp_nrp_music_interface.cle.MUSICBrainLoader.load_proxies_from_xml')
-      def test_valid_xml(self, mocked_setup, mocked_load):
+      def test_valid_xml(self, mocked_setup, mocked_environ, mocked_load):
           mc = MUSICPyNNControlAdapter()
           mc.load_brain('foo.xml')
+          mocked_environ.assert_called_once_with('NRP_MUSIC_DIRECTORY')
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py
index 1782c55..64735e9 100644
--- a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py
+++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py
@@ -51,8 +51,8 @@ class TestControlAdapter(unittest.TestCase):
           self.__launcher.add_host('foo', '/tmp/foo')
           self.__launcher.add_host('bar', '/tmp/bar', 3)
           self.assertEqual(self.__launcher._hosts,
-                           ['-np 1 -host foo -wdir /tmp/foo',
-                            '-np 3 -host bar -wdir /tmp/bar'])
+                           ['-np 1 -host foo -wdir /tmp/foo -genv NRP_MUSIC_DIRECTORY /tmp/foo',
+                            '-np 3 -host bar -wdir /tmp/bar -genv NRP_MUSIC_DIRECTORY /tmp/bar'])
 
       @patch('os.environ.get', new=Mock(return_value='foo'))
       def test_build(self):
@@ -63,8 +63,8 @@ class TestControlAdapter(unittest.TestCase):
           self.__launcher.add_host('bar', '/tmp/bar', 3)
           self.__launcher.build()
           self.assertEqual(self.__launcher._cmd,
-                           'mpirun -np 1 -host foo -wdir /tmp/foo music cle.music : ' +
-                           '-np 3 -host bar -wdir /tmp/bar music cle.music')
+                           'mpirun -np 1 -host foo -wdir /tmp/foo -genv NRP_MUSIC_DIRECTORY /tmp/foo music cle.music : ' +
+                           '-np 3 -host bar -wdir /tmp/bar -genv NRP_MUSIC_DIRECTORY /tmp/bar music cle.music')
 
       def test_on_status(self):
 
@@ -73,15 +73,55 @@ class TestControlAdapter(unittest.TestCase):
 
           # not a progress message, no update to launched
           msg = String()
-          msg.data = json.dumps({'something': 'else'})
+          msg.data = json.dumps({'progress': {'task': 'Neurorobotics Closed Loop Engine'}})
           self.__launcher._on_status(msg)
           self.assertEqual(self.__launcher._launched, False)
 
           # progress completion message
-          msg.data = json.dumps({'progress': {'task': 'Neurorobotics Closed Loop Engine', 'done': True}})
+          msg.data = json.dumps({'simulation': 'status'})
           self.__launcher._on_status(msg)
           self.assertEqual(self.__launcher._launched, True)
 
+      @patch('rosnode.get_api_uri')
+      @patch('rospy.get_master', return_value='foo')
+      @patch('os.environ', return_value={})
+      @patch('hbp_nrp_cle.config.config.get', return_value='mock_gzbridge_cmd')
+      @patch('os.system')
+      def test_check_gzbridge(self, system_mock, config_mock, environ_mock, rospy_mock, rosnode_mock):
+
+          # simulate a call when uri has already been set
+          self.__launcher._gazebo_master_uri = 'something'
+          self.__launcher._check_gzbridge()
+          self.assertEqual(rosnode_mock.call_count, 0)
+          self.assertEqual(environ_mock.call_count, 0)
+          self.assertEqual(system_mock.call_count, 0)
+
+          # simulate node not yet started
+          self.__launcher._gazebo_master_uri = None
+          rosnode_mock.return_value = (-1, '', '')
+          self.__launcher._check_gzbridge()
+          rosnode_mock.assert_called_with('foo', '/gazebo', True)
+          self.assertEqual(environ_mock.call_count, 0)
+          self.assertEqual(system_mock.call_count, 0)
+
+          # simulate started on localhost
+          self.__launcher._gazebo_master_uri = None
+          rosnode_mock.return_value = (1, '', 'http://127.0.0.1:54321')
+          self.__launcher._check_gzbridge()
+          rosnode_mock.assert_called_with('foo', '/gazebo', True)
+          self.assertEqual(self.__launcher._gazebo_master_uri, 'http://127.0.0.1:11345')
+          self.assertEqual(environ_mock.call_count, 0)
+          self.assertEqual(system_mock.call_count, 0)
+
+          # simulate started on remote host
+          self.__launcher._gazebo_master_uri = None
+          rosnode_mock.return_value = (1, '', 'http://1.2.3.4:54321')
+          self.__launcher._check_gzbridge()
+          rosnode_mock.assert_called_with('foo', '/gazebo', True)
+          self.assertEqual(self.__launcher._gazebo_master_uri, 'http://1.2.3.4:11345')
+          system_mock.assert_called_once_with('mock_gzbridge_cmd')
+
+
       @patch('rospy.Subscriber')
       def test_launch(self, rospy_mock):
 
@@ -92,16 +132,19 @@ class TestControlAdapter(unittest.TestCase):
           # not initialized, cannot run without a command
           self.assertRaises(Exception, self.__launcher.launch)
 
-          # initialize with a valid process
-          self.__launcher._cmd = 'foo'
-          with patch('subprocess.Popen', MockProcess):
-              self.__launcher._launched = True
-              self.__launcher.launch()
+          # patch gzbridge launch function to be tested separately
+          with patch.object(self.__launcher, '_check_gzbridge', Mock()):
+
+              # initialize with a valid process
+              self.__launcher._cmd = 'foo'
+              with patch('subprocess.Popen', MockProcess):
+                  self.__launcher._launched = True
+                  self.__launcher.launch()
 
-          # initialize with an invalid process
-          with patch('subprocess.Popen', MockFailedProcess):
-              self.__launcher._launched = False
-              self.assertRaises(Exception, self.__launcher.launch)
+              # initialize with an invalid process
+              with patch('subprocess.Popen', MockFailedProcess):
+                  self.__launcher._launched = False
+                  self.assertRaises(Exception, self.__launcher.launch)
 
       def test_run(self):
 
@@ -119,12 +162,13 @@ class TestControlAdapter(unittest.TestCase):
           self.__launcher._process = MockFailedProcess(None, None, None, None)
           self.assertRaises(Exception, self.__launcher.run)
 
-      def test_shutdown(self):
-          self.__launcher._process = None
-          self.__launcher.shutdown()
-
+      @patch('hbp_nrp_cle.config.config.get', return_value='mock_gzbridge_cmd')
+      @patch('os.system')
+      def test_shutdown(self, system_mock, config_mock):
           self.__launcher._process = MockProcess(None, None, None, None)
+          self.__launcher._gazebo_master_uri = 'http://1.2.3.4:11345'
           self.__launcher.shutdown()
+          system_mock.assert_called_once_with('mock_gzbridge_cmd')
 
 if __name__ == "__main__":
     unittest.main()
-- 
GitLab