EMQX Webhook+java+MySQL曲折的持久化尝试
背景:EMQX开源版不提供数据持久化插件使用,可能不好应用到实际,就随意写个demo
环境:Ubuntu 20.0.4(测试)
IDEA 2021.2.2
Mysql 8.0.26
EMQ X 4.4.3
JDK 1.8(开发)
Web服务器用到的框架SpringBoot、Mybatis
可能存在问题:存储数据的延迟较高,不支持大数据量等等
Tips:如果会用PHP(可惜我不会)
可以参考B站视频教程利用webhook将mqtt数据保存到Mysql数据库
零、思路
来自https://blog.csdn.net/weixin_44821644/article/details/101388095
思路:设备的数据上传到emqx服务器,我们需要一个web服务器来接收EMQX服务器post过来数据,然后再将数据保存到数据库。
一、最终效果
物理机Windows 10使用EMQX测试时,需要一些关闭防火墙的操作,所以索性在虚拟机中测试
虚拟机中需要有Java环境
根据虚拟机中的数据库配置使用相应的配置文件,主要就是username、password、database的区别
java -jar userdemo.jar -spring.config.location=/home/ubuntu64/java/appliction.properties
用/queryAll查看数据
多打印几次数据总不是坏事
二、Web服务器搭建
1.项目结构
不正确的四层架构
2.项目依赖
pom.xml
<!-- JDK1.8--><properties><java.version>1.8</java.version></properties><dependencies><!-- mybatis--><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.1.1</version></dependency><!-- //JDBC--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><!-- //Web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- //数据库MySql--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><!-- //Lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- //阿里的fastjson--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.1</version></dependency><dependency><groupId>commons-beanutils</groupId><artifactId>commons-beanutils</artifactId><version>1.7.0</version></dependency></dependencies>
3.项目配置
application-dev.properties
spring.datasource.username=root
spring.datasource.password=0000
#URL建议带上“?”后面的参数设定时区和编码方式
spring.datasource.url=jdbc:mysql://localhost:3306/exp_4?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
mybatis.type-aliases-package=com.vote01
mybatis.mapper-locations=classpath:mybatis/mapper/*.xml
4.具体代码
User
packagecom.vote01.Dao;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;@Data@NoArgsConstructor@AllArgsConstructorpublicclassUser{privateint id;privateString uname;privateString pwd;}
UserMapper(接口类)
packagecom.vote01.Mapper;importcom.vote01.Dao.User;importorg.apache.ibatis.annotations.Mapper;importorg.springframework.stereotype.Repository;importjava.util.List;@Mapper@RepositorypublicinterfaceUserMapper{List<User>queryUserList();void addUser (User user);}
UserMapper.xml
<?xml version="1.0" encoding="UTF-8" ?><!DOCTYPEmapperPUBLIC"-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd"><mappernamespace="com.vote01.Mapper.UserMapper"><selectid="queryUserList"resultType="com.vote01.Dao.User">
select * from user ;
</select><insertid="addUser"parameterType="com.vote01.Dao.User">
insert into user (id,uname ,pwd) values (#{id},#{uname},#{pwd})
</insert></mapper>
UserService
packagecom.vote01.Service;importcom.vote01.Dao.User;importcom.vote01.Mapper.UserMapper;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importjava.util.Collection;importjava.util.List;@ServicepublicclassUserService{@AutowiredprivateUserMapper userMapper;publicList<User>queryUserList(){return userMapper.queryUserList();}publicvoidaddUser(User user){
userMapper.addUser(user);}}
EMQXController
在这边捣鼓很久
/quick3:接收类型是HttpServletRequest,从中提取数据,使用阿里fastjson提供的方法封装成对象,再调用Service…
/add:Windows10 下简单写个网页表单提交测试可以完成保存,信心满满配置emqx,但是用不了
broker测试的时候接收过来显示为NULL,后面尝试过换成其他接收参数类型也没成功,最后选定HttpServletRequest类型写出quick3,可能是用的方法不对导致Windows10下通不过,虚拟机环境可以正确完成数据保存
简直人麻了
#Windows10打印提交的表单数据id=102&uname=b&pwd=123456#Ubuntu打印,经典的json数组形式{"id":102,"uname":"b","pwd":"123456"}
具体情况具体分析,主要还是得先看看EMQX服务器POST请求发过来的数据到底有什么
packagecom.vote01.Controller;importcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.JSONObject;importcom.vote01.Dao.User;importcom.vote01.Service.UserService;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.*;importjavax.servlet.ServletInputStream;importjavax.servlet.http.HttpServletRequest;importjava.io.IOException;importjava.util.List;@RestControllerpublicclassEMQXController{@AutowiredprivateUserService userService;@GetMapping("/queryAll")publicList<User>queryAll(){List<User> userList = userService.queryUserList();for(User user : userList){System.out.println(user);}return userList;}@PostMapping("/quick3")publicStringquick3(HttpServletRequest request){ServletInputStream is =null;try{
is = request.getInputStream();StringBuilder sb =newStringBuilder();byte[] buf =newbyte[1024];int len =0;while((len = is.read(buf))!=-1){
sb.append(newString(buf,0, len));}System.out.println("sb:"+sb.toString());/*****************************//*封装对象*//*建议注释掉这部分,先看传过来的数据是怎么样的形式*/String temp=sb.toString();System.out.println("temp:"+temp);JSONObjectJ=JSON.parseObject(temp);System.out.println("json:"+J);User user =J.toJavaObject(User.class);System.out.println("user:"+user);
userService.addUser(user);System.out.println("Save===>:"+user);/*****************************/return"获取到的文本内容为:"+ sb.toString();}catch(IOException e){
e.printStackTrace();}finally{try{if(is !=null){
is.close();}}catch(IOException e){
e.printStackTrace();}}returnnull;}/*
@PostMapping("/add")
public String add(@RequestParam Map<String, Object> params) {
// User user = new User(id, name, password);
// userMapper.addUser(user);
// System.out.println("save====>" + user);
System.out.println("save====>" + params);
int id = Integer.parseInt(params.get("id").toString());
String uname = params.get("uname").toString();
String pwd = params.get("pwd").toString();
User user = new User(id, uname, pwd);
userMapper.addUser(user);
System.out.println("save====>" + user);
return "ok";
}
*/}
5.Mysql
use exp_4
createtableuser(
id intnotnullprimarykey,
uname varchar(30)null,
pwd varchar(30)null);INSERTINTO exp_4.user(id, uname, pwd)VALUES(102,'b','123456');INSERTINTO exp_4.user(id, uname, pwd)VALUES(103,'b','3456');INSERTINTO exp_4.user(id, uname, pwd)VALUES(104,'c','1256');INSERTINTO exp_4.user(id, uname, pwd)VALUES(105,'s','123456');INSERTINTO exp_4.user(id, uname, pwd)VALUES(106,'r','1256');INSERTINTO exp_4.user(id, uname, pwd)VALUES(107,'y','123456');
6.打包
测试接口
/queryAll
后用Maven打包成jar包丢进虚拟机测试
接口/quick3在windows 10环境下尝试网页提交表单没成功
!!!省略了一部分打包前置操作
三、EMQX的部署
1.设置规则引擎
目的是从Playload中提取需要的数据,并按照一定的键值对格式封装成JSON
SQL测试
2.设置响应动作&新建资源
此时Web服务器(我用的8080端口)已在后台运行
URL
http://localhost:8080/quick3
测试连接显示连接可用
部分环境下URL可写成
http://localhost/quick3
3.测试
最后,我们需要进行测试,进入websocket,连接,发布消息到指定主题
windows10就是在这里测试的时候要关防火墙
当然关了防火墙也可能死活连不上
版权归原作者 WENDY_PRIDE 所有, 如有侵权,请联系我们删除。