diff --git a/src/components/nrp-core-dashboard/nrp-core-dashboard.js b/src/components/nrp-core-dashboard/nrp-core-dashboard.js index 488ce2d0b790358986c16d82b7df6c85de5c299e..29b2a058cd12fdbed2314c573cb74349390d02ff 100644 --- a/src/components/nrp-core-dashboard/nrp-core-dashboard.js +++ b/src/components/nrp-core-dashboard/nrp-core-dashboard.js @@ -25,7 +25,9 @@ export default class NrpCoreDashboard extends React.Component { let token2 = MqttClientService.instance.subscribeToTopic('test_topic', (param1) => (console.info(param1))); let token3 = MqttClientService.instance.subscribeToTopic('test_topic', (param1) => (console.info(param1))); let token4 = MqttClientService.instance.subscribeToTopic('test_topic_proto', (param1) => (console.info(param1))); - //TODO: test unsubscribe once implemented + + // Test unsubscribe + MqttClientService.instance.unsubscribe(token3); } render() { diff --git a/src/services/__tests__/mqtt-client-service.test.js b/src/services/__tests__/mqtt-client-service.test.js new file mode 100644 index 0000000000000000000000000000000000000000..3f384cdd630949251c30d98e6a799b51229e2516 --- /dev/null +++ b/src/services/__tests__/mqtt-client-service.test.js @@ -0,0 +1,72 @@ +/** + * @jest-environment jsdom +*/ +import '@testing-library/jest-dom'; + +import MqttClientService from '../mqtt-client-service'; + +let subscribeTopicAndValidate = (topic, callback) => { + let token = MqttClientService.instance.subscribeToTopic(topic, callback); + expect(token).toBeDefined(); + expect(token.topic).toBe(topic); + expect(token.callback).toBe(callback); + expect(MqttClientService.instance.subTokensMap.get(topic).includes(token)).toBeTruthy(); + + return token; +}; + +let unsubscribeAndValidate = (token) => { + MqttClientService.instance.unsubscribe(token); + expect(MqttClientService.instance.subTokensMap.get(token.topic).includes(token)).toBeFalsy(); +}; + +test('sub/unsub', async () => { + let topicA = 'topic/A'; + let topicB = 'topic/B'; + + let sub1Callback = jest.fn(); + let sub1Token = subscribeTopicAndValidate(topicA, sub1Callback); + let sub2Callback = jest.fn(); + let sub2Token = subscribeTopicAndValidate(topicA, sub2Callback); + let sub3Callback = jest.fn(); + let sub3Token = subscribeTopicAndValidate(topicB, sub3Callback); + + expect(MqttClientService.instance.subTokensMap.get(topicA).length).toBe(2); + expect(MqttClientService.instance.subTokensMap.get(topicB).length).toBe(1); + + MqttClientService.instance.onMessage(topicA, {}); + MqttClientService.instance.onMessage(topicB, {}); + expect(sub1Token.callback).toHaveBeenCalledTimes(1); + expect(sub2Token.callback).toHaveBeenCalledTimes(1); + expect(sub3Token.callback).toHaveBeenCalledTimes(1); + + unsubscribeAndValidate(sub1Token); + expect(MqttClientService.instance.subTokensMap.get(topicA).length).toBe(1); + expect(MqttClientService.instance.subTokensMap.get(topicB).length).toBe(1); + + MqttClientService.instance.onMessage(topicA, {}); + MqttClientService.instance.onMessage(topicB, {}); + expect(sub1Token.callback).toHaveBeenCalledTimes(1); + expect(sub2Token.callback).toHaveBeenCalledTimes(2); + expect(sub3Token.callback).toHaveBeenCalledTimes(2); + + unsubscribeAndValidate(sub2Token); + expect(MqttClientService.instance.subTokensMap.get(topicA).length).toBe(0); + expect(MqttClientService.instance.subTokensMap.get(topicB).length).toBe(1); + + MqttClientService.instance.onMessage(topicA, {}); + MqttClientService.instance.onMessage(topicB, {}); + expect(sub1Token.callback).toHaveBeenCalledTimes(1); + expect(sub2Token.callback).toHaveBeenCalledTimes(2); + expect(sub3Token.callback).toHaveBeenCalledTimes(3); + + unsubscribeAndValidate(sub3Token); + expect(MqttClientService.instance.subTokensMap.get(topicA).length).toBe(0); + expect(MqttClientService.instance.subTokensMap.get(topicB).length).toBe(0); + + MqttClientService.instance.onMessage(topicA, {}); + MqttClientService.instance.onMessage(topicB, {}); + expect(sub1Token.callback).toHaveBeenCalledTimes(1); + expect(sub2Token.callback).toHaveBeenCalledTimes(2); + expect(sub3Token.callback).toHaveBeenCalledTimes(3); +}); diff --git a/src/services/mqtt-client-service.js b/src/services/mqtt-client-service.js index d81b21a1c2641d24eb8fce9a4588ecef8e82cdc5..06b2193c3180dc803e3180d2c36bb18865db867c 100644 --- a/src/services/mqtt-client-service.js +++ b/src/services/mqtt-client-service.js @@ -47,15 +47,18 @@ export default class MqttClientService extends EventEmitter { } onMessage(topic, payload, packet) { + if (typeof payload === 'undefined') { + return; + } + //console.info('MQTT message: [topic, payload, packet]'); //console.info([topic, payload, packet]); //Now we see which callbacks have been assigned for a topic - if (typeof this.subTokensMap.get(topic) !== 'undefined') { - for (var token in this.subTokensMap.get(topic)){ - if (typeof token.callback === 'function' && payload !== 'undefined') { - //Deserializatin of Data must happen here - token.callback(payload); - } + let subTokens = this.subTokensMap.get(topic); + if (typeof subTokens !== 'undefined') { + for (var token of subTokens) { + //Deserializatin of Data must happen here + token.callback(payload); }; }; @@ -95,11 +98,28 @@ export default class MqttClientService extends EventEmitter { [token] ); } - console.info('You have been subscribed to topic ' + topic); - console.info(this.subTokensMap); + //console.info('You have been subscribed to topic ' + topic); + //console.info(this.subTokensMap); return token; } + unsubscribe(unsubToken) { + if (this.subTokensMap.has(unsubToken.topic)){ + let tokens = this.subTokensMap.get(unsubToken.topic); + let index = tokens.indexOf(unsubToken); + if (index !== -1) { + tokens.splice(index, 1); + //console.info('You have been unsubscribed from topic ' + unsubToken.topic); + } + else { + console.warn('Your provided token could not be found in the subscription list'); + } + } + else{ + console.warn('The topic ' + unsubToken.topic + ' was not found'); + } + } + static getProtoOneofData(protoMsg, oneofCaseNumber) { return jspb.Message.getField(protoMsg, oneofCaseNumber); }