Skip to content
Snippets Groups Projects
Commit 5f1d6112 authored by Evan Eames's avatar Evan Eames Committed by Sandro Weber
Browse files

Merged in NRRPLT-8384-mqtt (pull request #34)

NRRPLT-8384 mqtt  subscribeToTopic and onMessage

Approved-by: Sandro Weber
parents 8de08962 a17229af
No related branches found
No related tags found
No related merge requests found
......@@ -14,12 +14,18 @@ export default class NrpCoreDashboard extends React.Component {
MqttClientService.instance.connect(this.mqttBrokerUrl);
}
onMqttClientConnected(mqttClient) {
mqttClient.subscribe('#', (err) => {
onMqttClientConnected(MqttClient) {
MqttClient.subscribe('#', (err) => {
if (err) {
console.error(err);
}
});
// As a test to make sure MqttClientService can subscribe to multiple topics (and the same topic) at once
let token1 = MqttClientService.instance.subscribeToTopic('test_topic', (param1) => (console.info(param1)));
let token2 = MqttClientService.instance.subscribeToTopic('test_topic', (param1) => (console.info(param1)));
let token3 = MqttClientService.instance.subscribeToTopic('test_topic', (param1) => (console.info(param1)));
let token4 = MqttClientService.instance.subscribeToTopic('test_topic_proto', (param1) => (console.info(param1)));
//TODO: test unsubscribe once implemented
}
render() {
......
import mqtt from 'mqtt';
import { EventEmitter } from 'events';
//import DataPackMessage from 'nrp-jsproto/engine_grpc_pb';
//import { DataPackMessage } from 'nrp-jsproto/engine_grpc_pb';
import jspb from '../../node_modules/google-protobuf/google-protobuf';
let _instance = null;
const SINGLETON_ENFORCER = Symbol();
......@@ -16,7 +17,7 @@ export default class MqttClientService extends EventEmitter {
throw new Error('Use ' + this.constructor.name + '.instance');
}
//console.info(['DataPackMessage', DataPackMessage]);
this.subTokensMap = new Map();
}
static get instance() {
......@@ -36,7 +37,9 @@ export default class MqttClientService extends EventEmitter {
this.emit(MqttClientService.EVENTS.CONNECTED, this.client);
});
this.client.on('error', this.onError);
this.client.on('message', this.onMessage);
this.client.on('message', (params) => {
this.onMessage(params);
});
}
onError(error) {
......@@ -44,8 +47,17 @@ export default class MqttClientService extends EventEmitter {
}
onMessage(topic, payload, packet) {
console.info('MQTT message: [topic, payload, packet]');
console.info([topic, payload, packet]);
//console.info('MQTT message: [topic, payload, packet]');
//console.info([topic, payload, packet]);
//Now we see which callbacks have been assigned for a topic
if (typeof this.subTokensMap.get(topic) !== 'undefined') {
for (var token in this.subTokensMap.get(topic)){
if (typeof token.callback === 'function' && payload !== 'undefined') {
//Deserializatin of Data must happen here
token.callback(payload);
}
};
};
/*try {
if (topic.endsWith('/type')) {
......@@ -62,6 +74,43 @@ export default class MqttClientService extends EventEmitter {
console.error(error);
}*/
}
//callback should have args topic, payload
subscribeToTopic(topic, callback) {
if (typeof callback !== 'function') {
console.error('trying to subscribe to topic "' + topic + '", but no callback function given!');
return;
}
const token = {
topic: topic,
callback: callback
};
if (this.subTokensMap.has(token.topic)){
this.subTokensMap.get(token.topic).push(token);
}
else{
this.subTokensMap.set(
token.topic,
[token]
);
}
console.info('You have been subscribed to topic ' + topic);
console.info(this.subTokensMap);
return token;
}
static getProtoOneofData(protoMsg, oneofCaseNumber) {
return jspb.Message.getField(protoMsg, oneofCaseNumber);
}
/*static getDataPackMessageOneofCaseString(protoMsg) {
for (let dataCase in DataPackMessage.DataCase) {
if (DataPackMessage.DataCase[dataCase] === protoMsg.getDataCase()) {
return dataCase;
}
}
}*/
}
MqttClientService.EVENTS = Object.freeze({
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment