-
Sandro Weber authoredb393f8af
mqtt-client-service.js 3.20 KiB
import mqtt from 'mqtt';
import { EventEmitter } from 'events';
//import * as proto from 'nrp-jsproto/nrp-engine_msgs-protobuf.js';
//import { DataPackMessage } from 'nrp-jsproto/nrp-engine_msgs-protobuf.js';
import jspb from '../../node_modules/google-protobuf/google-protobuf';
let _instance = null;
const SINGLETON_ENFORCER = Symbol();
/**
* Service handling state and info of running simulations.
*/
export default class MqttClientService extends EventEmitter {
constructor(enforcer) {
super();
if (enforcer !== SINGLETON_ENFORCER) {
throw new Error('Use ' + this.constructor.name + '.instance');
}
this.subTokensMap = new Map();
}
static get instance() {
if (_instance == null) {
_instance = new MqttClientService(SINGLETON_ENFORCER);
}
return _instance;
}
connect(brokerUrl) {
console.info('MQTT connecting to ' + brokerUrl + ' ...');
this.client = mqtt.connect(brokerUrl);
this.client.on('connect', () => {
console.info('... MQTT connected');
console.info(this.client);
this.emit(MqttClientService.EVENTS.CONNECTED, this.client);
});
this.client.on('error', this.onError);
this.client.on('message', (params) => {
this.onMessage(params);
});
}
onError(error) {
console.error(error);
}
onMessage(topic, payload, packet) {
//console.info('MQTT message: [topic, payload, packet]');
//console.info([topic, payload, packet]);
//Now we see which callbacks have been assigned for a topic
if (typeof this.subTokensMap.get(topic) !== 'undefined') {
for (var token in this.subTokensMap.get(topic)){
if (typeof token.callback === 'function' && payload !== 'undefined') {
//Deserializatin of Data must happen here
token.callback(payload);
}
};
};
/*try {
if (topic.endsWith('/type')) {
let msg = String(payload);
console.info('"' + topic + '" message format = ' + msg);
}
else {
let msg = DataPackMessage.deserializeBinary(payload);
console.info('DataPackMessage');
console.info(msg);
}
}
catch (error) {
console.error(error);
}*/
}
//callback should have args topic, payload
subscribeToTopic(topic, callback) {
if (typeof callback !== 'function') {
console.error('trying to subscribe to topic "' + topic + '", but no callback function given!');
return;
}
const token = {
topic: topic,
callback: callback
};
if (this.subTokensMap.has(token.topic)){
this.subTokensMap.get(token.topic).push(token);
}
else{
this.subTokensMap.set(
token.topic,
[token]
);
}
console.info('You have been subscribed to topic ' + topic);
console.info(this.subTokensMap);
return token;
}
static getProtoOneofData(protoMsg, oneofCaseNumber) {
return jspb.Message.getField(protoMsg, oneofCaseNumber);
}
/*static getDataPackMessageOneofCaseString(protoMsg) {
for (let dataCase in DataPackMessage.DataCase) {
if (DataPackMessage.DataCase[dataCase] === protoMsg.getDataCase()) {
return dataCase;
}
}
}*/
}
MqttClientService.EVENTS = Object.freeze({
CONNECTED: 'CONNECTED'
});