Skip to content
Snippets Groups Projects
Commit cc5a4c4c authored by ManosAngelidis's avatar ManosAngelidis
Browse files

[NRRPLT-8384-mqtt Updated onMessage function]

parent efe4bf32
No related branches found
No related tags found
No related merge requests found
import React from 'react'; import React from 'react';
import MqttClientService from '../../services/mqtt-client-service'; import MqttClientService from '../../services/mqtt-client-service';
//import subscribeToTopic from MqttClientService;
export default class NrpCoreDashboard extends React.Component { export default class NrpCoreDashboard extends React.Component {
constructor(props) { constructor(props) {
...@@ -14,13 +15,15 @@ export default class NrpCoreDashboard extends React.Component { ...@@ -14,13 +15,15 @@ export default class NrpCoreDashboard extends React.Component {
MqttClientService.instance.connect(this.mqttBrokerUrl); MqttClientService.instance.connect(this.mqttBrokerUrl);
} }
onMqttClientConnected(mqttClient) { onMqttClientConnected(MqttClient) {
mqttClient.subscribeToTopic('test_topic'); MqttClient.subscribe('#', (err) => {
//mqttClient.subscribe('#', (err) => { if (err) {
// if (err) { console.error(err);
// console.error(err); }
// } });
//}; // As a test to make sure MqttClientService can subscribe to multiple topics at once we use these two for testing
let token = MqttClientService.instance.subscribeToTopic('test_topic', (param1) => (console.info(param1)));
token = MqttClientService.instance.subscribeToTopic('test_topic_proto', (param1) => (console.info(param1)));
} }
render() { render() {
......
...@@ -2,9 +2,9 @@ import mqtt from 'mqtt'; ...@@ -2,9 +2,9 @@ import mqtt from 'mqtt';
import { EventEmitter } from 'events'; import { EventEmitter } from 'events';
//import * as proto from 'nrp-jsproto/nrp-engine_msgs-protobuf.js'; //import * as proto from 'nrp-jsproto/nrp-engine_msgs-protobuf.js';
import { DataPackMessage } from '../../node_modules/nrp-jsproto/engine_grpc_pb.js'; //import { DataPackMessage } from 'nrp-jsproto/nrp-engine_msgs-protobuf.js';
import jspb from 'google-protobuf'; import jspb from '../../node_modules/google-protobuf/google-protobuf';
//import { hasSubscribers } from 'diagnostics_channel'; import { hasSubscribers } from 'diagnostics_channel';
let _instance = null; let _instance = null;
const SINGLETON_ENFORCER = Symbol(); const SINGLETON_ENFORCER = Symbol();
...@@ -21,7 +21,7 @@ export default class MqttClientService extends EventEmitter { ...@@ -21,7 +21,7 @@ export default class MqttClientService extends EventEmitter {
this.subTokensMap = new Map(); this.subTokensMap = new Map();
console.info(DataPackMessage); //console.info(DataPackMessage);
} }
static get instance() { static get instance() {
...@@ -41,7 +41,9 @@ export default class MqttClientService extends EventEmitter { ...@@ -41,7 +41,9 @@ export default class MqttClientService extends EventEmitter {
this.emit(MqttClientService.EVENTS.CONNECTED, this.client); this.emit(MqttClientService.EVENTS.CONNECTED, this.client);
}); });
this.client.on('error', this.onError); this.client.on('error', this.onError);
this.client.on('message', this.onMessage); this.client.on('message', (params) => {
this.onMessage(params);
});
} }
onError(error) { onError(error) {
...@@ -49,10 +51,19 @@ export default class MqttClientService extends EventEmitter { ...@@ -49,10 +51,19 @@ export default class MqttClientService extends EventEmitter {
} }
onMessage(topic, payload, packet) { onMessage(topic, payload, packet) {
console.info('MQTT message: [topic, payload, packet]'); //console.info('MQTT message: [topic, payload, packet]');
console.info([topic, payload, packet]); //console.info([topic, payload, packet]);
//Deserializatin of Data must happen here
//Now we see which callbacks have been assigned for a topic
if (typeof this.subTokensMap.get(topic) !== 'undefined') {
for (var token in this.subTokensMap.get(topic)){
if (typeof token.callback === 'function' && payload !== 'undefined'){
token.callback(payload);
}
};
};
try { /*try {
if (topic.endsWith('/type')) { if (topic.endsWith('/type')) {
let msg = String(payload); let msg = String(payload);
console.info('"' + topic + '" message format = ' + msg); console.info('"' + topic + '" message format = ' + msg);
...@@ -65,9 +76,10 @@ export default class MqttClientService extends EventEmitter { ...@@ -65,9 +76,10 @@ export default class MqttClientService extends EventEmitter {
} }
catch (error) { catch (error) {
console.error(error); console.error(error);
} }*/
} }
//callback should have args topic, payload
subscribeToTopic(topic, callback=Function()){ subscribeToTopic(topic, callback=Function()){
const token = { const token = {
topic: topic, topic: topic,
...@@ -86,6 +98,7 @@ export default class MqttClientService extends EventEmitter { ...@@ -86,6 +98,7 @@ export default class MqttClientService extends EventEmitter {
); );
} }
console.info('You have been subscribed to topic ' + topic); console.info('You have been subscribed to topic ' + topic);
console.info(this.subTokensMap);
return token; return token;
} }
...@@ -93,13 +106,13 @@ export default class MqttClientService extends EventEmitter { ...@@ -93,13 +106,13 @@ export default class MqttClientService extends EventEmitter {
return jspb.Message.getField(protoMsg, oneofCaseNumber); return jspb.Message.getField(protoMsg, oneofCaseNumber);
} }
static getDataPackMessageOneofCaseString(protoMsg) { /*static getDataPackMessageOneofCaseString(protoMsg) {
for (let dataCase in DataPackMessage.DataCase) { for (let dataCase in DataPackMessage.DataCase) {
if (DataPackMessage.DataCase[dataCase] === protoMsg.getDataCase()) { if (DataPackMessage.DataCase[dataCase] === protoMsg.getDataCase()) {
return dataCase; return dataCase;
} }
} }
} }*/
} }
MqttClientService.EVENTS = Object.freeze({ MqttClientService.EVENTS = Object.freeze({
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment