Skip to content

MQTT

What is MQTT?

MQTT, or MQ Telemetry Transport, has become a de facto standard in the IoT world, as it’s easy to set up and works well without a lot of computing overhead. “MQ” at one time stood for Message Queuing, but it has now apparently transcended its acronym status.

How Does MQTT Work?

MQTT uses a publish-subscribe methodology, where clients exchange messages with each other through a centralized broker, also sometimes called a server. Clients both publish and subscribe to information channels called topics, and any data that passes through the broker is tagged with a topic label. Once clients are pointed to the broker’s IP address, there’s no more system configuration involved. Clients simply send messages to a topic (that may or may not exist elsewhere) by publishing topic-tagged data to it. Clients listen to topics by subscribing to them.

Note that each client knows nothing about the other clients on the network; the broker merely takes care of data distribution. This can be to one client, many clients, or none, if nothing else is actually subscribed. All a client needs to know is where to find the broker/server. If a client’s IP address changes, or there are other modifications in the underlying system, as long as each client knows where to find the server, things will still function properly.

There are many ready-to-use, open-source MQTT clients available, e.g. MQTT.fx, MQTTLens, etc.

We can use them for testing and visualisation purposes.

Example

Output

platformio.ini

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
;PlatformIO Project Configuration File
;
;   Build options: build flags, source filter
;   Upload options: custom upload port, speed and extra flags
;   Library options: dependencies, extra library storages
;   Advanced options: extra scripting
;
; Please visit documentation for the other options and examples
; https://docs.platformio.org/page/projectconf.html

[env:esp32dev]
platform = espressif32
framework = arduino
board = esp32dev
board_build.f_cpu = 160000000L
board_build.f_flash = 80000000L
board_build.flash_mode = qio
; Values of a multi-line option must be indented; an unindented line is parsed
; as a separate option and the flag would never reach the compiler.
build_flags =
  -DWEBSOCKET_DISABLED=true

lib_deps =
  ArduinoJson@~5.13.4
;   LoRa,
;   https://github.com/adafruit/Adafruit-GFX-Library.git
  ; https://github.com/MrityunjaiKumar/esp8266-oled-ssd1306.git
  ; https://github.com/MrityunjaiKumar/HELTEC_OLED.git
  ; https://github.com/ThingPulse/esp8266-oled-ssd1306.git
  ; https://github.com/adafruit/RadioHead.git
  ; https://github.com/JoaoLopesF/RemoteDebug.git
  ; https://github.com/MrityunjaiKumar/Heltec_ESP32_Lora.git
  https://github.com/knolleary/pubsubclient.git
  https://github.com/adafruit/Adafruit_Sensor.git
  https://github.com/adafruit/Adafruit_BME280_Library.git

monitor_speed = 115200
monitor_port = ${common.port}
upload_speed = 921600

upload_port = ${common.port}
; upload_port=192.168.0.247 
; Shared values referenced above through ${common.port}.
[common]
port=COM11

main.cpp

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#include <Arduino.h>
#include "common.h"
#include <WiFi.h>
#include "mqtt_credential.h"
#include <Wire.h>
#include <WiFi.h>
#include <PubSubClient.h>

// Switch between real sensor data and dummy data. In this file it is only
// referenced by the commented-out #if blocks in setup() and loop().
// NOTE(review): FALSE is not defined in the visible sources - confirm it is
// provided by the Arduino core before re-enabling those #if blocks.
#define use_dummy_data FALSE
// Replace the next variables with your SSID/Password combination
// (the values come from the macros in mqtt_credential.h).
const char *ssid = WIFI_SSID;
const char *password = WIFI_PASSWORD;

// Add your MQTT Broker IP address, example:
//const char* mqtt_server = "192.168.1.144";
const char *mqtt_server = broker_address;

// Network client handed to the MQTT client below.
WiFiClient espClient;
// MQTT client used by setup(), reconnect() and loop().
PubSubClient client(espClient);
// millis() timestamp of the most recent publish; updated in loop().
long lastMsg = 0;
// Not referenced anywhere else in the visible part of this file.
char msg[50];
// Not referenced anywhere else in the visible part of this file.
int value = 0;

/**
 * Joins the WiFi network named by `ssid` using `password`.
 *
 * Blocks until WiFi.status() reports WL_CONNECTED, printing a progress dot to
 * Serial every 500 ms, then prints the local IP address.
 */
void setup_wifi()
{
  delay(10);

  // Announce which network we are about to join.
  Serial.println();
  Serial.print("Connecting to ");
  Serial.println(ssid);

  WiFi.begin(ssid, password);

  // Wait (without a timeout) for the connection to come up.
  for (;;)
  {
    if (WiFi.status() == WL_CONNECTED)
    {
      break;
    }
    delay(500);
    Serial.print(".");
  }

  Serial.println("");
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}

/**
 * Arduino entry point, run once at boot.
 *
 * Opens the serial port at 115200 baud, connects to WiFi, points the MQTT
 * client at the broker, registers the message callback and seeds the
 * pseudo-random generator used for the dummy payload.
 */
void setup()
{
  Serial.begin(115200);
  setup_wifi();

  // Both setters return the client itself, so they can be chained.
  client.setServer(mqtt_server, broker_port).setCallback(callback);

  // To use the real sensor, call bme_setup() here (currently disabled).

  randomSeed(analogRead(0));
}

/**
 * MQTT message handler registered with the client in setup().
 *
 * Prints the topic and the raw payload bytes to Serial.
 *
 * @param topic   Topic the message arrived on.
 * @param message Payload bytes; exactly `length` of them are printed.
 * @param length  Number of payload bytes in `message`.
 */
void callback(char *topic, byte *message, unsigned int length)
{
  Serial.print("Message arrived on topic: ");
  Serial.print(topic);
  Serial.print(". Message: ");

  // The index is unsigned to match `length` (no signed/unsigned comparison).
  for (unsigned int i = 0; i < length; i++)
  {
    Serial.print(static_cast<char>(message[i]));
  }
  Serial.println();
}

void reconnect()
{
  // Loop until we're reconnected
  while (!client.connected())
  {
    Serial.print("Attempting MQTT connection...");
    // Attempt to connect
    if (client.connect("demo", mqtt_client_username, mqtt_client_password))
    {
      Serial.println("connected");
      // Subscribe
      client.subscribe("Tank_node_sub");
    }
    else
    {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      // Wait 5 seconds before retrying
      delay(5000);
    }
  }
}
/**
 * Arduino main loop.
 *
 * Keeps the MQTT connection alive, services the client, and publishes a dummy
 * BME payload to mqtt_publish_topic once more than 5 seconds have passed since
 * the previous publish.
 */
void loop()
{
  if (!client.connected())
  {
    reconnect();
  }
  client.loop();

  // Do the elapsed-time arithmetic in unsigned long, the type millis()
  // returns: unsigned subtraction stays correct when the counter wraps,
  // whereas the signed version overflows once millis() exceeds 2^31.
  const unsigned long now = millis();
  if (now - static_cast<unsigned long>(lastMsg) > 5000UL)
  {
    lastMsg = static_cast<long>(now);

    // Swap in get_BME_payload() here to publish real sensor readings.
    client.publish(mqtt_publish_topic, get_dummy_BME_payload().c_str());
  }
}

common.h

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// Shared declarations for the MQTT demo (main.cpp and the BME280 source).
// The guard avoids the reserved "_Uppercase" identifier form.
#ifndef COMMON_H
#define COMMON_H
#include <Arduino.h>

// MQTT message handler: receives the topic and `length` payload bytes.
void callback(char *topic, byte *message, unsigned int length);
// Blocks until the MQTT client is connected to the broker.
void reconnect();
// Blocks until the WiFi connection is established.
void setup_wifi();
// Initialises the BME280 sensor.
void bme_setup();
// Reads the sensor once and prints the readings to Serial.
void bme_loop();
// Returns the current sensor readings encoded as a JSON string.
String get_BME_payload();
// Returns randomly generated readings encoded as a JSON string.
String get_dummy_BME_payload();
#endif // COMMON_H

mqtt_credential.h

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// WiFi and MQTT broker settings for the demo. Replace the values below with
// your own network and broker details.
// The guard avoids the reserved "_Uppercase" identifier form.
#ifndef MQTT_CREDENTIALS_H
#define MQTT_CREDENTIALS_H

// WiFi network to join.
#define WIFI_SSID "sincgrid"
#define WIFI_PASSWORD "sincgrid.com"

// MQTT broker endpoint and login (empty strings for username/password).
#define broker_address "192.168.0.106"
#define broker_port 1883
#define mqtt_client_username ""
#define mqtt_client_password ""
// Topics used by the demo.
#define mqtt_subscribe_topic "IOTASYNC/sub"
#define mqtt_publish_topic "IOTASYNC/demo"

#endif // MQTT_CREDENTIALS_H
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#include "common.h"
#include <ArduinoJson.h>
#include <Adafruit_Sensor.h>
#include <Adafruit_BME280.h>
// BME280 sensor driver instance shared by the functions in this file.
Adafruit_BME280 bme;

// Most recent readings; overwritten by bme_loop(), get_BME_payload() and
// get_dummy_BME_payload(). Pressure is stored after dividing the raw reading
// by 100.
float temperature, humidity, pressure, altitude;

// Sea-level reference pressure passed to bme.readAltitude().
// NOTE(review): standard atmosphere is 1013.25 hPa - confirm 1050 is the
// intended local reference.
#define SEALEVELPRESSURE_HPA (1050)

void bme_setup()
{
    // put your setup code here, to run once:
    delay(100);
    bme.begin(0x76);
}

// Prints one "<label><value><unit>" line to Serial.
static void print_reading(const char *label, float reading, const char *unit)
{
    Serial.print(label);
    Serial.print(reading);
    Serial.println(unit);
}

/**
 * Samples the BME280 once, stores the results in the file-level globals and
 * prints them to Serial, followed by a blank line and a 1 second pause.
 */
void bme_loop()
{
    // Take all four readings before printing anything.
    temperature = bme.readTemperature();
    humidity = bme.readHumidity();
    pressure = bme.readPressure() / 100.0F;
    altitude = bme.readAltitude(SEALEVELPRESSURE_HPA);

    print_reading("Temperature = ", temperature, " *C");
    // A Fahrenheit line (1.8 * temperature + 32, unit " *F") is deliberately not printed.
    print_reading("Pressure = ", pressure, " hPa");
    print_reading("Approx. Altitude = ", altitude, " m");
    print_reading("Humidity = ", humidity, " %");

    Serial.println();
    delay(1000);
}

/**
 * Encodes one set of readings as the JSON document published over MQTT:
 * {"dev_id":"demo","iotasync_data":{"temp":..,"alt":..,"humidity":..,"pressure":..}}
 * Each reading is written as a string via String(float). The encoded message
 * is also printed to Serial.
 *
 * Shared by get_BME_payload() and get_dummy_BME_payload(), which previously
 * each carried an identical copy of this code.
 */
static String encode_BME_payload(float temp, float alt, float hum, float pres)
{
    StaticJsonBuffer<200> jsonBuffer;
    JsonObject &root = jsonBuffer.createObject();
    root["dev_id"] = "demo";

    JsonObject &iotasync_data = root.createNestedObject("iotasync_data");
    iotasync_data["temp"] = String(temp);
    iotasync_data["alt"] = String(alt);
    iotasync_data["humidity"] = String(hum);
    iotasync_data["pressure"] = String(pres);

    char JSONmessageBuffer[200];
    root.printTo(JSONmessageBuffer, sizeof(JSONmessageBuffer));

    String message(JSONmessageBuffer);
    Serial.println(message);
    return message;
}

/**
 * Reads temperature, humidity, pressure and altitude from the BME280 into the
 * file-level globals and returns them as a JSON string.
 */
String get_BME_payload()
{
    temperature = bme.readTemperature();
    humidity = bme.readHumidity();
    pressure = bme.readPressure() / 100.0F;
    altitude = bme.readAltitude(SEALEVELPRESSURE_HPA);

    return encode_BME_payload(temperature, altitude, humidity, pressure);
}

/**
 * Fills the file-level globals with pseudo-random stand-in readings and
 * returns them as a JSON string, for testing without a sensor attached.
 */
String get_dummy_BME_payload()
{
    // random(min, max) excludes max, so temperature is always 16 and altitude
    // always 12. NOTE(review): widen the ranges if variation was intended.
    temperature = random(16, 17);
    humidity = random(45, 47);
    pressure = random(5300, 5500) / 100.0F;
    altitude = random(12, 13);

    return encode_BME_payload(temperature, altitude, humidity, pressure);
}

Git repository link for the code: Repo