Skip to content
Snippets Groups Projects
mqtt-client-service.js 3.20 KiB
import mqtt from 'mqtt';
import { EventEmitter } from 'events';

//import * as proto from 'nrp-jsproto/nrp-engine_msgs-protobuf.js';
//import { DataPackMessage } from 'nrp-jsproto/nrp-engine_msgs-protobuf.js';
import jspb from '../../node_modules/google-protobuf/google-protobuf';

let _instance = null;
const SINGLETON_ENFORCER = Symbol();

/**
 * Service handling state and info of running simulations.
 */
export default class MqttClientService extends EventEmitter {
  constructor(enforcer) {
    super();
    if (enforcer !== SINGLETON_ENFORCER) {
      throw new Error('Use ' + this.constructor.name + '.instance');
    }

    this.subTokensMap = new Map();
  }

  static get instance() {
    if (_instance == null) {
      _instance = new MqttClientService(SINGLETON_ENFORCER);
    }

    return _instance;
  }

  connect(brokerUrl) {
    console.info('MQTT connecting to ' + brokerUrl + ' ...');
    this.client = mqtt.connect(brokerUrl);
    this.client.on('connect', () => {
      console.info('... MQTT connected');
      console.info(this.client);
      this.emit(MqttClientService.EVENTS.CONNECTED, this.client);
    });
    this.client.on('error', this.onError);
    this.client.on('message', (params) => {
      this.onMessage(params);
    });
  }

  onError(error) {
    console.error(error);
  }

  onMessage(topic, payload, packet) {
    //console.info('MQTT message: [topic, payload, packet]');
    //console.info([topic, payload, packet]);
    //Now we see which callbacks have been assigned for a topic
    if (typeof this.subTokensMap.get(topic) !== 'undefined') {
      for (var token in this.subTokensMap.get(topic)){
        if (typeof token.callback === 'function' && payload !== 'undefined') {
          //Deserializatin of Data must happen here
          token.callback(payload);
        }
      };
    };

    /*try {
      if (topic.endsWith('/type')) {
        let msg = String(payload);
        console.info('"' + topic + '" message format = ' + msg);
      }
      else {
        let msg = DataPackMessage.deserializeBinary(payload);
        console.info('DataPackMessage');
        console.info(msg);
      }
    }
    catch (error) {
      console.error(error);
    }*/
  }

  //callback should have args topic, payload
  subscribeToTopic(topic, callback) {
    if (typeof callback !== 'function') {
      console.error('trying to subscribe to topic "' + topic + '", but no callback function given!');
      return;
    }

    const token = {
      topic: topic,
      callback: callback
    };
    if (this.subTokensMap.has(token.topic)){
      this.subTokensMap.get(token.topic).push(token);
    }
    else{
      this.subTokensMap.set(
        token.topic,
        [token]
      );
    }
    console.info('You have been subscribed to topic ' + topic);
    console.info(this.subTokensMap);
    return token;
  }

  static getProtoOneofData(protoMsg, oneofCaseNumber) {
    return jspb.Message.getField(protoMsg, oneofCaseNumber);
  }

  /*static getDataPackMessageOneofCaseString(protoMsg) {
    for (let dataCase in DataPackMessage.DataCase) {
      if (DataPackMessage.DataCase[dataCase] === protoMsg.getDataCase()) {
        return dataCase;
      }
    }
  }*/
}

MqttClientService.EVENTS = Object.freeze({
  CONNECTED: 'CONNECTED'
});