diff --git a/src/components/nrp-core-dashboard/nrp-core-dashboard.js b/src/components/nrp-core-dashboard/nrp-core-dashboard.js index ee0b7adb3ded787acf89a330c583f9dc723552a0..488ce2d0b790358986c16d82b7df6c85de5c299e 100644 --- a/src/components/nrp-core-dashboard/nrp-core-dashboard.js +++ b/src/components/nrp-core-dashboard/nrp-core-dashboard.js @@ -14,12 +14,18 @@ export default class NrpCoreDashboard extends React.Component { MqttClientService.instance.connect(this.mqttBrokerUrl); } - onMqttClientConnected(mqttClient) { - mqttClient.subscribe('#', (err) => { + onMqttClientConnected(MqttClient) { + MqttClient.subscribe('#', (err) => { if (err) { console.error(err); } }); + // As a test to make sure MqttClientService can subscribe to multiple topics (and the same topic) at once + let token1 = MqttClientService.instance.subscribeToTopic('test_topic', (param1) => (console.info(param1))); + let token2 = MqttClientService.instance.subscribeToTopic('test_topic', (param1) => (console.info(param1))); + let token3 = MqttClientService.instance.subscribeToTopic('test_topic', (param1) => (console.info(param1))); + let token4 = MqttClientService.instance.subscribeToTopic('test_topic_proto', (param1) => (console.info(param1))); + //TODO: test unsubscribe once implemented } render() { diff --git a/src/services/mqtt-client-service.js b/src/services/mqtt-client-service.js index fe899e0659d905fc6c70404754ef8c6f1529494f..d81b21a1c2641d24eb8fce9a4588ecef8e82cdc5 100644 --- a/src/services/mqtt-client-service.js +++ b/src/services/mqtt-client-service.js @@ -1,7 +1,8 @@ import mqtt from 'mqtt'; import { EventEmitter } from 'events'; -//import DataPackMessage from 'nrp-jsproto/engine_grpc_pb'; +//import { DataPackMessage } from 'nrp-jsproto/engine_grpc_pb'; +import jspb from '../../node_modules/google-protobuf/google-protobuf'; let _instance = null; const SINGLETON_ENFORCER = Symbol(); @@ -16,7 +17,7 @@ export default class MqttClientService extends EventEmitter { throw new Error('Use ' + this.constructor.name + '.instance'); } - //console.info(['DataPackMessage', DataPackMessage]); + this.subTokensMap = new Map(); } static get instance() { @@ -36,7 +37,9 @@ export default class MqttClientService extends EventEmitter { this.emit(MqttClientService.EVENTS.CONNECTED, this.client); }); this.client.on('error', this.onError); - this.client.on('message', this.onMessage); + this.client.on('message', (params) => { + this.onMessage(params); + }); } onError(error) { @@ -44,8 +47,17 @@ export default class MqttClientService extends EventEmitter { } onMessage(topic, payload, packet) { - console.info('MQTT message: [topic, payload, packet]'); - console.info([topic, payload, packet]); + //console.info('MQTT message: [topic, payload, packet]'); + //console.info([topic, payload, packet]); + //Now we see which callbacks have been assigned for a topic + if (typeof this.subTokensMap.get(topic) !== 'undefined') { + for (var token in this.subTokensMap.get(topic)){ + if (typeof token.callback === 'function' && payload !== 'undefined') { + //Deserializatin of Data must happen here + token.callback(payload); + } + }; + }; /*try { if (topic.endsWith('/type')) { @@ -62,6 +74,43 @@ export default class MqttClientService extends EventEmitter { console.error(error); }*/ } + + //callback should have args topic, payload + subscribeToTopic(topic, callback) { + if (typeof callback !== 'function') { + console.error('trying to subscribe to topic "' + topic + '", but no callback function given!'); + return; + } + + const token = { + topic: topic, + callback: callback + }; + if (this.subTokensMap.has(token.topic)){ + this.subTokensMap.get(token.topic).push(token); + } + else{ + this.subTokensMap.set( + token.topic, + [token] + ); + } + console.info('You have been subscribed to topic ' + topic); + console.info(this.subTokensMap); + return token; + } + + static getProtoOneofData(protoMsg, oneofCaseNumber) { + return jspb.Message.getField(protoMsg, oneofCaseNumber); + } + + /*static getDataPackMessageOneofCaseString(protoMsg) { + for (let dataCase in DataPackMessage.DataCase) { + if (DataPackMessage.DataCase[dataCase] === protoMsg.getDataCase()) { + return dataCase; + } + } + }*/ } MqttClientService.EVENTS = Object.freeze({