Skip to content
Snippets Groups Projects
Commit e42c1245 authored by Kenny Sharma's avatar Kenny Sharma
Browse files

[NRRPLT-3902] Various fixes after Lugano vizcluster testing.

This patch resolves the remaining issues with distributed Nest launching on the
Lugano vizcluster. It includes fixes to:

- pass the music working directory path as an environment variable rather than
  using MUSIC file configuration, as the CLE seems to change working directory
  during the launch of an expeirment and the proxy.xml file was inaccessible

- fix launch success/failure to wait for a non progress bar status message from
  the CLE, this is because the status bar messages are always marked as 'done'
  even in the event of a failure, so it was not a correct way to check success

- start gzbridge on the backend VM by waiting for Gazebo to be started on the
  remote nodes, this is because the vizcluster node cannot start gzbridge and
  would not be reachable by the frontend even if it could

Change-Id: Id182da74b6e01e748003bc8d7694fc83fe572336
parent 684e4594
No related branches found
No related tags found
No related merge requests found
...@@ -103,10 +103,9 @@ class MUSICConfiguration(object): ...@@ -103,10 +103,9 @@ class MUSICConfiguration(object):
# write the actual music launching script to cle.music # write the actual music launching script to cle.music
music_file = os.path.join(path, 'cle.music') music_file = os.path.join(path, 'cle.music')
with open(music_file, 'w') as f: with open(music_file, 'w') as f:
header = {'MUSIC_XML': proxy_file}
ports = [MusicConfigPort(p) for p in self.__conf_xml.port] ports = [MusicConfigPort(p) for p in self.__conf_xml.port]
applications = [self.__applications['CLE'], self.__applications['BRAIN']] applications = [self.__applications['CLE'], self.__applications['BRAIN']]
self.__conf_script = MusicConfigWriter(header, applications, ports) self.__conf_script = MusicConfigWriter(None, applications, ports)
self.__conf_script.write(f) self.__conf_script.write(f)
@staticmethod # required to avoid "no self use" pylint error @staticmethod # required to avoid "no self use" pylint error
......
...@@ -8,6 +8,7 @@ from hbp_nrp_music_interface.cle import MUSICBrainLoader ...@@ -8,6 +8,7 @@ from hbp_nrp_music_interface.cle import MUSICBrainLoader
import music import music
import logging import logging
import os
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -36,13 +37,9 @@ class MUSICPyNNControlAdapter(PyNNControlAdapter): ...@@ -36,13 +37,9 @@ class MUSICPyNNControlAdapter(PyNNControlAdapter):
:param populations: A named list of populations to create :param populations: A named list of populations to create
""" """
# initialize MUSIC and read the population specification xml file that # initialize MUSIC and load proxies from the MUSIC proxy xml
# is referenced in the configuration script used to launch MUSIC
logger.info("Loading MUSIC proxy brain devices.") logger.info("Loading MUSIC proxy brain devices.")
try: music.Setup()
music_setup = music.Setup() music_path = os.environ.get('NRP_MUSIC_DIRECTORY')
music_xml = music_setup.config('MUSIC_XML') proxy_file = os.path.join(music_path, 'proxy.xml')
MUSICBrainLoader.load_proxies_from_xml(music_xml, **populations) MUSICBrainLoader.load_proxies_from_xml(proxy_file, **populations)
except music.UndefinedConfig:
raise Exception("Unable to load MUSIC proxy population definitions from MUSIC "
"configuration script. MUSIC_XML value was not properly set.")
...@@ -57,23 +57,19 @@ class MUSICBrainProcess(object): ...@@ -57,23 +57,19 @@ class MUSICBrainProcess(object):
self._brain_controller.load_brain(brain_file, **pop_dict) self._brain_controller.load_brain(brain_file, **pop_dict)
# load the MUSIC proxies for the spawned brain # load the MUSIC proxies for the spawned brain
try: music_path = os.environ.get('NRP_MUSIC_DIRECTORY')
xml_file = self._music_setup.config('MUSIC_XML') proxy_file = os.path.join(music_path, 'proxy.xml')
with open(xml_file, 'r') as f: with open(proxy_file, 'r') as f:
music_xml = f.read() music_xml = f.read()
proxy_model_factory = PyNNProxyFactory(sim) proxy_model_factory = PyNNProxyFactory(sim)
connector_factory = PyNNConnectorFactory(sim) connector_factory = PyNNConnectorFactory(sim)
xml_factory = XmlFactory("BRAIN", xml_factory = XmlFactory("BRAIN",
connector_factory, connector_factory,
proxy_model_factory, proxy_model_factory,
tf_config.brain_root.__dict__) tf_config.brain_root.__dict__)
self._proxies = xml_factory.create_proxies(music_xml) self._proxies = xml_factory.create_proxies(music_xml)
except music.UndefinedConfig:
raise Exception("Unable to load MUSIC proxy population definitions from MUSIC "
"configuration script. MUSIC_XML value was not properly set.")
def run(self): def run(self):
""" """
......
...@@ -13,9 +13,12 @@ import subprocess ...@@ -13,9 +13,12 @@ import subprocess
import time import time
import rospy import rospy
import rosnode
from std_msgs.msg import String from std_msgs.msg import String
import json import json
from hbp_nrp_cle import config
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -29,6 +32,7 @@ class MUSICMPILauncher(object): ...@@ -29,6 +32,7 @@ class MUSICMPILauncher(object):
self._cmd = None self._cmd = None
self._process = None self._process = None
self._launched = False self._launched = False
self._gazebo_master_uri = None
def add_host(self, hostname, tmpdir, processes=1): def add_host(self, hostname, tmpdir, processes=1):
""" """
...@@ -38,7 +42,8 @@ class MUSICMPILauncher(object): ...@@ -38,7 +42,8 @@ class MUSICMPILauncher(object):
:param tmpdir A valid temporary directory on the remote host to launch in. :param tmpdir A valid temporary directory on the remote host to launch in.
:param processes The number of processes for this host. :param processes The number of processes for this host.
""" """
self._hosts.append('-np {} -host {} -wdir {}'.format(processes, hostname, tmpdir)) self._hosts.append('-np {p} -host {h} -wdir {t} -genv NRP_MUSIC_DIRECTORY {t}'
.format(p=processes, h=hostname, t=tmpdir))
def build(self): def build(self):
""" """
...@@ -71,13 +76,9 @@ class MUSICMPILauncher(object): ...@@ -71,13 +76,9 @@ class MUSICMPILauncher(object):
logger.info("Spawning MUSIC MPI processes: {}".format(self._cmd)) logger.info("Spawning MUSIC MPI processes: {}".format(self._cmd))
self._process = subprocess.Popen(self._cmd.split(), # no shell to SIGSTOP/hang self._process = subprocess.Popen(self._cmd.split(), # no shell to SIGSTOP/hang
preexec_fn=os.setsid, # create a new session/tree preexec_fn=os.setsid, # create a new session/tree
stdin=subprocess.PIPE, # allow input to the process stdin=subprocess.PIPE, # hangs without valid stdin
env=env_vars) # environment variables env=env_vars) # environment variables
# for some reason we have to send a newline to the new session after setsid
self._process.stdin.write('\n\n')
self._process.stdin.flush()
# wait until the CLE subprocess initializes properly or fails, the conditions are: # wait until the CLE subprocess initializes properly or fails, the conditions are:
# failure if process exits (bad command, failure to launch, etc.) # failure if process exits (bad command, failure to launch, etc.)
# success if the CLE publishes a status update indicating loading is complete # success if the CLE publishes a status update indicating loading is complete
...@@ -85,9 +86,14 @@ class MUSICMPILauncher(object): ...@@ -85,9 +86,14 @@ class MUSICMPILauncher(object):
# subscribe for loading status updates from the CLE, wait for completion message # subscribe for loading status updates from the CLE, wait for completion message
status_sub = rospy.Subscriber('/ros_cle_simulation/status', String, self._on_status) status_sub = rospy.Subscriber('/ros_cle_simulation/status', String, self._on_status)
# wait for one of the terminating conditions # wait for the process to abort or be successfully launched
while not self._process.poll() and not self._launched: while not self._process.poll() and not self._launched:
time.sleep(1)
# ensure gzbridge is running and accessible in deployed configurations
self._check_gzbridge()
# very short sleep to be as responsive as possible
time.sleep(0.1)
# disconnect the temporary status subscriber # disconnect the temporary status subscriber
status_sub.unregister() status_sub.unregister()
...@@ -108,14 +114,40 @@ class MUSICMPILauncher(object): ...@@ -108,14 +114,40 @@ class MUSICMPILauncher(object):
status = json.loads(msg.data) status = json.loads(msg.data)
# we may get a few extra status messages before unregistering, ignore them # ignore status bar messages, these don't give us information if a launch has failed
if 'progress' not in status: # since the loading task will always be set to 'done' when it aborts
if 'progress' in status:
return
# received a simulation status "tick", everything has launched successfully
self._launched = True
def _check_gzbridge(self):
"""
gzbridge cannot be launched on remote CLE nodes since they will not be reachable by clients
that are configured and able to reach the backend machines. If Gazebo is launched on a
remote node (e.g. not a local install), wait for the /gazebo ROS node to appear and start
gzbridge on this host (a backend server).
"""
if self._gazebo_master_uri:
return
# request the ip of the Gazebo node, result will be -1 if not found
res, _, ip = rosnode.get_api_uri(rospy.get_master(), '/gazebo', True)
if res == -1:
return return
# check for task completion # replace the ROS port with the Gazebo port, configure env, and run gzbridge
if status['progress']['task'] == 'Neurorobotics Closed Loop Engine': self._gazebo_master_uri = ip[0:ip.rfind(':') + 1] + '11345'
if 'done' in status['progress'] and status['progress']['done']:
self._launched = True # only need to start the gzbridge if running in a deployed configuration
if '127.0.0.1' not in self._gazebo_master_uri:
# this emulates the functionality in hbp_nrp_cleserver/server/LocalGazebo.py but we
# cannot import and reuse due to circular dependencies
os.environ['GAZEBO_MASTER_URI'] = self._gazebo_master_uri
os.system(config.config.get('gzbridge', 'restart-cmd'))
def run(self): def run(self):
""" """
...@@ -135,18 +167,23 @@ class MUSICMPILauncher(object): ...@@ -135,18 +167,23 @@ class MUSICMPILauncher(object):
def shutdown(self): def shutdown(self):
""" """
Attempt to forecfully shutdown the mpirun command if it is still running and has not Attempt to forecfully shutdown the mpirun command if it is still running and has not
cleanly shut itself down. cleanly shut itself down. Guaranteed to be called after launch success or failure.
""" """
if not self._process:
return
# try to terminate the mpirun command, mpirun will automatically exit nautrally when all # try to terminate the mpirun command, mpirun will automatically exit nautrally when all
# of its spawned child processes exit or are killed, so this isn't explicitly necessary # of its spawned child processes exit or are killed, so this isn't explicitly necessary
try: if self._process:
self._process.kill() try:
except OSError: # the process has already cleanly terminated self._process.kill()
pass except OSError: # the process has already cleanly terminated
pass
# terminate the gzbrige process/websocket if we started it above
if self._gazebo_master_uri and '127.0.0.1' not in self._gazebo_master_uri:
os.system(config.config.get('gzbridge', 'stop-cmd'))
# reset all class variables to prevent class reuse
self._gazebo_master_uri = None
self._launched = False self._launched = False
self._process = None self._process = None
self._cmd = None self._cmd = None
MUSIC_XML = "proxy.xml"
[CLE] [CLE]
np = 4 np = 4
binary = cle_binary binary = cle_binary
......
...@@ -19,10 +19,12 @@ class TestControlAdapter(unittest.TestCase): ...@@ -19,10 +19,12 @@ class TestControlAdapter(unittest.TestCase):
self.assertRaises(Exception, mc.load_brain, 'foo.xml') self.assertRaises(Exception, mc.load_brain, 'foo.xml')
@patch('music.Setup', name=MockedMUSICSetup()) @patch('music.Setup', name=MockedMUSICSetup())
@patch('os.environ.get', return_value='')
@patch('hbp_nrp_music_interface.cle.MUSICBrainLoader.load_proxies_from_xml') @patch('hbp_nrp_music_interface.cle.MUSICBrainLoader.load_proxies_from_xml')
def test_valid_xml(self, mocked_setup, mocked_load): def test_valid_xml(self, mocked_setup, mocked_environ, mocked_load):
mc = MUSICPyNNControlAdapter() mc = MUSICPyNNControlAdapter()
mc.load_brain('foo.xml') mc.load_brain('foo.xml')
mocked_environ.assert_called_once_with('NRP_MUSIC_DIRECTORY')
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
...@@ -51,8 +51,8 @@ class TestControlAdapter(unittest.TestCase): ...@@ -51,8 +51,8 @@ class TestControlAdapter(unittest.TestCase):
self.__launcher.add_host('foo', '/tmp/foo') self.__launcher.add_host('foo', '/tmp/foo')
self.__launcher.add_host('bar', '/tmp/bar', 3) self.__launcher.add_host('bar', '/tmp/bar', 3)
self.assertEqual(self.__launcher._hosts, self.assertEqual(self.__launcher._hosts,
['-np 1 -host foo -wdir /tmp/foo', ['-np 1 -host foo -wdir /tmp/foo -genv NRP_MUSIC_DIRECTORY /tmp/foo',
'-np 3 -host bar -wdir /tmp/bar']) '-np 3 -host bar -wdir /tmp/bar -genv NRP_MUSIC_DIRECTORY /tmp/bar'])
@patch('os.environ.get', new=Mock(return_value='foo')) @patch('os.environ.get', new=Mock(return_value='foo'))
def test_build(self): def test_build(self):
...@@ -63,8 +63,8 @@ class TestControlAdapter(unittest.TestCase): ...@@ -63,8 +63,8 @@ class TestControlAdapter(unittest.TestCase):
self.__launcher.add_host('bar', '/tmp/bar', 3) self.__launcher.add_host('bar', '/tmp/bar', 3)
self.__launcher.build() self.__launcher.build()
self.assertEqual(self.__launcher._cmd, self.assertEqual(self.__launcher._cmd,
'mpirun -np 1 -host foo -wdir /tmp/foo music cle.music : ' + 'mpirun -np 1 -host foo -wdir /tmp/foo -genv NRP_MUSIC_DIRECTORY /tmp/foo music cle.music : ' +
'-np 3 -host bar -wdir /tmp/bar music cle.music') '-np 3 -host bar -wdir /tmp/bar -genv NRP_MUSIC_DIRECTORY /tmp/bar music cle.music')
def test_on_status(self): def test_on_status(self):
...@@ -73,15 +73,55 @@ class TestControlAdapter(unittest.TestCase): ...@@ -73,15 +73,55 @@ class TestControlAdapter(unittest.TestCase):
# not a progress message, no update to launched # not a progress message, no update to launched
msg = String() msg = String()
msg.data = json.dumps({'something': 'else'}) msg.data = json.dumps({'progress': {'task': 'Neurorobotics Closed Loop Engine'}})
self.__launcher._on_status(msg) self.__launcher._on_status(msg)
self.assertEqual(self.__launcher._launched, False) self.assertEqual(self.__launcher._launched, False)
# progress completion message # progress completion message
msg.data = json.dumps({'progress': {'task': 'Neurorobotics Closed Loop Engine', 'done': True}}) msg.data = json.dumps({'simulation': 'status'})
self.__launcher._on_status(msg) self.__launcher._on_status(msg)
self.assertEqual(self.__launcher._launched, True) self.assertEqual(self.__launcher._launched, True)
@patch('rosnode.get_api_uri')
@patch('rospy.get_master', return_value='foo')
@patch('os.environ', return_value={})
@patch('hbp_nrp_cle.config.config.get', return_value='mock_gzbridge_cmd')
@patch('os.system')
def test_check_gzbridge(self, system_mock, config_mock, environ_mock, rospy_mock, rosnode_mock):
# simulate a call when uri has already been set
self.__launcher._gazebo_master_uri = 'something'
self.__launcher._check_gzbridge()
self.assertEqual(rosnode_mock.call_count, 0)
self.assertEqual(environ_mock.call_count, 0)
self.assertEqual(system_mock.call_count, 0)
# simulate node not yet started
self.__launcher._gazebo_master_uri = None
rosnode_mock.return_value = (-1, '', '')
self.__launcher._check_gzbridge()
rosnode_mock.assert_called_with('foo', '/gazebo', True)
self.assertEqual(environ_mock.call_count, 0)
self.assertEqual(system_mock.call_count, 0)
# simulate started on localhost
self.__launcher._gazebo_master_uri = None
rosnode_mock.return_value = (1, '', 'http://127.0.0.1:54321')
self.__launcher._check_gzbridge()
rosnode_mock.assert_called_with('foo', '/gazebo', True)
self.assertEqual(self.__launcher._gazebo_master_uri, 'http://127.0.0.1:11345')
self.assertEqual(environ_mock.call_count, 0)
self.assertEqual(system_mock.call_count, 0)
# simulate started on remote host
self.__launcher._gazebo_master_uri = None
rosnode_mock.return_value = (1, '', 'http://1.2.3.4:54321')
self.__launcher._check_gzbridge()
rosnode_mock.assert_called_with('foo', '/gazebo', True)
self.assertEqual(self.__launcher._gazebo_master_uri, 'http://1.2.3.4:11345')
system_mock.assert_called_once_with('mock_gzbridge_cmd')
@patch('rospy.Subscriber') @patch('rospy.Subscriber')
def test_launch(self, rospy_mock): def test_launch(self, rospy_mock):
...@@ -92,16 +132,19 @@ class TestControlAdapter(unittest.TestCase): ...@@ -92,16 +132,19 @@ class TestControlAdapter(unittest.TestCase):
# not initialized, cannot run without a command # not initialized, cannot run without a command
self.assertRaises(Exception, self.__launcher.launch) self.assertRaises(Exception, self.__launcher.launch)
# initialize with a valid process # patch gzbridge launch function to be tested separately
self.__launcher._cmd = 'foo' with patch.object(self.__launcher, '_check_gzbridge', Mock()):
with patch('subprocess.Popen', MockProcess):
self.__launcher._launched = True # initialize with a valid process
self.__launcher.launch() self.__launcher._cmd = 'foo'
with patch('subprocess.Popen', MockProcess):
self.__launcher._launched = True
self.__launcher.launch()
# initialize with an invalid process # initialize with an invalid process
with patch('subprocess.Popen', MockFailedProcess): with patch('subprocess.Popen', MockFailedProcess):
self.__launcher._launched = False self.__launcher._launched = False
self.assertRaises(Exception, self.__launcher.launch) self.assertRaises(Exception, self.__launcher.launch)
def test_run(self): def test_run(self):
...@@ -119,12 +162,13 @@ class TestControlAdapter(unittest.TestCase): ...@@ -119,12 +162,13 @@ class TestControlAdapter(unittest.TestCase):
self.__launcher._process = MockFailedProcess(None, None, None, None) self.__launcher._process = MockFailedProcess(None, None, None, None)
self.assertRaises(Exception, self.__launcher.run) self.assertRaises(Exception, self.__launcher.run)
def test_shutdown(self): @patch('hbp_nrp_cle.config.config.get', return_value='mock_gzbridge_cmd')
self.__launcher._process = None @patch('os.system')
self.__launcher.shutdown() def test_shutdown(self, system_mock, config_mock):
self.__launcher._process = MockProcess(None, None, None, None) self.__launcher._process = MockProcess(None, None, None, None)
self.__launcher._gazebo_master_uri = 'http://1.2.3.4:11345'
self.__launcher.shutdown() self.__launcher.shutdown()
system_mock.assert_called_once_with('mock_gzbridge_cmd')
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment