Skip to content
Snippets Groups Projects
Commit 15497230 authored by Evan Eames's avatar Evan Eames Committed by Sandro Weber
Browse files

Merged in NRRPLT-8384-mqtt (pull request #35)

Added unsubscribe

Approved-by: Sandro Weber
parents 26d11f8f ef9ec74c
No related branches found
No related tags found
No related merge requests found
...@@ -25,7 +25,9 @@ export default class NrpCoreDashboard extends React.Component { ...@@ -25,7 +25,9 @@ export default class NrpCoreDashboard extends React.Component {
let token2 = MqttClientService.instance.subscribeToTopic('test_topic', (param1) => (console.info(param1))); let token2 = MqttClientService.instance.subscribeToTopic('test_topic', (param1) => (console.info(param1)));
let token3 = MqttClientService.instance.subscribeToTopic('test_topic', (param1) => (console.info(param1))); let token3 = MqttClientService.instance.subscribeToTopic('test_topic', (param1) => (console.info(param1)));
let token4 = MqttClientService.instance.subscribeToTopic('test_topic_proto', (param1) => (console.info(param1))); let token4 = MqttClientService.instance.subscribeToTopic('test_topic_proto', (param1) => (console.info(param1)));
//TODO: test unsubscribe once implemented
// Test unsubscribe
MqttClientService.instance.unsubscribe(token3);
} }
render() { render() {
......
/**
* @jest-environment jsdom
*/
import '@testing-library/jest-dom';
import MqttClientService from '../mqtt-client-service';
let subscribeTopicAndValidate = (topic, callback) => {
let token = MqttClientService.instance.subscribeToTopic(topic, callback);
expect(token).toBeDefined();
expect(token.topic).toBe(topic);
expect(token.callback).toBe(callback);
expect(MqttClientService.instance.subTokensMap.get(topic).includes(token)).toBeTruthy();
return token;
};
let unsubscribeAndValidate = (token) => {
MqttClientService.instance.unsubscribe(token);
expect(MqttClientService.instance.subTokensMap.get(token.topic).includes(token)).toBeFalsy();
};
test('sub/unsub', async () => {
let topicA = 'topic/A';
let topicB = 'topic/B';
let sub1Callback = jest.fn();
let sub1Token = subscribeTopicAndValidate(topicA, sub1Callback);
let sub2Callback = jest.fn();
let sub2Token = subscribeTopicAndValidate(topicA, sub2Callback);
let sub3Callback = jest.fn();
let sub3Token = subscribeTopicAndValidate(topicB, sub3Callback);
expect(MqttClientService.instance.subTokensMap.get(topicA).length).toBe(2);
expect(MqttClientService.instance.subTokensMap.get(topicB).length).toBe(1);
MqttClientService.instance.onMessage(topicA, {});
MqttClientService.instance.onMessage(topicB, {});
expect(sub1Token.callback).toHaveBeenCalledTimes(1);
expect(sub2Token.callback).toHaveBeenCalledTimes(1);
expect(sub3Token.callback).toHaveBeenCalledTimes(1);
unsubscribeAndValidate(sub1Token);
expect(MqttClientService.instance.subTokensMap.get(topicA).length).toBe(1);
expect(MqttClientService.instance.subTokensMap.get(topicB).length).toBe(1);
MqttClientService.instance.onMessage(topicA, {});
MqttClientService.instance.onMessage(topicB, {});
expect(sub1Token.callback).toHaveBeenCalledTimes(1);
expect(sub2Token.callback).toHaveBeenCalledTimes(2);
expect(sub3Token.callback).toHaveBeenCalledTimes(2);
unsubscribeAndValidate(sub2Token);
expect(MqttClientService.instance.subTokensMap.get(topicA).length).toBe(0);
expect(MqttClientService.instance.subTokensMap.get(topicB).length).toBe(1);
MqttClientService.instance.onMessage(topicA, {});
MqttClientService.instance.onMessage(topicB, {});
expect(sub1Token.callback).toHaveBeenCalledTimes(1);
expect(sub2Token.callback).toHaveBeenCalledTimes(2);
expect(sub3Token.callback).toHaveBeenCalledTimes(3);
unsubscribeAndValidate(sub3Token);
expect(MqttClientService.instance.subTokensMap.get(topicA).length).toBe(0);
expect(MqttClientService.instance.subTokensMap.get(topicB).length).toBe(0);
MqttClientService.instance.onMessage(topicA, {});
MqttClientService.instance.onMessage(topicB, {});
expect(sub1Token.callback).toHaveBeenCalledTimes(1);
expect(sub2Token.callback).toHaveBeenCalledTimes(2);
expect(sub3Token.callback).toHaveBeenCalledTimes(3);
});
...@@ -47,15 +47,18 @@ export default class MqttClientService extends EventEmitter { ...@@ -47,15 +47,18 @@ export default class MqttClientService extends EventEmitter {
} }
onMessage(topic, payload, packet) { onMessage(topic, payload, packet) {
if (typeof payload === 'undefined') {
return;
}
//console.info('MQTT message: [topic, payload, packet]'); //console.info('MQTT message: [topic, payload, packet]');
//console.info([topic, payload, packet]); //console.info([topic, payload, packet]);
//Now we see which callbacks have been assigned for a topic //Now we see which callbacks have been assigned for a topic
if (typeof this.subTokensMap.get(topic) !== 'undefined') { let subTokens = this.subTokensMap.get(topic);
for (var token in this.subTokensMap.get(topic)){ if (typeof subTokens !== 'undefined') {
if (typeof token.callback === 'function' && payload !== 'undefined') { for (var token of subTokens) {
//Deserializatin of Data must happen here //Deserializatin of Data must happen here
token.callback(payload); token.callback(payload);
}
}; };
}; };
...@@ -95,11 +98,28 @@ export default class MqttClientService extends EventEmitter { ...@@ -95,11 +98,28 @@ export default class MqttClientService extends EventEmitter {
[token] [token]
); );
} }
console.info('You have been subscribed to topic ' + topic); //console.info('You have been subscribed to topic ' + topic);
console.info(this.subTokensMap); //console.info(this.subTokensMap);
return token; return token;
} }
unsubscribe(unsubToken) {
if (this.subTokensMap.has(unsubToken.topic)){
let tokens = this.subTokensMap.get(unsubToken.topic);
let index = tokens.indexOf(unsubToken);
if (index !== -1) {
tokens.splice(index, 1);
//console.info('You have been unsubscribed from topic ' + unsubToken.topic);
}
else {
console.warn('Your provided token could not be found in the subscription list');
}
}
else{
console.warn('The topic ' + unsubToken.topic + ' was not found');
}
}
static getProtoOneofData(protoMsg, oneofCaseNumber) { static getProtoOneofData(protoMsg, oneofCaseNumber) {
return jspb.Message.getField(protoMsg, oneofCaseNumber); return jspb.Message.getField(protoMsg, oneofCaseNumber);
} }
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment