这篇文章讲解: 采用华为云最新推出的Flexus云服务器X实例EMQX服务器,搭建MQTT服务器,完成设备上云,实现物联网产品开发
随着物联网(IoT)技术的快速发展,越来越多的企业和个人开始探索如何将智能设备连接到云端,以便更有效地收集数据、监控状态以及远程控制。在这个过程中,消息队列遥测传输协议(MQTT)作为一种轻量级的消息传递协议,因其低带宽消耗、可靠的消息传递机制以及简单的架构而被广泛采用。为了满足这一需求,华为云推出了高性能的Flexus云服务器X实例,并且与EMQX这样的专业MQTT服务相结合,为企业提供了一个稳定且可扩展的平台来构建自己的物联网解决方案。
本文为开发者们提供一个详尽的指南,指导如何利用华为云最新的Flexus云服务器X实例结合EMQX MQTT代理软件,快速搭建起一个高效稳定的MQTT服务器环境。通过这个过程,不仅能够了解到选择合适云服务的重要性,还将学习到从环境配置到安全设置等一系列关键步骤。
华为云Flexus云服务器X实例是由国家科技进步奖获得者、华为公司Fellow及华为云首席架构师顾炯炯牵头研发的一款创新性云服务器。该实例基于华为的擎天QingTian架构、瑶光云脑和盘古大模型等核心技术,是业界首款应用驱动的柔性算力云服务器,适用于高科技、零售、金融、游戏等多个行业的通用工作负载场景,如网络应用、数据库、虚拟桌面、分析索引、微服务及持续集成/持续部署(CI/CD)等。
传统的云服务器通常只提供固定的CPU和内存规格,无法精准匹配用户的实际资源需求,导致资源利用效率低下。相比之下,华为云Flexus X实例提供了更为灵活的算力配置,支持超过100种不同的CPU与内存配比,最高可达到3:1的比例,从而更好地适应各种业务应用的需求。
Flexus X实例不仅在性能方面表现出色,还内置了智能应用调优算法,结合华为技术专家多年积累的经验,在基础模式下,其GeekBench单核及多核跑分可达业界同规格独享型实例的1.6倍。在性能模式下,Flexus X实例的性能超过了同类C系/G系/R系及S系旗舰型云主机的标准。
Flexus X实例还配备了X-Turbo加速技术和大模型底层智能调度技术,为关键业务应用提供加速功能。例如,在Flexus X实例上部署的MySQL、Redis和Nginx等应用,其性能最高可达业界同规格独享型实例的6倍(MySQL性能),长期运行时也能保持2倍的性能优势。
Flexus X实例在定价策略上定位于经济型级别,但其性能表现却超越了旗舰级云主机。通过动态业务画像规格优化等技术,用户在将业务从本地服务器或其他云服务提供商迁移到Flexus X实例时,可以节省高达30%的算力成本,从而实现业务的全面提速和效能提升,享受到云基础设施的显著改进体验。
EMQX,全称为Erlang/Enterprise Middleware MQTT Broker,是一款基于Erlang/OTP平台开发的开源物联网消息中间件。它专为大规模物联网应用设计,能够处理海量并发连接,并提供稳定的消息发布/订阅服务。作为一款高性能的MQTT协议服务器,EMQX不仅支持标准的MQTT v3.1、v3.1.1以及最新的v5.0版本协议,还提供了丰富的扩展功能来满足不同场景下的需求。
EMQX的核心优势在于其卓越的性能表现和高度可伸缩性。单个EMQX集群可以轻松管理数百万级别的设备连接,同时保持低延迟的消息传递能力。这使得EMQX成为构建大型物联网系统时的理想选择之一。此外,通过灵活配置规则引擎,用户可以根据业务逻辑定制化处理接收到的数据流,实现复杂事件处理、数据转换等功能。例如,当特定条件被触发时,可以自动执行预设的动作或将信息转发给其他系统进行进一步分析。
安全性方面,EMQX支持多种认证机制如用户名密码验证、客户端证书验证等,以确保只有授权用户才能访问敏感资源;同时也提供了TLS/SSL加密通信能力,保障了数据传输过程中的安全性和完整性。对于需要严格遵守行业标准的企业来说,这些特性尤为重要。
在集成度方面,EMQX展现了极高的灵活性与兼容性。无论是与其他数据库系统的对接(如MySQL, PostgreSQL, MongoDB等),还是与各种云服务提供商(如阿里云、AWS)的无缝衔接,EMQX都能很好地适应并促进整个生态系统的健康发展。EMQX还配备了详细的文档资料和技术支持服务,帮助开发者快速上手并解决遇到的问题。
链接:https://www.huaweicloud.com/
在官网首页的轮播图里可以看到,有Flexus云服务器的宣传。这是华为云匠心打造的下一代跃级产品,面向中低负载场景,性能倍增、体验跃级的服务器。
在产品页面,也可以看到Flexus云服务的选项,点击进去选购服务器。
链接:https://www.huaweicloud.com/product/flexus.html
在选购页面可以看到服务器推广器件,1年36块钱。 每个月的流量是100G,对于一些访问量不高的服务器或者测试用是非常合适的。
当前我要选择的服务器是:Flexus云服务器X实例 ,点击Flexus系列产品,选择X实例。Flexus云服务器X实例符合:柔性算力,六倍性能,旗舰体验,覆盖高科技、零售、金融、游戏等行业大多数通用工作负载场景。
针对时延敏感型业务请选择靠近您业务的区域,以降低网络时延,提高访问速度;针对和存量云产品有内网互通需求的业务,请选择和存量产品相同的区域。
我这选择ubuntu系统,用来搭建服务器。这个根据自己的情况选择,自己适合那一种就选择哪一种。
我选择150G大小。
设置好服务器的名字(如果你有多个服务器,为了自己好区别)和系统的登录密码。
云备份这个不买。有需要自己可以购买。
购买成功。
创建成功之后,邮箱会收到提示的。
链接:https://console.huaweicloud.com/ecm
在控制台可以看到服务器的详情。
点击服务器的名称,可以进去到详情页面。
填入设置好的密码。
登录成功。
自带的在浏览器里运行,每次需要打开浏览器,文件也不方便上传下载。
所以,这里开发阶段,我采用的 FinalShell登录到服务器。
新建SSH连接,输入连接信息。
登录成功。
接下来就可以进行开发了。
本章节将介绍如何在 Ubuntu 系统中下载安装并启动 EMQX。
支持的 Ubuntu 版本:
链接:https://www.emqx.io/docs/zh/v5.2/deploy/install-ubuntu.html
EMQX 支持通过 Apt 源安装,免除了用户需要手动处理依赖关系和更新软件包等的困扰,具有更加方便、安全和易用等优点。
在命令行终端,复制下面的命令过去,按下回车键。
【1】通过以下命令配置 EMQX Apt 源:
curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash
【2】运行以下命令安装 EMQX:
sudo apt-get install emqx
【3】运行以下命令启动 EMQX:
sudo systemctl start emqx
过程如下:
sudo systemctl emqx start 启动
sudo systemctl emqx stop 停止
sudo systemctl emqx restart 重启
EMQX 提供了一个内置的管理控制台,即 EMQX Dashboard。方便用户通过 Web 页面就能轻松管理和监控 EMQX 集群,并配置和使用所需的各项功能。
在浏览器里输入: http://122.112.225.194:18083 就可以访问EMQX的后台管理页面。可以管理以连接的客户端或检查运行状态。
这里面的IP地址,就是自己ECS云服务器的公网IP地址。
打开浏览器后,输入地址后打开的效果:
默认用户名和密码:
用户名:admin
密码:public
第一次登录会提示你修改新密码,如果不想设置,也可以选择跳过(公网服务器部署,还是要修改密码安全些)。
下面修改新密码:
登录成功的页面显示如下:
这里可以配置MQTT的一些参数,根据自己的需求进行配置。
新建一个客户端,点击连接。
连接之后,然后点击订阅,和发布,如果下面消息能正常的接收。说明MQTT服务器通信是已经正常,没问题了。
并且在这个页面也可以看到主题发布和主题订阅的格式。
接下来就打开我们自己的MQTT客户端登录MQTT服务器进行测试数据的通信。
端口选择: 1883
根据软件参数填入参数,登录,进行主题的发布和订阅。
说明: 目前还没有配置客户端认证,现在只要IP和端口输入正确,MQTT三元组可以随便输入,都可以登录上服务器的,服务器没有对三元组做校验。
EMQ X 默认配置中启用了匿名认证,任何客户端都能接入 EMQX。没有启用认证插件或认证插件没有显式允许/拒绝(ignore)连接请求时,EMQX 将根据匿名认证启用情况决定是否允许客户端连接。
然后打开EMQX的管理后台,可以看到我们的设备已经登录服务器了,名字为test1。
在订阅主题的页面也可以看到我们客户端设备订阅的主题。
EMQX 默认配置中启用了匿名认证,任何客户端都能接入 EMQX。没有启用认证插件或认证插件没有显式允许/拒绝(ignore)连接请求时,EMQX 将根据匿名认证启用情况决定是否允许客户端连接。
在正式产品里肯定是要启用认证的,不然任何设备都能接入。
下面就介绍如何配置 客户端认证。
【1】打开客户端认证页面
【2】选择密码认证
【3】选择内置数据库
【4】设置认证方式(都可以默认,不用改),直接点击创建。
【5】创建成功后,点击用户管理
【6】添加用户
【7】添加成功
【8】添加完毕之后,打开MQTT客户端可以进行测试。
登录的时候,MQTT用户名和密码必须输入正确,按照上一步添加的信息进行如实填写,否则是无法登录服务器的。
客户端授权页面可以配置每个客户端(设备)的主题发布,订阅权限。限制它是否可以发布主题,订阅主题。 如果有需要就可以进行配置。
http://127.0.0.1:18083/#/authorization/detail/built_in_database?tab=users
【1】创建数据源
【2】选择内置数据库
【3】完成创建
【4】点击权限管理
【5】选择客户端ID,点击添加
【6】配置权限
在集成选项里,可以对设备数据处理。 比如:转发到自己的HTTP服务器,转发到自己其他的MQTT服务器,创建规则,某些事件触发某些动作等等。
选择数据桥接。
可以把数据发送端自己的HTTP服务器,或者发送到其他的MQTT服务器。
选择HTTP服务 (如果自己有HTTP服务器,可以将数据转发给自己的HTTP服务器)。
为了方便测试设备间互相订阅主题,数据收发,在客户端认证页面至少添加2个设备。我这里分别添加了test1和test2。
设备A订阅设备B的主题,设备B订阅设备A的主题,实现数据互发。
设备A的MQTT信息:
MQTT服务器地址:122.112.225.194
MQTT服务器端口号:1883
MQTT客户端ID:AAA
MQTT用户名:test1
MQTT登录密码:12345678
订阅主题:BBB/#
发布主题:AAA/1
发布的消息:{ "msg": "我是AAA设备" }
设备B的MQTT信息:
MQTT服务器地址:122.112.225.194
MQTT服务器端口号:1883
MQTT客户端ID:BBB
MQTT用户名:test2
MQTT登录密码:12345678
订阅主题:AAA/#
发布主题:BBB/1
发布的消息:{ "msg": "我是BBB设备" }
只要是MQTT客户端能正常上云通信了,那么单片机也是一样的。
上位机也可以采用MQTT协议接入服务器,订阅设备的主题,就可以实时接收设备的消息(当然,也可以采用HTTP协议接入)。
EMQX支持将设备上传的数据转发到其他地方,比如,自己的HTTP服务器。方便自己服务器进行其他的处理。
通过数据桥接,用户可以实时地将消息从 EMQX 发送到外部数据系统,或者从外部数据系统拉取数据并发送到 EMQX 的某个主题。而 EMQX Dashboard 提供了可视化创建数据桥接的能力,只需在页面中配置相关资源即可。
本章节就介绍如何搭建自己的HTTP服务器。配置EMQX转发数据到自己的HTTP服务器,保存处理数据。
我这里直接使用python写代码搭建一个HTTP服务器。 ECS服务器上默认没有安装python3,需要先安装一下。
【1】安装python3
root@emqx:~/emqx# apt install python3
Reading package lists... Done
Building dependency tree
Reading state information... Done
python3 is already the newest version (3.6.7-1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 1 not upgraded.
【2】编写代码
from flask import Flask, json, request
app = Flask(__name__)
@app.route('/', methods=['POST'])
def print_messages():
reply= {"result": "ok", "message": "success"}
print("got post request: ", request.get_data())
return json.dumps(reply), 200
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8000)
将以上代码保存到一个名为 server.py 的文件中。
这段代码创建了一个使用 Flask 框架的 Web 服务器,可以接收根路径的 POST 请求。当接收到 POST 请求时,服务器会将请求的数据打印到终端,并返回一个 JSON 格式的响应给客户端。服务器将在本地运行,并监听默认的 8000 端口。
【3】运行程序
# 安装 flask 依赖
pip install flask
pip3 install flask
# 启动服务
python3 server.py
在命令行中执行 python3 server.py,就可以启动一个简单的HTTP服务器,可以接收并处理POST请求。当有POST请求发生时,服务器将返回收到的POST数据。可以根据自己的需要,进一步扩展处理POST请求的逻辑。
**运行示例: ** (代码可以在本地写好上传到服务器,也可以直接 vim server.py 打开编辑器直接编写 )
可以通过发送 POST 请求到 http://your-server-ip:8000/ 的方式来测试这个服务器。
比如:
http://122.112.225.194:8000/
【1】在集成选项里,可以对设备数据处理,将数据转发到自己的HTTP服务器。
【2】选择Webhook。
Webhook,使用 Webhook 来转发数据到 HTTP 服务;
使用 Webhook 其实就是将 EMQX 接收并处理后的数据发送到一个 HTTP 服务上,再根据预设好的 HTTP 服务来处理和集成业务数据。
同样用户需要有一个预先搭建好的 HTTP 服务,需要在配置信息页面填写 HTTP 请求的服务地址,选择一个请求方法 POST、GET、PUT 或 DELETE,配置请求头,将需要发送的数据使用模板语法填写到请求体(body)中即可。
【3】选配置Webhook
触发器选择所有消息和事件,URL里填自己的服务器地址。
【4】点击测试。 测试服务器是否OK。
【6】没问题就直接点击保存
【7】创建成功
【8】保存之后。 会自动创建规则和数据桥接。 非常方便。
【1】打开MQTT客户端,发送数据测试。
【2】看python服务器的终端,可以看到收到了EMQX服务器转发过来的数据。
如果自己接下来想要进行其他的操作,服务器写代码进行对应的处理即可。
【3】 点击这个服务可以看到已经触发转发的详情。
一般开发一套完整的物联网产品。一般会分为设备端,服务器,上位机部分。
设备端:就是硬件端。采集本地传感器的数据上传到服务器,或者接收服务器下发的指令完成某些控制。
比如:STM32 + ESP8266 + 各种传感器 就是一个硬件设备端。 可以通过ESP8266联网上传数据。
服务器:也就是MQTT服务器端,比如: 自己采用EMQX搭建的MQTT服务器,或者采用阿里云、华为云、OneNet这些平台的IOT服务器。
上位机:上位机就是给用户使用的,用于远程控制设备,查看设备。 比如:微信小程序、手机APP、电脑上位机、web网页端等等。
那么这个章节,就介绍利用EMQX提供的API接口与MQTT客户端设备进行通信,完成数据上传,命令下发等功能,可以利用此接口完成上位机的开发。
帮助文档地址: https://www.emqx.io/docs/zh/v5.0/admin/api.html#%E8%AE%A4%E8%AF%81
EMQX 提供了管理监控 REST API,这些 API 遵循 OpenAPI (Swagger) 3.0 规范。
EMQX 在 REST API 上做了版本控制,EMQX 5.0.0 以后的所有 API 调用均以 /api/v5 开头。
EMQX 服务启动后,可以访问 http://localhost:18083/api-docs/index.html (opens new window)来查看 API 的文档。还可以直接在 Swagger UI 上尝试执行一些 API。
比如: 我的EMQX服务器是在华为云ECS服务器上搭建,公网IP是: 122.112.225.194
那我访问API文档的地址就是下面这样的格式: 在浏览器里打开即可。
http://122.112.225.194:18083/api-docs/index.html
访问效果如下:
【1】登录EMQX的后台管理页面: http://122.112.225.194:18083/
【2】找到菜单里的 系统设置选项–>API密匙。
【3】创建密匙。
【4】填写密匙名称
【5】创建成功
【6】得到API Key 和 Secret Key
API Key : f072a6e9758b8cdf
Secret Key : LzwPB71Yf7PTED39C7RGboz9C9ANhv83ULUynTANgog4hG
上一步已经创建好API的访问密匙,这里就以 获取节点信息为例,调用获取节点信息的API接口,获取节点 信息。
接口在API文档里的介绍:
根据前面的API访问路径规则说明; 那么,获取节点信息的API完整访问路径为:
http://122.112.225.194:18083/api/v5/nodes
接下来就用python写一份代码,测试一下接口是否可以正常访问。 python代码直接放服务器运行(主要是我本地没有安装python环境,云服务器的环境是已经安装OK的,测试方便)。
【1】在云服务器上创建一个python文件,方便测试代码
【2】创建之后FinaShell自动上传到服务器
【3】编辑代码
在这里双击要编辑的文件,就可以打开文件进行编辑。默认采用内置的编辑器,也可以选择自己电脑上的外置编辑器。
【4】代码编辑完成,按下键盘快捷键Ctrl + S 保存文件内容,保存之后文件内容会自动同步到服务器。
保存后提示,自动上传。
写入的代码如下:
import urllib.request
import json
import base64
username = 'f072a6e9758b8cdf'
password = 'LzwPB71Yf7PTED39C7RGboz9C9ANhv83ULUynTANgog4hG'
url = 'http://122.112.225.194:18083/api/v5/nodes'
req = urllib.request.Request(url)
req.add_header('Content-Type', 'application/json')
auth_header = "Basic " + base64.b64encode((username + ":" + password).encode()).decode()
req.add_header('Authorization', auth_header)
with urllib.request.urlopen(req) as response:
data = json.loads(response.read().decode())
print(data)
【5】执行代码,返回结果
通过返回信息来看,节点信息获取是没有问题的。
root@emqx:~/emqx# python3 http_api_test.sh
[{'connections': 0, 'edition': 'Opensource', 'live_connections': 0, 'load1': 0.0, 'load15': 0.0, 'load5': 0.0, 'log_path': '/var/log/emqx', 'max_fds': 1048576, 'memory_total': '3.66G', 'memory_used': '612.59M', 'node': 'emqx@127.0.0.1', 'node_status': 'running', 'otp_release': '25.3.2-2/13.2.2', 'process_available': 2097152, 'process_used': 543, 'role': 'core', 'sys_path': '/usr/lib/emqx', 'uptime': 84000040, 'version': '5.3.1-alpha.1'}]
root@emqx:~/emqx# python3 http_api_test.sh
[{'connections': 0, 'edition': 'Opensource', 'live_connections': 0, 'load1': 0.0, 'load15': 0.0, 'load5': 0.0, 'log_path': '/var/log/emqx', 'max_fds': 1048576, 'memory_total': '3.66G', 'memory_used': '613.23M', 'node': 'emqx@127.0.0.1', 'node_status': 'running', 'otp_release': '25.3.2-2/13.2.2', 'process_available': 2097152, 'process_used': 543, 'role': 'core', 'sys_path': '/usr/lib/emqx', 'uptime': 84008046, 'version': '5.3.1-alpha.1'}]
在编写代码之前,可以先测试下API接口的效果,可以直接在Swagger UI界面直接调试API。
地址: http://122.112.225.194:18083/api-docs/index.html#/
例如:以获取以订阅主题列表的API接口为例。
【1】在Swagger UI界面上找到对应的API接口。
【2】点击API说明,展开详情
【3】点击右边的试试看按钮。
【4】点击执行
【5】然后会弹出提示框,让你填入用户名和密码。
这个用户名和密码就是前面创建API密匙生成的API Key(用户名) 和 Secret Key(密码) 。
API Key : f072a6e9758b8cdf
Secret Key : LzwPB71Yf7PTED39C7RGboz9C9ANhv83ULUynTANgog4hG
【6】根据提示输入用户名和密码,再点击登录。
【7】再次点击执行,就可以看到接口返回的数据了。
并且在页面上也写出了,请求的信息。使用curl命令行给出详细的请求过程,参考这个就可以自己写代码了。
【8】API调用,curl命令行执行代码如下:
curl -X 'GET' \
'http://122.112.225.194:18083/api/v5/topics?node=emqx%40127.0.0.1&page=1&limit=50' \
-H 'accept: application/json'
【9】python代码实现
import requests
url = 'http://122.112.225.194:18083/api/v5/topics?node=emqx%40127.0.0.1&page=1&limit=50'
headers = {'accept': 'application/json'}
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
# 在这里处理返回的数据
print(data)
else:
print("请求失败,状态码:", response.status_code)
API里也支持发布主题,利用HTTP协议发布主题消息,如果设备端订阅了该主题,就可以收到API接口发布的消息。
【1】先找到发布主题的API接口
【2】点击API名字,展开详情
【3】点击右边的Try it out按钮。
【4】参数填写说明
因为这个接口是发送主题的,需要填参数,填自己需要发布什么主题,什么消息。
出来的框框里就是发布信息,根据自己需要修改。
topic就是发布的主题。 payload 就是发布的消息内容。 只要MQTT客户端订阅了这个主题,就可以收到发布的消息。
这个主题的名字可以随便改的。我这里就用默认的名字和内容测试。
{
"payload_encoding": "plain",
"topic": "api/example/topic",
"qos": 0,
"payload": "hello emqx api",
"properties": {
"payload_format_indicator": 0,
"message_expiry_interval": 0,
"response_topic": "some_other_topic",
"correlation_data": "string",
"user_properties": {
"foo": "bar"
},
"content_type": "text/plain"
},
"retain": false
}
【5】MQTT客户端登录。
打开MQTT客户端,登录服务器,订阅api/example/topic主题。
【6】在API调试页面,点击执行
【7】执行之后,在MQTT客户端的就可以收到API下发的消息了。
【8】API接口调用,curl命令行执行的代码如下:
curl -X 'POST' \
'http://122.112.225.194:18083/api/v5/publish' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"payload_encoding": "plain",
"topic": "api/example/topic",
"qos": 0,
"payload": "hello emqx api",
"properties": {
"payload_format_indicator": 0,
"message_expiry_interval": 0,
"response_topic": "some_other_topic",
"correlation_data": "string",
"user_properties": {
"foo": "bar"
},
"content_type": "text/plain"
},
"retain": false
}'
【9】Python代码实现
import requests
import json
url = 'http://122.112.225.194:18083/api/v5/publish'
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
data = {
"payload_encoding": "plain",
"topic": "api/example/topic",
"qos": 0,
"payload": "hello emqx api",
"properties": {
"payload_format_indicator": 0,
"message_expiry_interval": 0,
"response_topic": "some_other_topic",
"correlation_data": "string",
"user_properties": {
"foo": "bar"
},
"content_type": "text/plain"
},
"retain": False
}
response = requests.post(url, headers=headers, data=json.dumps(data))
if response.status_code == 200:
print("消息发布成功")
else:
print("消息发布失败,状态码:", response.status_code)
华为云Flexus X实例以其卓越的处理能力和稳定性著称,能够满足各种复杂业务场景下的需求,无论是大规模数据处理还是高并发访问的应用都能轻松应对。此外,华为云还提供了丰富的云上工具和服务,帮助企业快速搭建和优化自己的IT基础设施,降低运维成本的同时提升整体效率。