MQTT是一个轻量级的消息总线协议,提供消息订阅与发布
MQTT是一套标准,常用的服务端有Eclipse的Mosquitto。MQTT是IBM出品,Eclipse也是IBM出品,所以Mosquitto算是官方实现吧。
执行命令 brew install mosquitto
即可安装
启动:
brew services start mosquitto
可以启动,但是我这边启动不了,也找不到报错信息和相关日志。所以放弃/usr/local/opt/mosquitto/sbin/mosquitto -c /usr/local/etc/mosquitto/mosquitto.conf -v
以Verbose模式阻塞式启动。也可以不使用-c
选项,用默认配置启动MosquittoMosquitto启动日志:
1543540708: mosquitto version 1.5.4 starting
1543540708: Config loaded from /usr/local/etc/mosquitto/mosquitto.conf.
1543540708: Opening ipv6 listen socket on port 1883.
1543540708: Opening ipv4 listen socket on port 1883.
可见Mosquitto默认监听1883端口
MQTT是一套消息订阅与发布协议,这里我启动一个客户端订阅与监听animal
主题,然后用另一个客户端向animal
主题发送消息。控制台输出如下如下:
$ mosquitto_pub --topic animal --message "ant"
$ mosquitto_pub --topic animal --message "bee"
消息发布端没有控制台输出,发布消息后直接退出
$ mosquitto_sub --topic animal
ant
bee
1543542192: New connection from ::1 on port 1883.
1543542192: New client connected from ::1 as mosqsub|73254-MacBook-A (c1, k60).
1543542192: No will message specified.
1543542192: Sending CONNACK to mosqsub|73254-MacBook-A (0, 0)
1543542192: Received SUBSCRIBE from mosqsub|73254-MacBook-A
1543542192: animal (QoS 0)
1543542192: mosqsub|73254-MacBook-A 0 animal
1543542192: Sending SUBACK to mosqsub|73254-MacBook-A
1543542215: New connection from ::1 on port 1883.
1543542215: New client connected from ::1 as mosqpub|73257-MacBook-A (c1, k60).
1543542215: No will message specified.
1543542215: Sending CONNACK to mosqpub|73257-MacBook-A (0, 0)
1543542215: Received PUBLISH from mosqpub|73257-MacBook-A (d0, q0, r0, m0, 'animal', ... (3 bytes))
1543542215: Sending PUBLISH to mosqsub|73254-MacBook-A (d0, q0, r0, m0, 'animal', ... (3 bytes))
1543542215: Received DISCONNECT from mosqpub|73257-MacBook-A
1543542215: Client mosqpub|73257-MacBook-A disconnected.
1543542233: New connection from ::1 on port 1883.
1543542233: New client connected from ::1 as mosqpub|73267-MacBook-A (c1, k60).
1543542233: No will message specified.
1543542233: Sending CONNACK to mosqpub|73267-MacBook-A (0, 0)
1543542233: Received PUBLISH from mosqpub|73267-MacBook-A (d0, q0, r0, m0, 'animal', ... (3 bytes))
1543542233: Sending PUBLISH to mosqsub|73254-MacBook-A (d0, q0, r0, m0, 'animal', ... (3 bytes))
1543542233: Received DISCONNECT from mosqpub|73267-MacBook-A
1543542233: Client mosqpub|73267-MacBook-A disconnected.
配置文件
password_file thepwdfile
allow_anonymous false
要点:
allow_annoymous
和 password_file
必须同时设置password_file
若使用相对路径,相对的是工作目录password_file
必须绝对一定不能为空,否则不报错,而且不用密码也可登录!!!<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
package bj;
import org.eclipse.paho.client.mqttv3.*;
import org.junit.Test;
/**
* Created by BaiJiFeiLong@gmail.com at 2018/11/30 上午10:08
*/
public class MqttTest {
// MQTT协议URL格式 tcp://<host>[:port] 默认端口1883
private String mqttUrl = "tcp://localhost";
@Test
public void trump() throws MqttException {
MqttClient mqttClient = new MqttClient(mqttUrl, "america/usa/trump");
mqttClient.connect();
// 向Eiffel问好
mqttClient.publish("europe/france/eiffel", new MqttMessage("Eiffel, I am trump".getBytes()));
// 向Thatcher问好
mqttClient.publish("europe/uk/thatcher", new MqttMessage("Thatcher, I am trump".getBytes()));
// 向G20问好(g20主题已经被多客户端订阅,所以可以算是广播消息)
mqttClient.publish("g20", new MqttMessage("G20, I am trump".getBytes()));
}
/**
* 订阅端1
*
* @throws MqttException .
* @throws InterruptedException .
*/
@Test
public void eiffel() throws MqttException, InterruptedException {
MqttClient mqttClient = new MqttClient(mqttUrl, "europe/france/eiffel");
mqttClient.connect();
// 订阅自己,这样才能收到发给自己的消息
mqttClient.subscribe("europe/france/eiffel");
// 订阅G20,这样才能收到对应群组的消息
mqttClient.subscribe("g20");
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("[Eiffel] received: " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
Thread.currentThread().join();
}
/**
* 订阅端2
*
* @throws MqttException .
* @throws InterruptedException .
*/
@Test
public void thatcher() throws MqttException, InterruptedException {
MqttClient mqttClient = new MqttClient(mqttUrl, "europe/uk/thatcher");
mqttClient.connect();
// 订阅自己,这样才能收到发给自己的消息
mqttClient.subscribe("europe/uk/thatcher");
// 订阅G20,这样才能收到对应群组的消息
mqttClient.subscribe("g20");
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("[Thatcher] received: " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
Thread.currentThread().join();
}
}
+
匹配单级主题,#
匹配多级主题[Eiffel] received: Eiffel, I am trump
[Eiffel] received: G20, I am trump
[Thatcher] received: Thatcher, I am trump
[Thatcher] received: G20, I am trump
无输出
1545014641: mosquitto version 1.5.4 starting
1545014641: Config loaded from /usr/local/etc/mosquitto/mosquitto.conf.
1545014641: Opening ipv6 listen socket on port 1883.
1545014641: Opening ipv4 listen socket on port 1883.
1545014652: New connection from 127.0.0.1 on port 1883.
1545014652: New client connected from 127.0.0.1 as europe/france/eiffel (c1, k60).
1545014652: No will message specified.
1545014652: Sending CONNACK to europe/france/eiffel (0, 0)
1545014652: Received SUBSCRIBE from europe/france/eiffel
1545014652: europe/france/eiffel (QoS 1)
1545014652: europe/france/eiffel 1 europe/france/eiffel
1545014652: Sending SUBACK to europe/france/eiffel
1545014652: Received SUBSCRIBE from europe/france/eiffel
1545014652: g20 (QoS 1)
1545014652: europe/france/eiffel 1 g20
1545014652: Sending SUBACK to europe/france/eiffel
1545014660: New connection from 127.0.0.1 on port 1883.
1545014660: New client connected from 127.0.0.1 as europe/uk/thatcher (c1, k60).
1545014660: No will message specified.
1545014660: Sending CONNACK to europe/uk/thatcher (0, 0)
1545014660: Received SUBSCRIBE from europe/uk/thatcher
1545014660: europe/uk/thatcher (QoS 1)
1545014660: europe/uk/thatcher 1 europe/uk/thatcher
1545014660: Sending SUBACK to europe/uk/thatcher
1545014660: Received SUBSCRIBE from europe/uk/thatcher
1545014660: g20 (QoS 1)
1545014660: europe/uk/thatcher 1 g20
1545014660: Sending SUBACK to europe/uk/thatcher
1545014669: New connection from 127.0.0.1 on port 1883.
1545014669: New client connected from 127.0.0.1 as america/usa/trump (c1, k60).
1545014669: No will message specified.
1545014669: Sending CONNACK to america/usa/trump (0, 0)
1545014669: Received PUBLISH from america/usa/trump (d0, q1, r0, m1, 'europe/france/eiffel', ... (18 bytes))
1545014669: Sending PUBACK to america/usa/trump (Mid: 1)
1545014669: Sending PUBLISH to europe/france/eiffel (d0, q1, r0, m1, 'europe/france/eiffel', ... (18 bytes))
1545014669: Received PUBLISH from america/usa/trump (d0, q1, r0, m2, 'europe/uk/thatcher', ... (20 bytes))
1545014669: Sending PUBACK to america/usa/trump (Mid: 2)
1545014669: Sending PUBLISH to europe/uk/thatcher (d0, q1, r0, m1, 'europe/uk/thatcher', ... (20 bytes))
1545014669: Received PUBACK from europe/france/eiffel (Mid: 1)
1545014669: Received PUBACK from europe/uk/thatcher (Mid: 1)
1545014669: Received PUBLISH from america/usa/trump (d0, q1, r0, m3, 'g20', ... (15 bytes))
1545014669: Sending PUBACK to america/usa/trump (Mid: 3)
1545014669: Sending PUBLISH to europe/france/eiffel (d0, q1, r0, m2, 'g20', ... (15 bytes))
1545014669: Sending PUBLISH to europe/uk/thatcher (d0, q1, r0, m2, 'g20', ... (15 bytes))
1545014669: Received PUBACK from europe/france/eiffel (Mid: 2)
1545014669: Received PUBACK from europe/uk/thatcher (Mid: 2)
1545014670: Socket error on client america/usa/trump, disconnecting.
MQTT可以在Web端通过WebSocket连接使用
修改配置文件/usr/local/etc/mosquitto/mosquitto.conf
,添加以下三行
port 1883
listener 1888
protocol websockets
注意:
https://raw.githubusercontent.com/eclipse/paho.mqtt.javascript/master/src/paho-mqtt.js
<!DOCTYPE html>
<html lang="en" style="width: 100%; height: 100%;">
<head>
<meta charset="UTF-8">
<title>Title</title>
<script src="paho-mqtt.js"></script>
<!--<script src="https://raw.githubusercontent.com/eclipse/paho.mqtt.javascript/master/src/paho-mqtt.js"></script>-->
</head>
<body style="width: 100%; height: 100%; background: #eeeeee; display: flex; flex-direction: column; justify-content: center; align-items: center">
<div id="log" style="border: 5px solid #bbbbbb; padding: 10px; font-size: 18pt;">
</div>
</body>
<script>
(function () {
console.old = console.log;
var logger = document.getElementById('log');
console.log = function (message) {
logger.innerHTML += "<div >";
for (var i in arguments) {
logger.innerHTML += arguments[i] + ' ';
}
logger.innerHTML += "</div>";
};
})();
var client = new Paho.MQTT.Client("localhost", 1888, "", "foo");
client.connect({
onSuccess: function () {
console.log("[Conn] Connected");
console.log("[Topic] Subscribing food ...");
client.subscribe("food");
console.log(">>> [food] Sending apple ...");
client.send("food", "apple");
console.log(">>> [food] Sending banana ...");
client.send("food", "banana");
console.log(">>> [food] Sending cat ...");
client.send("food", "cat");
}
});
client.onConnectionLost = function () {
console.log("[Conn] Disconnected")
};
client.onMessageArrived = function (message) {
console.log("<<< Message received: ", "[" + message.topic + "]", message.payloadString)
};
</script>
</html>
[Conn] Connected
[Topic] Subscribing food ...
>>> [food] Sending apple ...
>>> [food] Sending banana ...
>>> [food] Sending cat ...
<<< Message received: [food] apple
<<< Message received: [food] banana
<<< Message received: [food] cat
<!DOCTYPE html>
<html lang="en" class="bj-match-parent" style="font-size: 1.2em">
<head>
<meta charset="UTF-8">
<title>MQTT Demo</title>
<script src="paho-mqtt.js"></script>
<!--<script src="https://raw.githubusercontent.com/eclipse/paho.mqtt.javascript/master/src/paho-mqtt.js"></script>-->
<style>
.animal {
width: 33.33%;
background: #bbbbbb;
box-sizing: border-box;
min-height: 200px;
}
.animal > .content, #animals > .content {
max-height: 250px;
overflow: scroll;
}
.animal:nth-child(2) {
border-left: 5px dashed #aaaaaa;
border-right: 5px dashed #aaaaaa;
}
.animal > :first-child, #animals > :first-child, #logs > :first-child, #actions > :first-child {
background: #999999;
}
.bj-linear-layout {
display: flex;
flex-direction: row;
box-sizing: border-box;
}
.bj-match-parent {
width: 100%;
height: 100%;
}
.bj-gravity-center {
justify-content: center;
align-items: center;
}
</style>
</head>
<body class="bj-linear-layout bj-match-parent bj-gravity-center" style="flex-direction: column">
<div class="bj-linear-layout" style="flex-direction: column; ;background: #eeeeee; min-width: 80%; min-height: 80%">
<div class="bj-linear-layout" style="background: #cccccc; flex-grow: 1">
<div id="ant" class="animal bj-linear-layout" style="flex-direction: column">
<div>Ant</div>
<div class="content" style="flex-grow: 1">
<div>Ready.</div>
</div>
<input type="text" class="message" data-name="ant" placeholder="Please input.">
</div>
<div id="bee" class="animal bj-linear-layout" style="flex-direction: column">
<div>Bee</div>
<div class="content" style="flex-grow: 1">Ready.<br></div>
<input type="text" class="message" data-name="bee" placeholder="Please input.">
</div>
<div id="cat" class="animal bj-linear-layout" style="flex-direction: column">
<div>Cat</div>
<div class="content" style="flex-grow: 1">Ready.<br></div>
<input type="text" class="message" data-name="cat" placeholder="Please input.">
</div>
</div>
<div id="animals" class="bj-linear-layout"
style="background: #bbbbbb;flex-direction: column; flex-grow: 1; min-height: 200px; max-height: 300px;">
<div>Animals</div>
<div class="content" style="flex-grow: 1">Ready.<br></div>
<input type="text" id="group-message" data-name="cat" placeholder="Please input.">
</div>
<div id="actions" class="bj-linear-layout" style="background: #bbbbbb; flex-direction: column; flex-grow: 0">
<div>Actions</div>
<div class="content" style="display: flex; justify-content: space-around; padding: 7px">
<button class="login" data-name="ant">Login(Ant)</button>
<button class="login" data-name="bee">Login(Bee)</button>
<button class="login" data-name="cat">Login(Cat)</button>
</div>
</div>
<div id="logs" class="bj-linear-layout"
style="background: #bbbbbb; flex-direction: column; flex-grow: 1">
<div>Logs</div>
<div id="logs-content" style="height: 100px; overflow: scroll; font-size: 0.5rem">Ready.</div>
</div>
</div>
</body>
<script>
var client = null;
var myName = null;
(function () {
console.old = console.log;
var logger = document.getElementById('logs-content');
console.log = function () {
logger.innerHTML += "<div>";
for (var i = 0; i < arguments.length; ++i) {
console.old(arguments[i]);
logger.innerHTML += arguments[i] + ' ';
}
logger.innerHTML += "</div>";
logger.scrollTop = logger.scrollHeight;
};
})();
[].slice.call(document.getElementsByClassName("login")).forEach(function (button) {
button.addEventListener("click", function (e) {
var id = e.target.id;
var name = e.target.getAttribute("data-name");
doLogin(name);
});
});
[].slice.call(document.getElementsByClassName("message")).forEach(function (input) {
input.addEventListener("keyup", function (e) {
if (e.keyCode !== 13) return;
var name = e.target.getAttribute("data-name");
var message = e.target.value;
e.target.value = "";
client.send("user/" + name + "/inbox/" + myName, message);
var element = document.getElementById(name).getElementsByClassName("content")[0];
element.innerHTML += "<div style='text-align: right; color: forestgreen;'>" + myName + ": " + message + "</div>";
element.scrollTop = element.scrollHeight;
console.log(">>>", myName, "=>", name, ":", message);
})
});
document.getElementById("group-message").addEventListener("keyup", function (e) {
if (e.keyCode !== 13) return;
var group = "animals";
var message = e.target.value;
client.send("group/" + group + "/inbox/" + myName, message);
e.target.value = "";
var element = document.getElementById(group).getElementsByClassName("content")[0];
element.innerHTML += "<div style='text-align: right; color: green;'>" + myName + ": " + message + "</div>";
element.scrollTop = element.scrollHeight;
console.log(">>>", myName, "=>", group, ":", message);
});
function doLogin(name) {
if (client !== null) {
client.disconnect()
}
myName = name;
console.log("Login...", name);
client = new Paho.MQTT.Client("localhost", 1888, "", name);
client.connect({
onSuccess: function () {
console.log("[Conn] Connected");
console.log("[Topic] Subscribing ...");
client.subscribe("user/" + name + "/inbox/+");
client.subscribe("group/animals/inbox/+");
}
});
client.onConnectionLost = function () {
console.log("[Conn] Disconnected")
};
client.onMessageArrived = function (message) {
var topic = message.topic;
var payload = message.payloadString;
var fromName = topic.substring(topic.lastIndexOf("/") + 1);
var isGroupMessage = topic.substring(0, topic.indexOf("/")) === "group";
console.log("<<< Message received: ", "[" + topic + "]", fromName, payload);
var element;
if (isGroupMessage) {
if (fromName === myName) {
console.log("my message .")
} else {
element = document.getElementById("animals").getElementsByClassName("content")[0];
element.innerHTML += "<div style='color: dodgerblue;'>" + fromName + ": " + payload + "</divs>";
element.scrollTop = element.scrollHeight;
}
} else {
element = document.getElementById(fromName).getElementsByClassName("content")[0];
element.innerHTML += "<div style='color: cornflowerblue;'>" + fromName + ": " + payload + "</div>";
element.scrollTop = element.scrollHeight;
}
};
}
</script>
</html>
Ready.
Login... cat
[Conn] Connected
[Topic] Subscribing ...
<<< Message received: [group/animals/inbox/ant] ant a
<<< Message received: [group/animals/inbox/ant] ant a
<<< Message received: [group/animals/inbox/ant] ant a
<<< Message received: [group/animals/inbox/bee] bee b
<<< Message received: [group/animals/inbox/bee] bee b
<<< Message received: [group/animals/inbox/bee] bee b
>>> cat => animals : c
<<< Message received: [group/animals/inbox/cat] cat c
my message .
>>> cat => animals : c
<<< Message received: [group/animals/inbox/cat] cat c
my message .
>>> cat => animals : c
<<< Message received: [group/animals/inbox/cat] cat c
my message .