diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/bibi/bibi_music_config.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/bibi/bibi_music_config.py index 9ab03714ea86c55719b80cfa020ceaef522b9929..821582733167d061693d0382ac1736616edf344b 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/bibi/bibi_music_config.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/bibi/bibi_music_config.py @@ -103,10 +103,9 @@ class MUSICConfiguration(object): # write the actual music launching script to cle.music music_file = os.path.join(path, 'cle.music') with open(music_file, 'w') as f: - header = {'MUSIC_XML': proxy_file} ports = [MusicConfigPort(p) for p in self.__conf_xml.port] applications = [self.__applications['CLE'], self.__applications['BRAIN']] - self.__conf_script = MusicConfigWriter(header, applications, ports) + self.__conf_script = MusicConfigWriter(None, applications, ports) self.__conf_script.write(f) @staticmethod # required to avoid "no self use" pylint error diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/cle/MUSICPyNNControlAdapter.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/cle/MUSICPyNNControlAdapter.py index 5ff70f9d07c9f3a0fbab84e5c4e2434d2145b964..ded51a715eaa1b0b14997934512388bb81353331 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/cle/MUSICPyNNControlAdapter.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/cle/MUSICPyNNControlAdapter.py @@ -8,6 +8,7 @@ from hbp_nrp_music_interface.cle import MUSICBrainLoader import music import logging +import os logger = logging.getLogger(__name__) @@ -36,13 +37,9 @@ class MUSICPyNNControlAdapter(PyNNControlAdapter): :param populations: A named list of populations to create """ - # initialize MUSIC and read the population specification xml file that - # is referenced in the configuration script used to launch MUSIC + # initialize MUSIC and load proxies from the MUSIC proxy xml logger.info("Loading MUSIC proxy brain devices.") - try: - music_setup = music.Setup() - music_xml = music_setup.config('MUSIC_XML') - MUSICBrainLoader.load_proxies_from_xml(music_xml, **populations) - except music.UndefinedConfig: - raise Exception("Unable to load MUSIC proxy population definitions from MUSIC " - "configuration script. MUSIC_XML value was not properly set.") + music.Setup() + music_path = os.environ.get('NRP_MUSIC_DIRECTORY') + proxy_file = os.path.join(music_path, 'proxy.xml') + MUSICBrainLoader.load_proxies_from_xml(proxy_file, **populations) diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICBrainProcess.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICBrainProcess.py index d3be1aa86e9a558dffa0520fcdd2ed4bb5c5db68..49a4196cbfb62a53b872ea8b230d6dd0826ea47c 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICBrainProcess.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICBrainProcess.py @@ -57,23 +57,19 @@ class MUSICBrainProcess(object): self._brain_controller.load_brain(brain_file, **pop_dict) # load the MUSIC proxies for the spawned brain - try: - xml_file = self._music_setup.config('MUSIC_XML') - with open(xml_file, 'r') as f: - music_xml = f.read() - - proxy_model_factory = PyNNProxyFactory(sim) - connector_factory = PyNNConnectorFactory(sim) - xml_factory = XmlFactory("BRAIN", - connector_factory, - proxy_model_factory, - tf_config.brain_root.__dict__) - - self._proxies = xml_factory.create_proxies(music_xml) - - except music.UndefinedConfig: - raise Exception("Unable to load MUSIC proxy population definitions from MUSIC " - "configuration script. MUSIC_XML value was not properly set.") + music_path = os.environ.get('NRP_MUSIC_DIRECTORY') + proxy_file = os.path.join(music_path, 'proxy.xml') + with open(proxy_file, 'r') as f: + music_xml = f.read() + + proxy_model_factory = PyNNProxyFactory(sim) + connector_factory = PyNNConnectorFactory(sim) + xml_factory = XmlFactory("BRAIN", + connector_factory, + proxy_model_factory, + tf_config.brain_root.__dict__) + + self._proxies = xml_factory.create_proxies(music_xml) def run(self): """ diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py index 355ae4c6d909c6ff1dfe3686da16b767dd965211..876c191f16790a02889c499de10b408bb988a16d 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/launch/MUSICMPILauncher.py @@ -13,9 +13,12 @@ import subprocess import time import rospy +import rosnode from std_msgs.msg import String import json +from hbp_nrp_cle import config + logger = logging.getLogger(__name__) @@ -29,6 +32,7 @@ class MUSICMPILauncher(object): self._cmd = None self._process = None self._launched = False + self._gazebo_master_uri = None def add_host(self, hostname, tmpdir, processes=1): """ @@ -38,7 +42,8 @@ class MUSICMPILauncher(object): :param tmpdir A valid temporary directory on the remote host to launch in. :param processes The number of processes for this host. """ - self._hosts.append('-np {} -host {} -wdir {}'.format(processes, hostname, tmpdir)) + self._hosts.append('-np {p} -host {h} -wdir {t} -genv NRP_MUSIC_DIRECTORY {t}' + .format(p=processes, h=hostname, t=tmpdir)) def build(self): """ @@ -71,13 +76,9 @@ class MUSICMPILauncher(object): logger.info("Spawning MUSIC MPI processes: {}".format(self._cmd)) self._process = subprocess.Popen(self._cmd.split(), # no shell to SIGSTOP/hang preexec_fn=os.setsid, # create a new session/tree - stdin=subprocess.PIPE, # allow input to the process + stdin=subprocess.PIPE, # hangs without valid stdin env=env_vars) # environment variables - # for some reason we have to send a newline to the new session after setsid - self._process.stdin.write('\n\n') - self._process.stdin.flush() - # wait until the CLE subprocess initializes properly or fails, the conditions are: # failure if process exits (bad command, failure to launch, etc.) # success if the CLE publishes a status update indicating loading is complete @@ -85,9 +86,14 @@ class MUSICMPILauncher(object): # subscribe for loading status updates from the CLE, wait for completion message status_sub = rospy.Subscriber('/ros_cle_simulation/status', String, self._on_status) - # wait for one of the terminating conditions + # wait for the process to abort or be successfully launched while not self._process.poll() and not self._launched: - time.sleep(1) + + # ensure gzbridge is running and accessible in deployed configurations + self._check_gzbridge() + + # very short sleep to be as responsive as possible + time.sleep(0.1) # disconnect the temporary status subscriber status_sub.unregister() @@ -108,14 +114,40 @@ class MUSICMPILauncher(object): status = json.loads(msg.data) - # we may get a few extra status messages before unregistering, ignore them - if 'progress' not in status: + # ignore status bar messages, these don't give us information if a launch has failed + # since the loading task will always be set to 'done' when it aborts + if 'progress' in status: + return + + # received a simulation status "tick", everything has launched successfully + self._launched = True + + def _check_gzbridge(self): + """ + gzbridge cannot be launched on remote CLE nodes since they will not be reachable by clients + that are configured and able to reach the backend machines. If Gazebo is launched on a + remote node (e.g. not a local install), wait for the /gazebo ROS node to appear and start + gzbridge on this host (a backend server). + """ + + if self._gazebo_master_uri: + return + + # request the ip of the Gazebo node, result will be -1 if not found + res, _, ip = rosnode.get_api_uri(rospy.get_master(), '/gazebo', True) + if res == -1: return - # check for task completion - if status['progress']['task'] == 'Neurorobotics Closed Loop Engine': - if 'done' in status['progress'] and status['progress']['done']: - self._launched = True + # replace the ROS port with the Gazebo port, configure env, and run gzbridge + self._gazebo_master_uri = ip[0:ip.rfind(':') + 1] + '11345' + + # only need to start the gzbridge if running in a deployed configuration + if '127.0.0.1' not in self._gazebo_master_uri: + + # this emulates the functionality in hbp_nrp_cleserver/server/LocalGazebo.py but we + # cannot import and reuse due to circular dependencies + os.environ['GAZEBO_MASTER_URI'] = self._gazebo_master_uri + os.system(config.config.get('gzbridge', 'restart-cmd')) def run(self): """ @@ -135,18 +167,23 @@ class MUSICMPILauncher(object): def shutdown(self): """ Attempt to forecfully shutdown the mpirun command if it is still running and has not - cleanly shut itself down. + cleanly shut itself down. Guaranteed to be called after launch success or failure. """ - if not self._process: - return # try to terminate the mpirun command, mpirun will automatically exit nautrally when all # of its spawned child processes exit or are killed, so this isn't explicitly necessary - try: - self._process.kill() - except OSError: # the process has already cleanly terminated - pass - + if self._process: + try: + self._process.kill() + except OSError: # the process has already cleanly terminated + pass + + # terminate the gzbrige process/websocket if we started it above + if self._gazebo_master_uri and '127.0.0.1' not in self._gazebo_master_uri: + os.system(config.config.get('gzbridge', 'stop-cmd')) + + # reset all class variables to prevent class reuse + self._gazebo_master_uri = None self._launched = False self._process = None self._cmd = None diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/bibi/config.music b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/bibi/config.music index f8955b5e9551b528ac365a3cd1fcafbf02ab026f..c3235d0cce0bf890c9353965f28929fe276dddc4 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/bibi/config.music +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/bibi/config.music @@ -1,5 +1,3 @@ -MUSIC_XML = "proxy.xml" - [CLE] np = 4 binary = cle_binary diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/cle/test_control_adapter.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/cle/test_control_adapter.py index 5d1172893ae3b7973f1b8608f565e1e82abf1da3..16c5a8abebd44296192d3fc0862997cb2c1d3374 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/cle/test_control_adapter.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/cle/test_control_adapter.py @@ -19,10 +19,12 @@ class TestControlAdapter(unittest.TestCase): self.assertRaises(Exception, mc.load_brain, 'foo.xml') @patch('music.Setup', name=MockedMUSICSetup()) + @patch('os.environ.get', return_value='') @patch('hbp_nrp_music_interface.cle.MUSICBrainLoader.load_proxies_from_xml') - def test_valid_xml(self, mocked_setup, mocked_load): + def test_valid_xml(self, mocked_setup, mocked_environ, mocked_load): mc = MUSICPyNNControlAdapter() mc.load_brain('foo.xml') + mocked_environ.assert_called_once_with('NRP_MUSIC_DIRECTORY') if __name__ == "__main__": unittest.main() diff --git a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py index 1782c559e0c7d7d7afc013ff4248d115dbc62ccf..64735e971576c3a5db0555d7d4af34177c7b73f2 100644 --- a/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py +++ b/hbp_nrp_music_interface/hbp_nrp_music_interface/tests/launch/test_mpi_launcher.py @@ -51,8 +51,8 @@ class TestControlAdapter(unittest.TestCase): self.__launcher.add_host('foo', '/tmp/foo') self.__launcher.add_host('bar', '/tmp/bar', 3) self.assertEqual(self.__launcher._hosts, - ['-np 1 -host foo -wdir /tmp/foo', - '-np 3 -host bar -wdir /tmp/bar']) + ['-np 1 -host foo -wdir /tmp/foo -genv NRP_MUSIC_DIRECTORY /tmp/foo', + '-np 3 -host bar -wdir /tmp/bar -genv NRP_MUSIC_DIRECTORY /tmp/bar']) @patch('os.environ.get', new=Mock(return_value='foo')) def test_build(self): @@ -63,8 +63,8 @@ class TestControlAdapter(unittest.TestCase): self.__launcher.add_host('bar', '/tmp/bar', 3) self.__launcher.build() self.assertEqual(self.__launcher._cmd, - 'mpirun -np 1 -host foo -wdir /tmp/foo music cle.music : ' + - '-np 3 -host bar -wdir /tmp/bar music cle.music') + 'mpirun -np 1 -host foo -wdir /tmp/foo -genv NRP_MUSIC_DIRECTORY /tmp/foo music cle.music : ' + + '-np 3 -host bar -wdir /tmp/bar -genv NRP_MUSIC_DIRECTORY /tmp/bar music cle.music') def test_on_status(self): @@ -73,15 +73,55 @@ class TestControlAdapter(unittest.TestCase): # not a progress message, no update to launched msg = String() - msg.data = json.dumps({'something': 'else'}) + msg.data = json.dumps({'progress': {'task': 'Neurorobotics Closed Loop Engine'}}) self.__launcher._on_status(msg) self.assertEqual(self.__launcher._launched, False) # progress completion message - msg.data = json.dumps({'progress': {'task': 'Neurorobotics Closed Loop Engine', 'done': True}}) + msg.data = json.dumps({'simulation': 'status'}) self.__launcher._on_status(msg) self.assertEqual(self.__launcher._launched, True) + @patch('rosnode.get_api_uri') + @patch('rospy.get_master', return_value='foo') + @patch('os.environ', return_value={}) + @patch('hbp_nrp_cle.config.config.get', return_value='mock_gzbridge_cmd') + @patch('os.system') + def test_check_gzbridge(self, system_mock, config_mock, environ_mock, rospy_mock, rosnode_mock): + + # simulate a call when uri has already been set + self.__launcher._gazebo_master_uri = 'something' + self.__launcher._check_gzbridge() + self.assertEqual(rosnode_mock.call_count, 0) + self.assertEqual(environ_mock.call_count, 0) + self.assertEqual(system_mock.call_count, 0) + + # simulate node not yet started + self.__launcher._gazebo_master_uri = None + rosnode_mock.return_value = (-1, '', '') + self.__launcher._check_gzbridge() + rosnode_mock.assert_called_with('foo', '/gazebo', True) + self.assertEqual(environ_mock.call_count, 0) + self.assertEqual(system_mock.call_count, 0) + + # simulate started on localhost + self.__launcher._gazebo_master_uri = None + rosnode_mock.return_value = (1, '', 'http://127.0.0.1:54321') + self.__launcher._check_gzbridge() + rosnode_mock.assert_called_with('foo', '/gazebo', True) + self.assertEqual(self.__launcher._gazebo_master_uri, 'http://127.0.0.1:11345') + self.assertEqual(environ_mock.call_count, 0) + self.assertEqual(system_mock.call_count, 0) + + # simulate started on remote host + self.__launcher._gazebo_master_uri = None + rosnode_mock.return_value = (1, '', 'http://1.2.3.4:54321') + self.__launcher._check_gzbridge() + rosnode_mock.assert_called_with('foo', '/gazebo', True) + self.assertEqual(self.__launcher._gazebo_master_uri, 'http://1.2.3.4:11345') + system_mock.assert_called_once_with('mock_gzbridge_cmd') + + @patch('rospy.Subscriber') def test_launch(self, rospy_mock): @@ -92,16 +132,19 @@ class TestControlAdapter(unittest.TestCase): # not initialized, cannot run without a command self.assertRaises(Exception, self.__launcher.launch) - # initialize with a valid process - self.__launcher._cmd = 'foo' - with patch('subprocess.Popen', MockProcess): - self.__launcher._launched = True - self.__launcher.launch() + # patch gzbridge launch function to be tested separately + with patch.object(self.__launcher, '_check_gzbridge', Mock()): + + # initialize with a valid process + self.__launcher._cmd = 'foo' + with patch('subprocess.Popen', MockProcess): + self.__launcher._launched = True + self.__launcher.launch() - # initialize with an invalid process - with patch('subprocess.Popen', MockFailedProcess): - self.__launcher._launched = False - self.assertRaises(Exception, self.__launcher.launch) + # initialize with an invalid process + with patch('subprocess.Popen', MockFailedProcess): + self.__launcher._launched = False + self.assertRaises(Exception, self.__launcher.launch) def test_run(self): @@ -119,12 +162,13 @@ class TestControlAdapter(unittest.TestCase): self.__launcher._process = MockFailedProcess(None, None, None, None) self.assertRaises(Exception, self.__launcher.run) - def test_shutdown(self): - self.__launcher._process = None - self.__launcher.shutdown() - + @patch('hbp_nrp_cle.config.config.get', return_value='mock_gzbridge_cmd') + @patch('os.system') + def test_shutdown(self, system_mock, config_mock): self.__launcher._process = MockProcess(None, None, None, None) + self.__launcher._gazebo_master_uri = 'http://1.2.3.4:11345' self.__launcher.shutdown() + system_mock.assert_called_once_with('mock_gzbridge_cmd') if __name__ == "__main__": unittest.main()