From 9c5449575cbf4b454c65b8fab634d05e7055dd41 Mon Sep 17 00:00:00 2001 From: Eloy Retamino <retamino@ugr.es> Date: Tue, 3 Nov 2020 17:15:11 +0100 Subject: [PATCH] [NRRPLT-0000] moved rosbridge code to a separate class; executing client in a separate process to bypass roslipby problems in handling multiple clients in the same process --- .../pynrp/rosbridge_handler.py | 329 ++++++++++++++++++ hbp_nrp_virtual_coach/pynrp/simulation.py | 68 ++-- .../pynrp/tests/test_simulation.py | 57 +-- .../pynrp/tests/test_virtual_coach.py | 2 +- 4 files changed, 363 insertions(+), 93 deletions(-) create mode 100644 hbp_nrp_virtual_coach/pynrp/rosbridge_handler.py diff --git a/hbp_nrp_virtual_coach/pynrp/rosbridge_handler.py b/hbp_nrp_virtual_coach/pynrp/rosbridge_handler.py new file mode 100644 index 0000000..cd8ec92 --- /dev/null +++ b/hbp_nrp_virtual_coach/pynrp/rosbridge_handler.py @@ -0,0 +1,329 @@ +# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER +# This file is part of the Neurorobotics Platform software +# Copyright (C) 2014,2015,2016,2017 Human Brain Project +# https://www.humanbrainproject.eu +# +# The Human Brain Project is a European Commission funded project +# in the frame of the Horizon2020 FET Flagship plan. +# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# ---LICENSE-END +""" +An interface to handle rosbridge communication +""" + +import roslibpy +import logging +from multiprocessing import Process, Pipe +from threading import Thread +import time + +_CONNECTION_TIMEOUT = 10 # seconds before stop waiting for rosbridge connection + + +class RosBridgeConnectError(Exception): + """ + Type of error risen when there are connection issues + """ + pass + + +# TODO: add unit tests +class RosBridgeHandler(object): + """ + Provides a convenient interface to connect to RosBridge and handle properly connection error, + disconnections, etc + """ + + def __init__(self): + """ + Creates an instance of this class + """ + + self._logger = logging.getLogger('RosBridgeHandler') + self._is_init = False + self._was_closed = False + self._client = None + self._topics = {} + + @property + def is_init(self): + """ + Returns True if the handler has been initialized + """ + return self._is_init + + @is_init.setter + def is_init(self, value): + """ + Setter method for is_init property + + :param value: boolean value for is_init property + """ + self._is_init = value + + def initialize(self, host, port=None): + """ + Initializes the handler if it is not initialized yet and it was not initialized before. + Creates a RosBridge client and connects it to the server. Raises RosBridgeConnectError + in case of connection errors. + + :param host: string representing the ip of the RosBridge server + :param port: (optional) integer corresponding to the RosBridge server port. None by default. + """ + # Check flags + if self._is_init: + self._logger.info('Rosbridge client already initialized') + return + elif self._was_closed: + self._logger.info('Re-initializing a Rosbridge client is not allowed. it is not ' + 'recommended to initialized any other client in the same process. ' + 'Use class RosBridgeHandlerProcess if you need to do that') + return + + has_failed = False + + # Connect to rosbridge server + try: + self._client = roslibpy.Ros(host=host, port=port) + self._client.run(_CONNECTION_TIMEOUT) + # pylint: disable=broad-except + except Exception: + has_failed = True + else: + if not self._client.is_connected: + has_failed = True + + if has_failed: + raise RosBridgeConnectError('Rosbridge client could not connect to server') + + self._is_init = True + + def subscribe_topic(self, name, ros_type, callback): + """ + Subscribes a callback to a Ros topic + + :param name: string corresponding to the topic address + :param ros_type: string corresponding to the topic type + :param callback: function object to be called upon new message + """ + if not self._is_init: + self._logger.info('Rosbridge client was not correctly initialized. Could not subscribe ' + 'to: %s', name) + return + + self._topics[name] = roslibpy.Topic(self._client, name, ros_type) + self._topics[name].subscribe(callback) + + def unsubscribe_topic(self, name): + """ + Unsubscribes a callback to a Ros topic + + :param name: string corresponding to the topic address + """ + if name in self._topics: + self._topics[name].unsubscribe() + else: + self._logger.info('RosBridge client was not subscribed to: %s', name) + + def close(self): + """ + Closes the RosBridge connection. After this the handler can not be initialized again. + Furthermore, it is not recommended to initialized any other client in the same process. + Use class RosBridgeHandlerProcess if you need to do that. + """ + if not self._is_init: + self._logger.info('Rosbridge client was not correctly initialized. Cannot be closed') + return + + for name in self._topics: + self.unsubscribe_topic(name) + + self._client.close() + self._was_closed = True + self._is_init = False + self._logger.info('RosBridge client was closed correctly') + + +# TODO: add unit tests +class RosBridgeHandlerProcess(RosBridgeHandler): + """ + Extension of RosBridgeHandler that runs the RosBridge client in a separate process. This is + convenient when starting more than one client from the same process is required. + RosBridgeHandler can give problems in this case due to limitations in roslibpy / twisted for + handling re-connections and multiple connections. + """ + + def __init__(self): + """ + Creates an instance of this class + """ + super().__init__() + + self._pipe = None + self._rb_process = None + self._rb_proxy = None + self._in_process = False + self._callbacks = {} + + def initialize(self, host, port=None): + """ + Initializes the handler if it is not initialized yet and it was not initialized before. + Creates a RosBridge client and connects it to the server. Raises RosBridgeConnectError + in case of connection errors. + + :param host: string representing the ip of the RosBridge server + :param port: (optional) integer corresponding to the RosBridge server port. None by default. + """ + if self.is_init: + self._logger.info('Rosbridge client already initialized') + return + + self._pipe = Pipe() + + # starts process with rosbridge client + self._rb_process = Process(target=self._event_loop_process, args=(host, port,)) + self._rb_process.start() + + # starts thread handling callbacks + self._rb_proxy = Thread(target=self._event_loop_proxy) + self._rb_proxy.start() + + # waits for initialization is completed + n = 0 + while not self.is_init and n < _CONNECTION_TIMEOUT: + n += 1 + time.sleep(1) + + if not self.is_init: + raise RosBridgeConnectError('Rosbridge client could not connect to server') + + def _event_loop_process(self, host, port=None): + """ + Method handling the communication with the RosBridge server from a separate process + + :param host: string representing the ip of the RosBridge server + :param port: (optional) integer corresponding to the RosBridge server port. None by default. + """ + # everything is executed inside of the forked process from now on + self._in_process = True + self.is_init = False + + # start rosbridge client + try: + super().initialize(host, port) + # pylint: disable=broad-except + except Exception as e: + self._logger.info('failure connecting to RosBridge server: %s', e) + self._pipe[1].send(['error']) + return + else: + self._pipe[1].send(['init']) + + # listen for proxy requests + def make_callback(_msg): + return lambda x: self._pipe[1].send([_msg[1], x]) + + try: + while True: + msg = self._pipe[1].recv() + if msg[0] == 'close': + self.close() + self._pipe[1].send(['close']) + break + elif msg[0] == 'subscribe': + self.subscribe_topic(msg[1], msg[2], make_callback(msg)) + elif msg[0] == 'unsubscribe': + self.unsubscribe_topic(msg[1]) + # pylint: disable=broad-except + except Exception as e: + self._logger.info('Exception in RosBridge client process: %s', e) + self._pipe[1].send(['error']) + + def _event_loop_proxy(self): + """ + Method handling the communication from the RosBridge client process in a separate thread + """ + while True: + try: + msg = self._pipe[0].recv() + if msg[0] in self._callbacks: + self._callbacks[msg[0]](msg[1]) + elif msg[0] == 'init': + self.is_init = True + elif msg[0] == 'close': + break + elif msg[0] == 'error': + self.is_init = False + raise RosBridgeConnectError('Failure in RosBridge client process') + except EOFError: + self._logger.info('connection closed') + break + + def subscribe_topic(self, name, ros_type, callback): + """ + Subscribes a callback to a Ros topic + + :param name: string corresponding to the topic address + :param ros_type: string corresponding to the topic type + :param callback: function object to be called upon new message + """ + if not self._is_init: + self._logger.info('Rosbridge client was not correctly initialized. Could not subscribe ' + 'to: %s', name) + return + + if self._in_process: + super().subscribe_topic(name, ros_type, callback) + else: + # tell process to subscribe to topic + self._callbacks[name] = callback + self._pipe[0].send(['subscribe', name, ros_type]) + + def unsubscribe_topic(self, name): + """ + Unsubscribes a callback to a Ros topic + + :param name: string corresponding to the topic address + """ + if not self._is_init: + self._logger.info('Rosbridge client was not correctly initialized. Could not ' + 'unsubscribe from: %s', name) + return + + if self._in_process: + super().unsubscribe_topic(name) + else: + # tell process to unsubscribe from topic + self._pipe[0].send(['unsubscribe', name]) + + def close(self): + """ + Closes the RosBridge connection. After this the handler can not be initialized again. + Furthermore, it is not recommended to initialized any other client in the same process. + Use class RosBridgeHandlerProcess if you need to do that. + """ + if not self._is_init: + self._logger.info('Rosbridge client was not correctly initialized. Cannot be closed') + return + + if self._in_process: + super().close() + else: + # tell process to shutdown + self._pipe[0].send(['close']) + self._callbacks = {} + self.is_init = False diff --git a/hbp_nrp_virtual_coach/pynrp/simulation.py b/hbp_nrp_virtual_coach/pynrp/simulation.py index c628631..2a8b79f 100644 --- a/hbp_nrp_virtual_coach/pynrp/simulation.py +++ b/hbp_nrp_virtual_coach/pynrp/simulation.py @@ -45,7 +45,7 @@ from future import standard_library from pynrp.requests_client import RequestsClient from pynrp.config import Config -import roslibpy +from pynrp.rosbridge_handler import RosBridgeHandlerProcess try: from urllib2 import HTTPError @@ -84,8 +84,6 @@ class Simulation(object): self.__previous_subtask = None self.__ros_client = None - self.__status_sub = None - self.__error_sub = None self.__status_callbacks = [] # class level logger so that we can change the name of the logger dynamically @@ -178,31 +176,25 @@ class Simulation(object): self.__experiment_id = experiment_id # Connect to rosbridge and create subscribers + ws_url = self.__server_info['rosbridge']['websocket'] + ws_url = '{}?token={}'.format(ws_url, storage_token) if storage_token else ws_url + # NOTE: roslibpy is buggy when creating more than one client, ie. calling + # roslibpy.Ros, a second time in the same virtual coach session (ie. from the same process) + # Therefore we are running the client in a separate process and channeling the incoming + # msgs to subscribed callbacks in this process through a multiprocessing.Pipe try: - ws_url = self.__server_info['rosbridge']['websocket'] - ws_url = '{}?token={}'.format(ws_url, storage_token) if storage_token else ws_url - # TODO: roslibpy is buggy when creating more than one client, ie. calling - # roslibpy.Ros a second time in the same virtual coach session (from the same process) - # will likely fail. - self.__ros_client = roslibpy.Ros(host=ws_url) - self.__ros_client.run() - - if self.__ros_client.is_connected: - status_topic = self.__config['ros']['status'] - self.__status_sub = roslibpy.Topic(self.__ros_client, status_topic, - 'std_msgs/String') - self.__status_sub.subscribe(self.__on_status) - - self.__error_sub = roslibpy.Topic(self.__ros_client, self.__config['ros']['error'], - 'cle_ros_msgs/CLEError') - self.__error_sub.subscribe(self.__on_error) - else: - self.__close_rosbridge() - self.__logger.info('Connection with rosbridge could not be established. ' - 'Some functionalities will be disabled') + self.__ros_client = RosBridgeHandlerProcess() + self.__ros_client.initialize(ws_url) + + self.__ros_client.subscribe_topic(self.__config['ros']['status'], 'std_msgs/String', + self.__on_status) + + self.__ros_client.subscribe_topic(self.__config['ros']['error'], + 'cle_ros_msgs/CLEError', self.__on_error) # pylint: disable=broad-except - except Exception as e: - self.__logger.info('Connection with rosbridge failed: %s.', str(e)) + except Exception: + self.__logger.info('Connection with rosbridge server could not be established. ' + 'Some functionalities will be disabled') # success, simulation is launched self.__logger.info('Ready.') @@ -225,20 +217,6 @@ class Simulation(object): Attempt to stop the simulation by transitioning to the "stopped" state. """ self.__set_state('stopped') - #self.__close_rosbridge() - - def __close_rosbridge(self): - """ - Terminates rosbridge connection - """ - if self.__status_sub: - self.__status_sub.unsubscribe() - - if self.__error_sub: - self.__error_sub.unsubscribe() - - if self.__ros_client: - self.__ros_client.close() def register_status_callback(self, callback): """ @@ -254,14 +232,9 @@ class Simulation(object): raise Exception( "Simulation has not been created, cannot set state!") - # make sure ROS is enabled (this is only a valid check after simulation is created) - if not self.__status_sub: - raise Exception( - "Unable to register status callback, rosbridge connection is not available.") - # make sure the callback is not registered already, warn the user if it is if callback in self.__status_callbacks: - self.__logger.warn( + self.__logger.warning( 'Attempting to register duplicate status callback, ignoring.') return @@ -349,7 +322,8 @@ class Simulation(object): # this simulation instance should not be reused if status['state'] in ['stopped', 'halted']: self.__status_callbacks[:] = [] - self.__close_rosbridge() + if self.__ros_client: + self.__ros_client.close() self.__logger.info('Simulation has been stopped.') def get_state(self): diff --git a/hbp_nrp_virtual_coach/pynrp/tests/test_simulation.py b/hbp_nrp_virtual_coach/pynrp/tests/test_simulation.py index 7d42ad8..3c284d7 100644 --- a/hbp_nrp_virtual_coach/pynrp/tests/test_simulation.py +++ b/hbp_nrp_virtual_coach/pynrp/tests/test_simulation.py @@ -99,7 +99,8 @@ class TestSimulation(unittest.TestCase): self._sim._Simulation__http_client.post.assert_called_once() mocked_traceback.print_exec.assert_called_once() - def test_create(self): + @patch('pynrp.rosbridge_handler.RosBridgeHandlerProcess.initialize') + def test_create(self, mocked_init): self.setUpForLaunch() # mock the call to set simulation state @@ -110,7 +111,8 @@ class TestSimulation(unittest.TestCase): # calling launch twice on an instance should fail after successful creation self.assertRaises(Exception, self._sim.launch, 'id', 'conf', 'server-name', 'reservation') - def test_create_cloned(self): + @patch('pynrp.rosbridge_handler.RosBridgeHandlerProcess.initialize') + def test_create_cloned(self, mocked_init): self.setUpForLaunch() # mock the call to set simulation state self._sim._Simulation__set_state = Mock() @@ -118,31 +120,6 @@ class TestSimulation(unittest.TestCase): self.assertEqual(self._sim.launch('id', 'conf', 'server-name', 'reservation'), True) self.assertEqual(self._sim._Simulation__http_client.post.call_count, 1) - - @patch('roslibpy.Ros') - def test_rosbridge_connection(self, mocked_ros): - # launch simulation - self.setUpForLaunch() - self._sim._Simulation__set_state = Mock() - self.assertEqual(self._sim.launch('id', 'conf', 'server-name', 'reservation'), True) - - # rosbridge connection is closed properly - rb_client = self._sim._Simulation__ros_client - self._sim._Simulation__close_rosbridge() - self.assertTrue(rb_client.terminate.called) - self.assertEqual(self._sim._Simulation__ros_client, None) - self.assertEqual(self._sim._Simulation__status_sub, None) - self.assertEqual(self._sim._Simulation__error_sub, None) - - # rosbridge connection is closed after simulation is closed - self._sim._Simulation__close_rosbridge = Mock() - self._sim._Simulation__on_status({"data": '{"state": "stopped"}'}) - self.assertEqual(self._sim._Simulation__close_rosbridge.call_count,1) - self._sim._Simulation__on_status({"data": '{"state": "halted"}'}) - self.assertEqual(self._sim._Simulation__close_rosbridge.call_count,2) - self._sim.stop() - self.assertEqual(self._sim._Simulation__close_rosbridge.call_count, 3) - def test_set_state_asserts(self): self.assertRaises(AssertionError, self._sim._Simulation__set_state, None) self.assertRaises(Exception, self._sim._Simulation__set_state, 'foo') @@ -174,8 +151,7 @@ class TestSimulation(unittest.TestCase): # mock the OIDC call self._sim._Simulation__set_state = Mock() - # mock the error subscriber and ros_client - self._sim._Simulation__error_sub = Mock() + # mock the ros_client self._sim._Simulation__ros_client = Mock() # start @@ -189,7 +165,6 @@ class TestSimulation(unittest.TestCase): # stop self._sim.stop() self._sim._Simulation__set_state.assert_called_with('stopped') - self.assertEqual(self._sim._Simulation__error_sub, None) def test_get_state(self): self._sim._Simulation__server = 'server' @@ -523,26 +498,22 @@ class TestSimulation(unittest.TestCase): # uninitialized simulation should throw an error self.assertRaises(Exception, self._sim.register_status_callback, mock_callback) self.assertEqual(self._sim._Simulation__logger.info.call_count, 0) - self.assertEqual(self._sim._Simulation__logger.warn.call_count, 0) + self.assertEqual(self._sim._Simulation__logger.warning.call_count, 0) - # initialized simulation, but no ROS + # initialized simulation self._sim._Simulation__server = 'server' self._sim._Simulation__sim_url = 'url' - self.assertRaises(Exception, self._sim.register_status_callback, mock_callback) - self.assertEqual(self._sim._Simulation__logger.info.call_count, 0) - self.assertEqual(self._sim._Simulation__logger.warn.call_count, 0) # successful registration - self._sim._Simulation__status_sub = 'subscriber' self._sim.register_status_callback(mock_callback) self.assertEqual(self._sim._Simulation__status_callbacks, [mock_callback]) self._sim._Simulation__logger.info.assert_called_once_with('Status callback registered.') - self.assertEqual(self._sim._Simulation__logger.warn.call_count, 0) + self.assertEqual(self._sim._Simulation__logger.warning.call_count, 0) # ignored duplicate registration self._sim.register_status_callback(mock_callback) self.assertEqual(self._sim._Simulation__status_callbacks, [mock_callback]) - self._sim._Simulation__logger.warn.assert_called_once_with('Attempting to register duplicate ' + self._sim._Simulation__logger.warning.assert_called_once_with('Attempting to register duplicate ' 'status callback, ignoring.') self.assertEqual(self._sim._Simulation__logger.info.call_count, 1) @@ -562,8 +533,6 @@ class TestSimulation(unittest.TestCase): # override the logger so we can check for messages self._sim._Simulation__ros_client = Mock() self._sim._Simulation__logger = Mock() - self._sim._Simulation__status_sub = Mock() - self._sim._Simulation__error_sub = Mock() self._sim._Simulation__previous_subtask = 'foo' # callback to ensure it is only called when appropriate @@ -578,26 +547,24 @@ class TestSimulation(unittest.TestCase): self._sim._Simulation__on_status(mock_msg({'progress': {'done': True}})) self._sim._Simulation__on_status(mock_msg({'progress': {'subtask': 'foo'}})) self.assertEqual(self._sim._Simulation__logger.info.call_count, 0) - self.assertEqual(self._sim._Simulation__status_sub.unregister.call_count, 0) self.assertEqual(mock_callback.call_count, 0) # progress message that should be logged self._sim._Simulation__on_status(mock_msg({'progress': {'task': 'foo', 'subtask': 'bar'}})) self._sim._Simulation__logger.info.assert_called_once_with('[%s] %s', 'foo', 'bar') - self.assertEqual(self._sim._Simulation__status_sub.unregister.call_count, 0) self.assertEqual(mock_callback.call_count, 0) # simulation status, callback should be invoked self._sim._Simulation__on_status(mock_msg({'state': 'paused'})) mock_callback.assert_called_once_with({'state': 'paused'}) - self.assertEqual(self._sim._Simulation__status_sub.unregister.call_count, 0) self.assertEqual(self._sim._Simulation__logger.info.call_count, 1) # from above # simulation status for end of the simulation self._sim._Simulation__on_status(mock_msg({'state': 'stopped'})) mock_callback.assert_called_with({'state': 'stopped'}) - self.assertEqual(self._sim._Simulation__status_sub, None) - self.assertEqual(self._sim._Simulation__error_sub, None) + self.assertEqual(self._sim._Simulation__ros_client.close.call_count, 1) + self._sim._Simulation__on_status({"data": '{"state": "halted"}'}) + self.assertEqual(self._sim._Simulation__ros_client.close.call_count, 2) self.assertEqual(self._sim._Simulation__status_callbacks, []) self._sim._Simulation__logger.info.assert_called_with('Simulation has been stopped.') diff --git a/hbp_nrp_virtual_coach/pynrp/tests/test_virtual_coach.py b/hbp_nrp_virtual_coach/pynrp/tests/test_virtual_coach.py index de4aecf..067d5b8 100644 --- a/hbp_nrp_virtual_coach/pynrp/tests/test_virtual_coach.py +++ b/hbp_nrp_virtual_coach/pynrp/tests/test_virtual_coach.py @@ -413,7 +413,7 @@ mock-server-5 @patch('pynrp.virtual_coach.logger.info') def test_clone_experiment_to_storage(self, mock_logger, mock_list): mock_list.return_value = self._mock_exp_list - self._vc._VirtualCoach__http_client = Mock(post=Mock(return_value=[200, 'experiment_id'])) + self._vc._VirtualCoach__http_client = Mock(post=Mock(return_value=[200, b'experiment_id'])) self._vc.clone_experiment_to_storage('MockExperiment1') self.assertEqual(mock_logger.call_count, 1) -- GitLab