EMQX webhook消息转发Web服务器
一、前言
需求:获取设备通过mqtt协议发送过来的数据并将数据保存到外部服务中,期间需要使用EMQX代理服务器将消息转发到自有的Web服务中
实现:通过EMQX中的webhook实现消息转发
官方:https://www.emqx.io/docs/zh/v5.0/data-integration/data-bridge-webhook.html
二、实现
1、EMQX服务器搭建
EMQX下载、安装、启动
- 到EMQX官网进行下载:https://www.emqx.io/zh/downloads?os=Windows![EMQX下载安装启动](https://img-blog.csdnimg.cn/direct/a080a7f1224e477aa5c91df71fd3d1f5.png)
- 安装运行完成后,可直接访问 EMQX Dashboard 管理控制台 -
http://localhost:18083/
(localhost
根据实际IP地址修改) - 默认用户名及密码 - admin / public
- 停止EMQX服务 - 打开cmd,进入到emqx所在文件夹中的
bin
目录- 输入一下指令./emqx/bin/emqx stop
2、本地Web服务搭建
本次Web服务器使用Python Flask进行搭建
创建Flask项目
- 相关功能的代码实现直接在
app.py
中完成
代码
本次将 mqtt客户端 和 接收EMQX转发的消息数据 都写在该Flask项目中
import json
from flask import request, jsonify, Flask, Blueprint, render_template, session, current_app
from flask_mqtt import Mqtt
from werkzeug.local import LocalProxy
app = Flask(__name__)# 代理地址(根据实际使用的IP地址进行修改,需要和EMQX处于同一地址)
app.config['MQTT_BROKER_URL']='127.0.0.1'# 端口
app.config['MQTT_BROKER_PORT']=1883# 当需要验证用户名和密码时,设置该项(根据实际情况设定)# app.config['MQTT_USERNAME'] = 'user'# 当需要验证用户名和密码时,设置该项(根据实际情况设定)# app.config['MQTT_PASSWORD'] = '123456'# 设置心跳时间,单位为秒
app.config['MQTT_KEEPALIVE']=60# 如果服务器支持 TLS,则设置为 True
app.config['MQTT_TLS_ENABLED']=False# 主题(根据实际情况设定)
topic ='t/1'# 实例化
mqtt_client = Mqtt(app)@app.route('/')defindex():# 初始路由return render_template('index.html')@mqtt_client.on_connect()defhandle_connect(client, userdata, flags, rc):"""连接回调函数"""if rc ==0:print('Connected successfully')# 订阅主题
mqtt_client.subscribe(topic)else:# 连接失败print('Bad connection. Code:', rc)@mqtt_client.on_message()defhandle_mqtt_message(client, userdata, message):""" 消息回调函数 """# 定义接受到的消息
data =dict(# 主题
topic=message.topic,# 内容
payload=message.payload.decode())print(data)# 打印输出接收到的消息print('Received message on topic: {topic} with payload: {payload}'.format(**data))@app.route('/publish', methods=['POST'])defpublish_message():"""
消息发布接口(实际应用中,该接口可能需要处理一些复杂业务逻辑)
"""# 获取前端页面提交的数据,并格式化
request_data = request.get_json()# print("接收到的数据", request_data)# 发布消息
publish_result = mqtt_client.publish(request_data['topic'], request_data['payload'])# 返回JSON数据return jsonify({'code': publish_result[0]})@app.route('/emqx', methods=['POST'])deftest_emqx_conn():"""
测试 搭建简易EMQX HTTP服务(用于接收EMQX转发过来的消息)
在后面的 webhook数据桥接 创建中,URL填写为:http://127.0.0.1:5000/emqx
"""# 响应
reply ={"result":"ok","message":"success"}print("got post request: ", request.get_data())return json.dumps(reply),200if __name__ =='__main__':
app.debug =True
app.run()
3、EMQX中创建webhook数据桥接
- EMQX控制台中找到
webhook
,点击创建 - 输入数据桥接名称(要求是大小写英文字母和数字的组合)
- 请求方法选择 POST,
- URL 为 http://127.0.0.1:5000/emqx(根据实际使用填写)
- 其他使用默认值
- 点击最下方保存按钮完成规则创建。
4、EMQX中创建数据转发规则
- 创建好webhook后,会自动根据创建的webhook桥接生成一个规则
- 直接点击生成的规则中的 设置
SQL编辑器
根据个人实际业务进行修改,修改完成后直接点击更新
三、效果
版权归原作者 FREE_QIU 所有, 如有侵权,请联系我们删除。