白季飞龙的个人主页

MQTT大杂烩

MQTT是什么

MQTT是一个轻量级的消息总线协议,提供消息订阅与发布

MQTT的安装

MQTT是一套标准,常用的服务端有Eclipse的Mosquitto。MQTT是IBM出品,Eclipse也是IBM出品,所以Mosquitto算是官方实现吧。

Mosquitto在macOS下的安装

执行命令 brew install mosquitto 即可安装

启动:

Mosquitto启动日志:

1543540708: mosquitto version 1.5.4 starting
1543540708: Config loaded from /usr/local/etc/mosquitto/mosquitto.conf.
1543540708: Opening ipv6 listen socket on port 1883.
1543540708: Opening ipv4 listen socket on port 1883.

可见Mosquitto默认监听1883端口

MQTT演示

MQTT是一套消息订阅与发布协议,这里我启动一个客户端订阅与监听animal主题,然后用另一个客户端向animal主题发送消息。控制台输出如下如下:

消息发布端

$ mosquitto_pub --topic animal --message "ant"

$ mosquitto_pub --topic animal --message "bee"

消息发布端没有控制台输出,发布消息后直接退出

消息订阅端

$ mosquitto_sub --topic animal

ant
bee

服务端

1543542192: New connection from ::1 on port 1883.
1543542192: New client connected from ::1 as mosqsub|73254-MacBook-A (c1, k60).
1543542192: No will message specified.
1543542192: Sending CONNACK to mosqsub|73254-MacBook-A (0, 0)
1543542192: Received SUBSCRIBE from mosqsub|73254-MacBook-A
1543542192:     animal (QoS 0)
1543542192: mosqsub|73254-MacBook-A 0 animal
1543542192: Sending SUBACK to mosqsub|73254-MacBook-A


1543542215: New connection from ::1 on port 1883.
1543542215: New client connected from ::1 as mosqpub|73257-MacBook-A (c1, k60).
1543542215: No will message specified.
1543542215: Sending CONNACK to mosqpub|73257-MacBook-A (0, 0)
1543542215: Received PUBLISH from mosqpub|73257-MacBook-A (d0, q0, r0, m0, 'animal', ... (3 bytes))
1543542215: Sending PUBLISH to mosqsub|73254-MacBook-A (d0, q0, r0, m0, 'animal', ... (3 bytes))
1543542215: Received DISCONNECT from mosqpub|73257-MacBook-A
1543542215: Client mosqpub|73257-MacBook-A disconnected.


1543542233: New connection from ::1 on port 1883.
1543542233: New client connected from ::1 as mosqpub|73267-MacBook-A (c1, k60).
1543542233: No will message specified.
1543542233: Sending CONNACK to mosqpub|73267-MacBook-A (0, 0)
1543542233: Received PUBLISH from mosqpub|73267-MacBook-A (d0, q0, r0, m0, 'animal', ... (3 bytes))
1543542233: Sending PUBLISH to mosqsub|73254-MacBook-A (d0, q0, r0, m0, 'animal', ... (3 bytes))
1543542233: Received DISCONNECT from mosqpub|73267-MacBook-A
1543542233: Client mosqpub|73267-MacBook-A disconnected.

MQTT启用密码验证

配置文件

password_file thepwdfile
allow_anonymous false

要点:

MQTT的Java客户端的使用

依赖

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>

Java示例代码

package bj;

import org.eclipse.paho.client.mqttv3.*;
import org.junit.Test;

/**
 * Created by BaiJiFeiLong@gmail.com at 2018/11/30 上午10:08
 */
public class MqttTest {

    // MQTT协议URL格式 tcp://<host>[:port] 默认端口1883
    private String mqttUrl = "tcp://localhost";

    @Test
    public void trump() throws MqttException {
        MqttClient mqttClient = new MqttClient(mqttUrl, "america/usa/trump");
        mqttClient.connect();
        // 向Eiffel问好
        mqttClient.publish("europe/france/eiffel", new MqttMessage("Eiffel, I am trump".getBytes()));
        // 向Thatcher问好
        mqttClient.publish("europe/uk/thatcher", new MqttMessage("Thatcher, I am trump".getBytes()));
        // 向G20问好(g20主题已经被多客户端订阅,所以可以算是广播消息)
        mqttClient.publish("g20", new MqttMessage("G20, I am trump".getBytes()));
    }

    /**
     * 订阅端1
     *
     * @throws MqttException        .
     * @throws InterruptedException .
     */
    @Test
    public void eiffel() throws MqttException, InterruptedException {
        MqttClient mqttClient = new MqttClient(mqttUrl, "europe/france/eiffel");
        mqttClient.connect();
        // 订阅自己,这样才能收到发给自己的消息
        mqttClient.subscribe("europe/france/eiffel");
        // 订阅G20,这样才能收到对应群组的消息
        mqttClient.subscribe("g20");
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {

            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                System.out.println("[Eiffel] received: " + new String(message.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {

            }
        });
        Thread.currentThread().join();
    }

    /**
     * 订阅端2
     *
     * @throws MqttException        .
     * @throws InterruptedException .
     */
    @Test
    public void thatcher() throws MqttException, InterruptedException {
        MqttClient mqttClient = new MqttClient(mqttUrl, "europe/uk/thatcher");
        mqttClient.connect();
        // 订阅自己,这样才能收到发给自己的消息
        mqttClient.subscribe("europe/uk/thatcher");
        // 订阅G20,这样才能收到对应群组的消息
        mqttClient.subscribe("g20");
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {

            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                System.out.println("[Thatcher] received: " + new String(message.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {

            }
        });
        Thread.currentThread().join();
    }
}

要点

控制台输出

  1. Eiffel
[Eiffel] received: Eiffel, I am trump
[Eiffel] received: G20, I am trump
  1. Thatcher
[Thatcher] received: Thatcher, I am trump
[Thatcher] received: G20, I am trump
  1. Trump

无输出

MQTT服务端日志

1545014641: mosquitto version 1.5.4 starting
1545014641: Config loaded from /usr/local/etc/mosquitto/mosquitto.conf.
1545014641: Opening ipv6 listen socket on port 1883.
1545014641: Opening ipv4 listen socket on port 1883.

1545014652: New connection from 127.0.0.1 on port 1883.
1545014652: New client connected from 127.0.0.1 as europe/france/eiffel (c1, k60).
1545014652: No will message specified.
1545014652: Sending CONNACK to europe/france/eiffel (0, 0)
1545014652: Received SUBSCRIBE from europe/france/eiffel
1545014652: 	europe/france/eiffel (QoS 1)
1545014652: europe/france/eiffel 1 europe/france/eiffel
1545014652: Sending SUBACK to europe/france/eiffel
1545014652: Received SUBSCRIBE from europe/france/eiffel
1545014652: 	g20 (QoS 1)
1545014652: europe/france/eiffel 1 g20
1545014652: Sending SUBACK to europe/france/eiffel

1545014660: New connection from 127.0.0.1 on port 1883.
1545014660: New client connected from 127.0.0.1 as europe/uk/thatcher (c1, k60).
1545014660: No will message specified.
1545014660: Sending CONNACK to europe/uk/thatcher (0, 0)
1545014660: Received SUBSCRIBE from europe/uk/thatcher
1545014660: 	europe/uk/thatcher (QoS 1)
1545014660: europe/uk/thatcher 1 europe/uk/thatcher
1545014660: Sending SUBACK to europe/uk/thatcher
1545014660: Received SUBSCRIBE from europe/uk/thatcher
1545014660: 	g20 (QoS 1)
1545014660: europe/uk/thatcher 1 g20
1545014660: Sending SUBACK to europe/uk/thatcher

1545014669: New connection from 127.0.0.1 on port 1883.
1545014669: New client connected from 127.0.0.1 as america/usa/trump (c1, k60).
1545014669: No will message specified.
1545014669: Sending CONNACK to america/usa/trump (0, 0)
1545014669: Received PUBLISH from america/usa/trump (d0, q1, r0, m1, 'europe/france/eiffel', ... (18 bytes))
1545014669: Sending PUBACK to america/usa/trump (Mid: 1)
1545014669: Sending PUBLISH to europe/france/eiffel (d0, q1, r0, m1, 'europe/france/eiffel', ... (18 bytes))
1545014669: Received PUBLISH from america/usa/trump (d0, q1, r0, m2, 'europe/uk/thatcher', ... (20 bytes))
1545014669: Sending PUBACK to america/usa/trump (Mid: 2)
1545014669: Sending PUBLISH to europe/uk/thatcher (d0, q1, r0, m1, 'europe/uk/thatcher', ... (20 bytes))
1545014669: Received PUBACK from europe/france/eiffel (Mid: 1)
1545014669: Received PUBACK from europe/uk/thatcher (Mid: 1)
1545014669: Received PUBLISH from america/usa/trump (d0, q1, r0, m3, 'g20', ... (15 bytes))
1545014669: Sending PUBACK to america/usa/trump (Mid: 3)
1545014669: Sending PUBLISH to europe/france/eiffel (d0, q1, r0, m2, 'g20', ... (15 bytes))
1545014669: Sending PUBLISH to europe/uk/thatcher (d0, q1, r0, m2, 'g20', ... (15 bytes))
1545014669: Received PUBACK from europe/france/eiffel (Mid: 2)
1545014669: Received PUBACK from europe/uk/thatcher (Mid: 2)
1545014670: Socket error on client america/usa/trump, disconnecting.

MQTT在Web端的使用

MQTT可以在Web端通过WebSocket连接使用

Mosquitto设置

修改配置文件/usr/local/etc/mosquitto/mosquitto.conf,添加以下三行

port 1883
listener 1888
protocol websockets

注意:

JS库下载

https://raw.githubusercontent.com/eclipse/paho.mqtt.javascript/master/src/paho-mqtt.js

示例代码

<!DOCTYPE html>
<html lang="en" style="width: 100%; height: 100%;">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <script src="paho-mqtt.js"></script>
    <!--<script src="https://raw.githubusercontent.com/eclipse/paho.mqtt.javascript/master/src/paho-mqtt.js"></script>-->
</head>
<body style="width: 100%; height: 100%; background: #eeeeee; display: flex; flex-direction: column; justify-content: center; align-items: center">

<div id="log" style="border: 5px solid #bbbbbb; padding: 10px; font-size: 18pt;">
</div>
</body>
<script>
    (function () {
        console.old = console.log;
        var logger = document.getElementById('log');
        console.log = function (message) {
            logger.innerHTML += "<div >";
            for (var i in arguments) {
                logger.innerHTML += arguments[i] + ' ';
            }
            logger.innerHTML += "</div>";
        };
    })();
    var client = new Paho.MQTT.Client("localhost", 1888, "", "foo");
    client.connect({
        onSuccess: function () {
            console.log("[Conn] Connected");
            console.log("[Topic] Subscribing food ...");
            client.subscribe("food");

            console.log(">>> [food] Sending apple ...");
            client.send("food", "apple");
            console.log(">>> [food] Sending banana ...");
            client.send("food", "banana");
            console.log(">>> [food] Sending cat ...");
            client.send("food", "cat");
        }
    });
    client.onConnectionLost = function () {
        console.log("[Conn] Disconnected")
    };
    client.onMessageArrived = function (message) {
        console.log("<<< Message received: ", "[" + message.topic + "]", message.payloadString)
    };


</script>
</html>

JS控制台输出

[Conn] Connected
[Topic] Subscribing food ...
>>> [food] Sending apple ...
>>> [food] Sending banana ...
>>> [food] Sending cat ...
<<< Message received: [food] apple
<<< Message received: [food] banana
<<< Message received: [food] cat

Web端单聊群聊示例

HTML

<!DOCTYPE html>
<html lang="en" class="bj-match-parent" style="font-size: 1.2em">
<head>
    <meta charset="UTF-8">
    <title>MQTT Demo</title>
    <script src="paho-mqtt.js"></script>
    <!--<script src="https://raw.githubusercontent.com/eclipse/paho.mqtt.javascript/master/src/paho-mqtt.js"></script>-->
    <style>
        .animal {
            width: 33.33%;
            background: #bbbbbb;
            box-sizing: border-box;
            min-height: 200px;
        }

        .animal > .content, #animals > .content {
            max-height: 250px;
            overflow: scroll;
        }

        .animal:nth-child(2) {
            border-left: 5px dashed #aaaaaa;
            border-right: 5px dashed #aaaaaa;
        }

        .animal > :first-child, #animals > :first-child, #logs > :first-child, #actions > :first-child {
            background: #999999;
        }

        .bj-linear-layout {
            display: flex;
            flex-direction: row;
            box-sizing: border-box;
        }

        .bj-match-parent {
            width: 100%;
            height: 100%;
        }

        .bj-gravity-center {
            justify-content: center;
            align-items: center;
        }

    </style>
</head>
<body class="bj-linear-layout bj-match-parent bj-gravity-center" style="flex-direction: column">

<div class="bj-linear-layout" style="flex-direction: column; ;background: #eeeeee; min-width: 80%; min-height: 80%">
    <div class="bj-linear-layout" style="background: #cccccc; flex-grow: 1">
        <div id="ant" class="animal bj-linear-layout" style="flex-direction: column">
            <div>Ant</div>
            <div class="content" style="flex-grow: 1">
                <div>Ready.</div>
            </div>
            <input type="text" class="message" data-name="ant" placeholder="Please input.">
        </div>
        <div id="bee" class="animal bj-linear-layout" style="flex-direction: column">
            <div>Bee</div>
            <div class="content" style="flex-grow: 1">Ready.<br></div>
            <input type="text" class="message" data-name="bee" placeholder="Please input.">
        </div>
        <div id="cat" class="animal bj-linear-layout" style="flex-direction: column">
            <div>Cat</div>
            <div class="content" style="flex-grow: 1">Ready.<br></div>
            <input type="text" class="message" data-name="cat" placeholder="Please input.">
        </div>
    </div>
    <div id="animals" class="bj-linear-layout"
         style="background: #bbbbbb;flex-direction: column; flex-grow: 1; min-height: 200px; max-height: 300px;">
        <div>Animals</div>
        <div class="content" style="flex-grow: 1">Ready.<br></div>
        <input type="text" id="group-message" data-name="cat" placeholder="Please input.">
    </div>
    <div id="actions" class="bj-linear-layout" style="background: #bbbbbb; flex-direction: column; flex-grow: 0">
        <div>Actions</div>
        <div class="content" style="display: flex; justify-content: space-around; padding: 7px">
            <button class="login" data-name="ant">Login(Ant)</button>
            <button class="login" data-name="bee">Login(Bee)</button>
            <button class="login" data-name="cat">Login(Cat)</button>
        </div>
    </div>
    <div id="logs" class="bj-linear-layout"
         style="background: #bbbbbb; flex-direction: column; flex-grow: 1">
        <div>Logs</div>
        <div id="logs-content" style="height: 100px; overflow: scroll; font-size: 0.5rem">Ready.</div>
    </div>
</div>
</body>

<script>
    var client = null;
    var myName = null;
    (function () {
        console.old = console.log;
        var logger = document.getElementById('logs-content');
        console.log = function () {
            logger.innerHTML += "<div>";
            for (var i = 0; i < arguments.length; ++i) {
                console.old(arguments[i]);
                logger.innerHTML += arguments[i] + ' ';
            }
            logger.innerHTML += "</div>";
            logger.scrollTop = logger.scrollHeight;
        };
    })();

    [].slice.call(document.getElementsByClassName("login")).forEach(function (button) {
        button.addEventListener("click", function (e) {
            var id = e.target.id;
            var name = e.target.getAttribute("data-name");
            doLogin(name);
        });
    });

    [].slice.call(document.getElementsByClassName("message")).forEach(function (input) {
        input.addEventListener("keyup", function (e) {
            if (e.keyCode !== 13) return;
            var name = e.target.getAttribute("data-name");
            var message = e.target.value;
            e.target.value = "";
            client.send("user/" + name + "/inbox/" + myName, message);
            var element = document.getElementById(name).getElementsByClassName("content")[0];
            element.innerHTML += "<div style='text-align: right; color: forestgreen;'>" + myName + ": " + message + "</div>";
            element.scrollTop = element.scrollHeight;
            console.log(">>>", myName, "=>", name, ":", message);
        })
    });

    document.getElementById("group-message").addEventListener("keyup", function (e) {
        if (e.keyCode !== 13) return;
        var group = "animals";
        var message = e.target.value;
        client.send("group/" + group + "/inbox/" + myName, message);
        e.target.value = "";
        var element = document.getElementById(group).getElementsByClassName("content")[0];
        element.innerHTML += "<div style='text-align: right; color: green;'>" + myName + ": " + message + "</div>";
        element.scrollTop = element.scrollHeight;
        console.log(">>>", myName, "=>", group, ":", message);
    });

    function doLogin(name) {
        if (client !== null) {
            client.disconnect()
        }
        myName = name;
        console.log("Login...", name);
        client = new Paho.MQTT.Client("localhost", 1888, "", name);
        client.connect({
            onSuccess: function () {
                console.log("[Conn] Connected");
                console.log("[Topic] Subscribing ...");
                client.subscribe("user/" + name + "/inbox/+");
                client.subscribe("group/animals/inbox/+");
            }
        });
        client.onConnectionLost = function () {
            console.log("[Conn] Disconnected")
        };
        client.onMessageArrived = function (message) {
            var topic = message.topic;
            var payload = message.payloadString;
            var fromName = topic.substring(topic.lastIndexOf("/") + 1);
            var isGroupMessage = topic.substring(0, topic.indexOf("/")) === "group";
            console.log("<<< Message received: ", "[" + topic + "]", fromName, payload);
            var element;
            if (isGroupMessage) {
                if (fromName === myName) {
                    console.log("my message .")
                } else {
                    element = document.getElementById("animals").getElementsByClassName("content")[0];
                    element.innerHTML += "<div style='color: dodgerblue;'>" + fromName + ": " + payload + "</divs>";
                    element.scrollTop = element.scrollHeight;
                }
            } else {
                element = document.getElementById(fromName).getElementsByClassName("content")[0];
                element.innerHTML += "<div style='color: cornflowerblue;'>" + fromName + ": " + payload + "</div>";
                element.scrollTop = element.scrollHeight;
            }
        };
    }

</script>
</html>

JS控制台输出

Ready.
Login... cat
[Conn] Connected
[Topic] Subscribing ...
<<< Message received: [group/animals/inbox/ant] ant a
<<< Message received: [group/animals/inbox/ant] ant a
<<< Message received: [group/animals/inbox/ant] ant a
<<< Message received: [group/animals/inbox/bee] bee b
<<< Message received: [group/animals/inbox/bee] bee b
<<< Message received: [group/animals/inbox/bee] bee b
>>> cat => animals : c
<<< Message received: [group/animals/inbox/cat] cat c
my message .
>>> cat => animals : c
<<< Message received: [group/animals/inbox/cat] cat c
my message .
>>> cat => animals : c
<<< Message received: [group/animals/inbox/cat] cat c
my message .

文章首发: https://baijifeilong.github.io


漫漫路,莫论逍遥;潜心修,只为悟道