0


Java 使用Websocket 与MQ消息队列实现即时消息

Java 使用Websocket 与MQ消息队列实现即时消息

项目需求:根据不同用户账号产生的数据需要即时展示到首页大屏中进行展示,实现方式
1:前端短时间内轮训调用后端接口,后端返回最新相关数据进行展示
2:使用websocket即时通信,一产生新数据,就立即发送。数据产生有MQ进行推送,保证实时性
第一种方式舍弃,频繁请求接口,大部分请求都无效请求,成本过大
实现思路:
1:建立websocket连接,缓存连接用户信息,使用session,保证即时同账号不同登录页也能接收
2:使用MQ,监听MQ产生的推送消息topic
3: MQ监听消息处理类接收消息,根据消息处理业务情况,并根据数据筛选出需要推送到所属用户
4:保持在线用户连接,定时任务每30秒发送缓存内还保持连接的用户心跳数据
技术选型使用:netty-websocket
详细说明查看:
https://gitee.com/Yeauty/netty-websocket-spring-boot-starter
websocket在线测试工具,可在线测试:
http://coolaf.com/tool/chattest

前言

在实际开发使用过程中,产线环境都是使用HTTPS 以及配合 Nginx进行使用,
但是在测试环境下,自己则是通过ws 的方式进行连接测试,即:ws://IP地址 + 端口号/websocket
所以关于HTTPS下使用 wss 协议的问题,以及配合 Nginx 使用域名方式建立连接
不使用 IP地址 + 端口号 连接 WebSocket,因为这种方式不够优雅

ws 和 wss 又是什么鬼?

Websocket使用 ws 或 wss 的统一资源标志符,类似于 HTTP 或 HTTPS
其中 wss 表示在 TLS 之上的 Websocket ,相当于 HTTPS 了
如:
ws://example.com/Websocket
wss://example.com/Websocket
默认情况下,Websocket 的 ws 协议使用 80 端口;运行在TLS之上时,wss 协议默认使用 443 端口。其实说白了,wss 就是 ws 基于 SSL 的安全传输,与 HTTPS 一样样的道理。
如果你的网站是 HTTPS 协议的,那你就不能使用 ws:// 了
浏览器会 block 掉连接,和 HTTPS 下不允许 HTTP 请求一样,如下图:

ws_https

Mixed Content: The page at 'https://domain.com/' was loaded over HTTPS, but attempted to connect to the insecure WebSocket endpoint 'ws://x.x.x.x:xxxx/'. This request has been blocked; this endpoint must be available over WSS.
这种情况,我们就需要使用 wss:\\ 安全协议了
如果把ws的方式来去使用wss的时候

在这里插入图片描述

VM512:35 WebSocket connection to 'wss://IP地址:端口号/websocket' failed: Error in connection establishment: net::ERR_SSL_PROTOCOL_ERROR
很明显 SSL 协议错误,说明就是证书问题了。
这时候我们一直拿的是 IP地址 + 端口号 这种方式连接 WebSocket 的
这没有证书存在,生产环境不可能用 IP地址 + 端口号 这种方式连接 WebSocket 的
要用域名方式连接 WebSocket 。

Nginx 配置域名支持 WSS

Nginx配置 HTTPS 域名位置加入配置:

location /websocket {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";}

接着拿域名再次连接试一下,不出意外会看 101 状态码:
在这里插入图片描述

这样就完成了在 HTTPPS 下以域名方式连接 WebSocket 
接下来直接上代码

Maven导包

<dependency><groupId>org.yeauty</groupId><artifactId>netty-websocket-spring-boot-starter</artifactId><version>0.12.0</version></dependency>

JAVA代码

websocket处理类

packagecom.biz.controller.home;importcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.TypeReference;importcom.redis.provider.impl.StringRedisProvider;importcom.core.constant.SecurityConstant;importcom.security.def.BoyunLoginUser;importcom.security.def.BoyunUserDTO;importcom.biz.def.UserInfoWebSocket;importcom.biz.def.WebSocketServerDto;importio.netty.handler.timeout.IdleStateEvent;importlombok.SneakyThrows;importlombok.extern.slf4j.Slf4j;importorg.apache.commons.lang3.StringUtils;importorg.springframework.beans.factory.annotation.Autowired;importorg.yeauty.annotation.*;importorg.yeauty.pojo.Session;importjava.io.IOException;importjava.util.concurrent.ConcurrentHashMap;importjava.util.concurrent.atomic.AtomicInteger;/**
 * webSocket
 *
 * @author 夕四
 * @date 2022-01-24 16:10
 **/@Slf4j@ServerEndpoint(path ="/ws/{sid}", port ="9011")publicclassMyWebSocket{/**
     * 当前在线连接数
     */publicstaticfinalAtomicInteger ONLINE_NUM =newAtomicInteger(0);/**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */privatestaticConcurrentHashMap<String,WebSocketServerDto> webSocketMap =newConcurrentHashMap<>();@AutowiredprivateStringRedisProvider stringRedisProvider;@BeforeHandshakepublicvoidhandshake(Session session){
        session.setSubprotocols("stomp");}@OnOpenpublicvoidonOpen(Session session,@PathVariableString sid){// 连接数 +1
        ONLINE_NUM.incrementAndGet();
        log.info("{}连接成功,当前在线数量:{}", sid, ONLINE_NUM.get());if(StringUtils.isBlank(sid)){
            session.close();return;}
        log.info(">>>>>>>>>>>>>>>>> sid:{}", sid);
        session.setAttribute(SecurityConstant.TOKEN, sid);
        log.info(">>>>>>>>>>>>>>>>> sid2:{}", sid);// redis判断sid的用户值String value = stringRedisProvider.get(SecurityConstant.PROJECT_PREFIX + sid);
        log.info(">>>>>>>>>>>>>>>>> value:{}", value);if(StringUtils.isBlank(value)){
            log.debug("{} 未登录", sid);
            session.close();return;}BoyunLoginUser<BoyunUserDTO> tokenUser = JSON.parseObject(value,newTypeReference<BoyunLoginUser<BoyunUserDTO>>(){});
        log.info(">>>>>>>>>>>>>>>>> tokenUser:{}", tokenUser.toString());// 绑定自定义属性Long userId = tokenUser.getUser().getUserId();
        session.setAttribute(SecurityConstant.INNER_USER_ID, userId);WebSocketServerDto webSocketServerDto = webSocketMap.get(sid);if(webSocketServerDto !=null){
            log.info("关闭");// 如果已存在,先关闭以前的连接,可能会出现覆盖后再添加
            webSocketServerDto.getSession().close();}
        webSocketServerDto =newWebSocketServerDto();
        webSocketServerDto.setSession(session);
        webSocketMap.put(sid, webSocketServerDto);UserInfoWebSocket.setSessionIdMap(userId, sid);}@OnClosepublicvoidonClose(Session session)throwsIOException{// 连接数 -1
        ONLINE_NUM.decrementAndGet();String sid = session.getAttribute(SecurityConstant.TOKEN);Long userId = session.getAttribute(SecurityConstant.INNER_USER_ID);
        log.info("{} 连接关闭,在线数量:{}", sid, ONLINE_NUM.get());
        webSocketMap.remove(sid);UserInfoWebSocket.delSessionIdMap(userId, sid);}@OnErrorpublicvoidonError(Session session,Throwable throwable){
        log.error("发生错误的连接:{},userId:{}", session.getAttribute(SecurityConstant.TOKEN), session.getAttribute(SecurityConstant.INNER_USER_ID));}@SneakyThrows@OnMessagepublicvoidonMessage(Session session,String message){if(!"123456789".equals(message)){//            session.sendText(message);//            log.info("发送消息:{}", message);}}@OnBinarypublicvoidonBinary(Session session,byte[] bytes){// 不打印心跳数据if(!"123456789".equals(newString(bytes))){
            log.info("收到客户端:{} 的消息:{}", session.getAttribute(SecurityConstant.TOKEN),newString(bytes));//            session.sendText("123456789");}}@OnEventpublicvoidonEvent(Session session,Object evt){if(evt instanceofIdleStateEvent){IdleStateEvent idleStateEvent =(IdleStateEvent) evt;switch(idleStateEvent.state()){case READER_IDLE:
                    log.info("读数据闲置");break;case WRITER_IDLE:
                    log.info("写数据闲置");break;case ALL_IDLE:
                    log.info("读、写数据闲置");break;default:break;}}}/**
     * 发送自定义消息
     */publicstaticvoidsendInfo(String message,String sid){// 任意一个参数是空,返回falseif(StringUtils.isAnyBlank(message, sid)){
            log.debug("参数错误,sid:{},message:{}", sid, message);}
        log.debug("推送消息到窗口{},推送内容: {}", sid, message);WebSocketServerDto webSocketServer = webSocketMap.get(sid);if(webSocketServer !=null){Session session = webSocketServer.getSession();if(session ==null){
                log.error("{} 不存在session", sid);return;}// 不活跃了if(!session.isActive()){// 移除Long userId = session.getAttribute(SecurityConstant.INNER_USER_ID);
                webSocketMap.remove(sid);UserInfoWebSocket.delSessionIdMap(userId, sid);}
            session.sendText(message);}}}

WebSocket 需要推送的用户信息缓存

packagecom.biz.def;importlombok.extern.slf4j.Slf4j;importjava.util.*;importjava.util.concurrent.locks.ReentrantReadWriteLock;/**
 * WebSocket信息缓存
 *
 * @author 夕四
 * @date 2022-02-23
 */@Slf4jpublicclassUserInfoWebSocket{/**
     * key为锁id,value为锁id对应的用户列表
     */privatestaticMap<Long,Set<String>> cachedMap =newHashMap<>();privatestaticReentrantReadWriteLock rwlock =newReentrantReadWriteLock();/**
     * 根据用户id获取sessionId
     *
     * @param userId
     * @return
     */publicstaticSet<String>getSessionIdSet(Long userId){
        rwlock.readLock().lock();try{return cachedMap.get(userId);}finally{
            rwlock.readLock().unlock();}}/**
     * 根据用户id集合获取sessionId集合
     *
     * @param userIdList
     * @return
     */publicstaticSet<String>getSessionIdSetByUserIdList(List<Long> userIdList){
        rwlock.readLock().lock();Set<String> result =newHashSet<>();try{
            log.info("<<<<<<<<<<<<<<<<<<<<   cachedMap:{}",cachedMap);for(Long userId : userIdList){Set<String> sidList = cachedMap.get(userId);if(sidList !=null&&!sidList.isEmpty()){
                    result.addAll(sidList);}}}finally{
            rwlock.readLock().unlock();}return result;}/**
     * 获取sessionId集合
     *
     * @return
     */publicstaticSet<String>getSessionIdSet(){
        rwlock.readLock().lock();Set<String> result =newHashSet<>();try{for(Map.Entry<Long,Set<String>> longSetEntry : cachedMap.entrySet()){Set<String> sidList = longSetEntry.getValue();
                result.addAll(sidList);}}finally{
            rwlock.readLock().unlock();}return result;}/**
     * 添加用户id对应的sessionId
     *
     * @param userId
     * @param sid
     */publicstaticvoidsetSessionIdMap(Long userId,String sid){
        rwlock.writeLock().lock();try{Set<String> list = cachedMap.get(userId);if(list ==null){
                list =newHashSet<>();}
            list.add(sid);
            cachedMap.put(userId, list);}finally{
            rwlock.writeLock().unlock();}}/**
     * 删除某个用户的sessionId
     *
     * @param userId
     * @param sid
     */publicstaticvoiddelSessionIdMap(Long userId,String sid){
        rwlock.writeLock().lock();try{Set<String> set = cachedMap.get(userId);if(set ==null){return;}
            set.remove(sid);if(set.isEmpty()){
                cachedMap.remove(userId);}else{
                cachedMap.put(userId, set);}}finally{
            rwlock.writeLock().unlock();}}/**
     * 删除用户id
     *
     * @param userId
     */publicstaticvoiddelUserId(Long userId){
        rwlock.writeLock().lock();try{
            cachedMap.remove(userId);}finally{
            rwlock.writeLock().unlock();}}}

webSocket传输对象

packagecom.biz.def;importorg.yeauty.pojo.Session;importjava.util.Date;importjava.util.Objects;/**
 * webSocket传输对象
 */publicclassWebSocketServerDto{/**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */privateSession session;/**
     * 接收sid
     */privateString sid;/**
     * 心跳时间
     */privateDate heartTime;publicWebSocketServerDto(){}publicWebSocketServerDto(Session session,String sid,Date heartTime){this.session = session;this.sid = sid;this.heartTime = heartTime;}publicSessiongetSession(){return session;}publicvoidsetSession(Session session){this.session = session;}publicStringgetSid(){return sid;}publicvoidsetSid(String sid){this.sid = sid;}publicDategetHeartTime(){return heartTime;}publicvoidsetHeartTime(Date heartTime){this.heartTime = heartTime;}@Overridepublicbooleanequals(Object o){if(this== o)returntrue;if(o ==null||getClass()!= o.getClass())returnfalse;WebSocketServerDto that =(WebSocketServerDto) o;returnObjects.equals(session, that.session)&&Objects.equals(sid, that.sid)&&Objects.equals(heartTime, that.heartTime);}@OverridepublicinthashCode(){returnObjects.hash(session, sid, heartTime);}@OverridepublicStringtoString(){return"WebSocketServerDto{"+"session="+ session +", sid='"+ sid +'\''+", heartTime="+ heartTime +'}';}}

websocket的推送消息体

packagecom.bsj.studentcard.upms.pc.biz.def;importlombok.Data;importlombok.NoArgsConstructor;/**
 * websocket推送消息
 *
 * @author 夕四
 * @date 2022-02-23 12:55
 **/@Data@NoArgsConstructorpublicclassWsMsgDataVO<T>{/**
     * 推送数据
     */privateT data;/**
     * 标记标签
     */privateString tag;publicWsMsgDataVO(T data,String tag){this.data = data;this.tag = tag;}}

MQ的topic监听

packagecom.bsj.studentcard.upms.pc.biz.config.mq;importorg.springframework.cloud.stream.annotation.Input;importorg.springframework.cloud.stream.annotation.Output;importorg.springframework.messaging.MessageChannel;importorg.springframework.messaging.SubscribableChannel;/**
 * 消息接口
 *
 * @author 夕四
 * @date 2022-05-14 16:25
 **/publicinterfaceMySource{/**
     * 报警回调
     */String ALARM_SOS ="alarmSos";/**
     * 报警回调发送
     *
     * @return
     */@Input(ALARM_SOS)SubscribableChannelalarmSos();}

MQ监听消息处理websock发送

packagecom.bsj.studentcard.upms.pc.biz.config.mq;importcn.hutool.core.bean.BeanUtil;importcn.hutool.core.collection.CollUtil;importcn.hutool.core.date.DateUtil;importcn.hutool.core.util.StrUtil;importcom.alibaba.fastjson.JSONObject;importcom.bsj.studentcard.upms.pc.biz.controller.home.MyWebSocket;importcom.bsj.studentcard.upms.pc.biz.def.UserInfoWebSocket;importcom.bsj.studentcard.upms.pc.biz.def.WsMsgDataVO;importlombok.RequiredArgsConstructor;importlombok.extern.slf4j.Slf4j;importorg.apache.rocketmq.spring.support.RocketMQHeaders;importorg.springframework.cloud.stream.annotation.StreamListener;importorg.springframework.messaging.Message;importorg.springframework.messaging.MessageHeaders;importorg.springframework.messaging.support.MessageBuilder;importorg.springframework.scheduling.annotation.Scheduled;importorg.springframework.stereotype.Component;importorg.springframework.util.MimeTypeUtils;importjava.util.Arrays;importjava.util.HashMap;importjava.util.List;importjava.util.Set;importjava.util.function.Function;importjava.util.stream.Collectors;/**
 * 发送消息配置
 *
 * @author 夕四
 * @date 2022-05-14 16:29
 **/@Slf4j@Component@RequiredArgsConstructorpublicclassMqSenderService{privatefinalMySource source;/**
     * 需要MQ推送至websocket的topic
     *
     * @param msg
     */@StreamListener(MySource.ALARM_SOS)publicvoidsosAlarm(Message<String> message){String result = message.getPayload();if(StrUtil.isEmpty(result)){
            log.warn("检测到报警消息为空");return;}List<AlarmLogDTO> oCardCallBackVOS =JSONObject.parseArray(result,AlarmLogDTO.class);if(CollUtil.isEmpty(oCardCallBackVOS)){
            log.warn("检测到SOS报警消息为空");return;}
        log.info("监听到sos报警输出为:{}", result);//接下来的就是业务处理数据List<Long> cardIds = oCardCallBackVOS.stream().map(AlarmLogDTO::getCardId).distinct().collect(Collectors.toList());List<UserDataVO> userDataVOS =CommonBaseCacheForest.listTopUserList(cardIds);HashMap<Long,UserDataVO> commonMap = userDataVOS.stream().collect(Collectors.toMap(UserDataVO::getCardId,Function.identity(),(key1, key2)-> key2,HashMap::new));for(AlarmLogDTO callBack : oCardCallBackVOS){Long cardId = callBack.getCardId();//根据产生的数据查找对应推送到所属的用户UserDataVO userDataVO = commonMap.get(cardId);List<Long> userIds = userDataVO.getUserId();if(CollUtil.isEmpty(userIds)){
                log.info("该数据不属于任何用户,cardId:{}", cardId);continue;}Set<String> sidSet =UserInfoWebSocket.getSessionIdSetByUserIdList(userIds);if(CollUtil.isEmpty(sidSet)){
                log.info("当前用户没一个登录,不发送");continue;}//组装推送消息的消息体OAlarmLogPageVO oAlarmLogPageVO =formatData(callBack, commonDTO);WsMsgDataVO<OAlarmLogPageVO> msg =newWsMsgDataVO<OAlarmLogPageVO>(oAlarmLogPageVO,AlarmTypeEnum.SOS_ALARM.name());
            sidSet.forEach(sid ->MyWebSocket.sendInfo(JSONObject.toJSONString(msg), sid));}}/**
     * 每30秒对缓存起来的用户保持心跳连接,防止掉线
     *
     */@Scheduled(cron ="*/30 * * * * ?")publicvoidkeepHeartbeat(){Set<String> sidSet =UserInfoWebSocket.getSessionIdSet();WsMsgDataVO<String> msg =newWsMsgDataVO<String>("心跳连接","heartBeat");
        sidSet.forEach(sid ->MyWebSocket.sendInfo(JSONObject.toJSONString(msg), sid));}}
标签: websocket java rabbitmq

本文转载自: https://blog.csdn.net/weixin_41451078/article/details/125464209
版权归原作者 夕四丶 所有, 如有侵权,请联系我们删除。

“Java 使用Websocket 与MQ消息队列实现即时消息”的评论:

还没有评论