diff --git a/src/components/nrp-core-dashboard/nrp-core-dashboard.js b/src/components/nrp-core-dashboard/nrp-core-dashboard.js index 90bba0c03e810bf1f61b8d14c0cf2e5e1a0e8792..6a524bf503e550948f361b14cc9ef3f3f7894332 100644 --- a/src/components/nrp-core-dashboard/nrp-core-dashboard.js +++ b/src/components/nrp-core-dashboard/nrp-core-dashboard.js @@ -1,6 +1,7 @@ import React from 'react'; import MqttClientService from '../../services/mqtt-client-service'; +//import subscribeToTopic from MqttClientService; export default class NrpCoreDashboard extends React.Component { constructor(props) { @@ -14,13 +15,15 @@ export default class NrpCoreDashboard extends React.Component { MqttClientService.instance.connect(this.mqttBrokerUrl); } - onMqttClientConnected(mqttClient) { - mqttClient.subscribeToTopic('test_topic'); - //mqttClient.subscribe('#', (err) => { - // if (err) { - // console.error(err); - // } - //}; + onMqttClientConnected(MqttClient) { + MqttClient.subscribe('#', (err) => { + if (err) { + console.error(err); + } + }); + // As a test to make sure MqttClientService can subscribe to multiple topics at once we use these two for testing + let token = MqttClientService.instance.subscribeToTopic('test_topic', (param1) => (console.info(param1))); + token = MqttClientService.instance.subscribeToTopic('test_topic_proto', (param1) => (console.info(param1))); } render() { diff --git a/src/services/mqtt-client-service.js b/src/services/mqtt-client-service.js index 15fc09c9be9fd481b1a201f49f1f5d5ca1d47dc7..5b5de09c559dd41315a73436ddeada5db6e4a617 100644 --- a/src/services/mqtt-client-service.js +++ b/src/services/mqtt-client-service.js @@ -2,9 +2,9 @@ import mqtt from 'mqtt'; import { EventEmitter } from 'events'; //import * as proto from 'nrp-jsproto/nrp-engine_msgs-protobuf.js'; -import { DataPackMessage } from '../../node_modules/nrp-jsproto/engine_grpc_pb.js'; -import jspb from 'google-protobuf'; -//import { hasSubscribers } from 'diagnostics_channel'; +//import { DataPackMessage } from 'nrp-jsproto/nrp-engine_msgs-protobuf.js'; +import jspb from '../../node_modules/google-protobuf/google-protobuf'; +import { hasSubscribers } from 'diagnostics_channel'; let _instance = null; const SINGLETON_ENFORCER = Symbol(); @@ -21,7 +21,7 @@ export default class MqttClientService extends EventEmitter { this.subTokensMap = new Map(); - console.info(DataPackMessage); + //console.info(DataPackMessage); } static get instance() { @@ -41,7 +41,9 @@ export default class MqttClientService extends EventEmitter { this.emit(MqttClientService.EVENTS.CONNECTED, this.client); }); this.client.on('error', this.onError); - this.client.on('message', this.onMessage); + this.client.on('message', (params) => { + this.onMessage(params); + }); } onError(error) { @@ -49,8 +51,17 @@ export default class MqttClientService extends EventEmitter { } onMessage(topic, payload, packet) { - console.info('MQTT message: [topic, payload, packet]'); - console.info([topic, payload, packet]); + //console.info('MQTT message: [topic, payload, packet]'); + //console.info([topic, payload, packet]); + //Deserializatin of Data must happen here + //Now we see which callbacks have been assigned for a topic + if (typeof this.subTokensMap.get(topic) !== 'undefined') { + for (var token in this.subTokensMap.get(topic)){ + if (typeof token.callback === 'function' && payload !== 'undefined'){ + token.callback(payload); + } + }; + }; /*try { if (topic.endsWith('/type')) { @@ -68,6 +79,7 @@ export default class MqttClientService extends EventEmitter { }*/ } + //callback should have args topic, payload subscribeToTopic(topic, callback=Function()){ const token = { topic: topic, @@ -86,6 +98,7 @@ export default class MqttClientService extends EventEmitter { ); } console.info('You have been subscribed to topic ' + topic); + console.info(this.subTokensMap); return token; } @@ -93,13 +106,13 @@ export default class MqttClientService extends EventEmitter { return jspb.Message.getField(protoMsg, oneofCaseNumber); } - static getDataPackMessageOneofCaseString(protoMsg) { + /*static getDataPackMessageOneofCaseString(protoMsg) { for (let dataCase in DataPackMessage.DataCase) { if (DataPackMessage.DataCase[dataCase] === protoMsg.getDataCase()) { return dataCase; } } - } + }*/ } MqttClientService.EVENTS = Object.freeze({