0


WebSocket的那些事(5-Spring STOMP支持之连接外部消息代理)

目录

一、序言

上节我们在 WebSocket的那些事(4-Spring中的STOMP支持详解) 中详细说明了通过

Spring内置消息代理

结合STOMP子协议进行Websocket通信,以及相关注解的使用及原理。

但是Spring内置消息代理会有一些限制,比如只支持STOMP协议的一部分命令,像

acks

receipts

命令都是不支持的,还有由于内置消息代理把消息存储在内存,当应用不可用时,客户端也就订阅不到到后台推送的消息。

这节我们将会使用支持STOMP协议的外部消息代理(

RabbitMQ

)进行Websocket通信。


二、开启RabbitMQ外部消息代理

服务端路由发送消息以及客户端订阅消息都要通过STOMP协议与RabbitMQ进行交互,由于RabbitMQ默认没有启动STOMP插件,因此我们需要先启用该插件。

rabbitmq-plugins enable rabbitmq_stomp

启动该插件后,RabbitMQ中

STOMP适配器

默认会监听

61613

端口,如果是云服务器,需要把该端口在安全组中放开。

关于该插件说明请参考:RabbitMQ中STOMP插件说明。


三、代码示例

我们在 WebSocket的那些事(4-Spring中的STOMP支持详解)中写了一个简单的聊天Demo示例,下面我们对该聊天Demo示例进行改造,将Spring内置消息代理替换成

RabbitMQ

外部消息代理。

1、Maven依赖项

服务端和客户端与外部消息代理都是通过TCP进行通信,Spring底层默认使用的是

Netty

Reactor

,因此需要引入相关依赖项。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-reactor-netty</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId></dependency>

2、相关实体

(1) 请求消息参数

@DatapublicclassWebSocketMsgDTO{privateString name;privateString content;}

(2) 响应消息内容

@Data@Builder@AllArgsConstructor@NoArgsConstructorpublicclassWebSocketMsgVO{privateString content;}

(3) 自定义认证用户信息

@Data@AllArgsConstructor@NoArgsConstructorpublicclassStompAuthenticatedUserimplementsPrincipal{/**
     * 用户唯一ID
     */privateString userId;/**
     * 用户昵称
     */privateString nickName;/**
     * 用于指定用户消息推送的标识
     * @return
     */@OverridepublicStringgetName(){returnthis.userId;}}

3、自定义用户认证拦截器

@Slf4jpublicclassUserAuthenticationChannelInterceptorimplementsChannelInterceptor{privatestaticfinalStringUSER_ID="User-ID";privatestaticfinalStringUSER_NAME="User-Name";@OverridepublicMessage<?>preSend(Message<?> message,MessageChannel channel){StompHeaderAccessor accessor =MessageHeaderAccessor.getAccessor(message,StompHeaderAccessor.class);// 如果是连接请求,记录userIdif(StompCommand.CONNECT.equals(accessor.getCommand())){String userID = accessor.getFirstNativeHeader(USER_ID);String username = accessor.getFirstNativeHeader(USER_NAME);

            log.info("Stomp User-Related headers found, userID: {}, username:{}", userID, username);
            accessor.setUser(newStompAuthenticatedUser(userID, username));}return message;}}

4、Websocket外部消息代理配置

Spring中与外部消息代理通信的中间方被称之为Broker Relay,它会维护一个系统共享的单一TCP连接和外部消息代理进行通信,该TCP连接仅仅适用于服务端,用来发送消息,而不是接收消息,通过Broker Relay

systemLogin

systemPasscode

属性可以设置该连接的认证信息。

Broker Relay也会为每个连接的Websocket客户端创建一个TCP连接,该连接用来接收消息,通过

clientLogin

clientPasscode

属性可以设置连接的认证信息。

/**
 * Websocket连接外部消息代理配置
 * @author Nick Liu
 * @date 2023/9/6
 */@Configuration@EnableWebSocketMessageBrokerpublicclassWebsocketExternalMessageBrokerConfigimplementsWebSocketMessageBrokerConfigurer{@OverridepublicvoidconfigureClientInboundChannel(ChannelRegistration registration){// 拦截器配置
        registration.interceptors(newUserAuthenticationChannelInterceptor());}@OverridepublicvoidregisterStompEndpoints(StompEndpointRegistry registry){
        registry.addEndpoint("/websocket")// WebSocket握手端口.addInterceptors(newHttpSessionHandshakeInterceptor()).setAllowedOriginPatterns("*")// 设置跨域.withSockJS();// 开启SockJS回退机制}@OverridepublicvoidconfigureMessageBroker(MessageBrokerRegistry registry){
        registry.setApplicationDestinationPrefixes("/app")// 发送到服务端目的地前缀.enableStompBrokerRelay("/topic")// 开启外部消息代理,指定消息订阅前缀.setRelayHost("localhost")// 外部消息代理Host.setRelayPort(61613)// 外部消息代理STOMP端口.setSystemLogin("admin")// 共享系统连接用户名,该连接主要用来发送消息.setSystemPasscode("admin")// 共享系统连接密码,该连接主要用来发送消息.setClientLogin("admin")// 客户端连接用户名,该连接主要用来接收消息.setClientPasscode("admin")// 客户端连接密码,该连接主要用来接收消息.setVirtualHost("/stomp");// RabbitMQ虚拟主机}}

备注:我们可以为服务端与客户端的连接设置不同的用户,针对客户端连接用户进行权限管控,保证系统的安全性,在这里为了方便测试我们统一用一个用户。

5、ChatController

STOMP协议并没有规定消息代理必须支持哪种类型的

Destinations(目的地)

,但是RabbitMQ STOMP适配器只支持一些指定的目的地类型,如下图:
在这里插入图片描述

  • /exchange:指定交换机和路由key,发送和订阅来自队列的消息。
  • /queue:发送和订阅受STOMP网关管理的队列的消息,最多只有一个订阅者能到消息。
  • /amq/queue:发送和订阅不受STOMP网关管理的队列的消息。
  • /topic:发送和订阅来自临时或者持久Topic的消息,多个订阅者都能接收到消息。
  • /temp-queue/:发送和订阅来自临时队列的消息。

参考文档见:RabbitMQ中STOMP插件说明。

在下面的示例中,我们选用了

/topic

的开头的消息发送和订阅前缀,目的地格式只能为

/topic/{routing-key}

routing-key不能有斜杠,否则会报错。

@Slf4j@Controller@RequiredArgsConstructorpublicclassChatController{privatefinalSimpUserRegistry simpUserRegistry;privatefinalSimpMessagingTemplate simpMessagingTemplate;/**
     * 模板引擎为Thymeleaf,需要加上spring-boot-starter-thymeleaf依赖,
     * @return
     */@GetMapping("/page/chat")publicModelAndViewturnToChatPage(){returnnewModelAndView("chat");}/**
     * 群聊消息处理
     * 这里我们通过@SendTo注解指定消息目的地为"/topic/chat/group",如果不加该注解则会自动发送到"/topic" + "/chat/group"
     * @param webSocketMsgDTO 请求参数,消息处理器会自动将JSON字符串转换为对象
     * @return 消息内容,方法返回值将会广播给所有订阅"/topic/chat/group"的客户端
     */@MessageMapping("/chat/group")@SendTo("/topic/chat-group")publicWebSocketMsgVOgroupChat(WebSocketMsgDTO webSocketMsgDTO){
        log.info("Group chat message received: {}",FastJsonUtils.toJsonString(webSocketMsgDTO));String content =String.format("来自[%s]的群聊消息: %s", webSocketMsgDTO.getName(), webSocketMsgDTO.getContent());returnWebSocketMsgVO.builder().content(content).build();}/**
     * 私聊消息处理
     * 这里我们通过@SendToUser注解指定消息目的地为"/topic/chat/private",发送目的地默认会拼接上"/user/"前缀
     * 实际发送目的地为"/user/topic/chat/private"
     * @param webSocketMsgDTO 请求参数,消息处理器会自动将JSON字符串转换为对象
     * @return 消息内容,方法返回值将会基于SessionID单播给指定用户
     */@MessageMapping("/chat/private")@SendToUser("/topic/chat-private")publicWebSocketMsgVOprivateChat(WebSocketMsgDTO webSocketMsgDTO){
        log.info("Private chat message received: {}",FastJsonUtils.toJsonString(webSocketMsgDTO));String content ="私聊消息回复:"+ webSocketMsgDTO.getContent();returnWebSocketMsgVO.builder().content(content).build();}/**
     * 定时消息推送,这里我们会列举所有在线的用户,然后单播给指定用户。
     * 通过SimpMessagingTemplate实例可以在任何地方推送消息。
     */@Scheduled(fixedRate =10*1000)publicvoidpushMessageAtFixedRate(){
        log.info("当前在线人数: {}", simpUserRegistry.getUserCount());if(simpUserRegistry.getUserCount()<=0){return;}// 这里的Principal为StompAuthenticatedUser实例Set<StompAuthenticatedUser> users = simpUserRegistry.getUsers().stream().map(simpUser ->StompAuthenticatedUser.class.cast(simpUser.getPrincipal())).collect(Collectors.toSet());

        users.forEach(authenticatedUser ->{String userId = authenticatedUser.getUserId();String nickName = authenticatedUser.getNickName();WebSocketMsgVO webSocketMsgVO =newWebSocketMsgVO();
            webSocketMsgVO.setContent(String.format("定时推送的私聊消息, 接收人: %s, 时间: %s", nickName,LocalDateTime.now()));

            log.info("开始推送消息给指定用户, userId: {}, 消息内容:{}", userId,FastJsonUtils.toJsonString(webSocketMsgVO));
            simpMessagingTemplate.convertAndSendToUser(userId,"/topic/chat-push", webSocketMsgVO);});}}

6、前端页面chat.html

<!DOCTYPEhtml><htmllang="en"xmlns:th="http://www.thymeleaf.org"><head><metacharset="UTF-8"><title>chat</title><scriptsrc="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.6.1/sockjs.min.js"></script><scriptsrc="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script><scriptsrc="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.6.4/jquery.min.js"></script><style>#mainWrapper{width: 600px;margin: auto;}</style></head><body><divid="mainWrapper"><div><labelfor="username"style="margin-right: 5px">姓名:</label><inputid="username"type="text"/></div><divid="msgWrapper"><pstyle="vertical-align: top">发送的消息:</p><textareaid="msgSent"style="width: 600px;height: 100px"></textarea><pstyle="vertical-align: top">收到的群聊消息:</p><textareaid="groupMsgReceived"style="width: 600px;height: 100px"></textarea><pstyle="vertical-align: top">收到的私聊消息:</p><textareaid="privateMsgReceived"style="width: 600px;height: 200px"></textarea></div><divstyle="margin-top: 5px;"><buttononclick="connect()">连接</button><buttononclick="sendGroupMessage()">发送群聊消息</button><buttononclick="sendPrivateMessage()">发送私聊消息</button><buttononclick="disconnect()">断开连接</button></div></div><scripttype="text/javascript">$(()=>{$('#msgSent').val('');$("#groupMsgReceived").val('');$("#privateMsgReceived").val('');});let stompClient =null;// 连接服务器constconnect=()=>{const header ={"User-ID":newDate().getTime().toString(),"User-Name":$('#username').val()};const ws =newSockJS('http://localhost:8080/websocket');
        stompClient = Stomp.over(ws);
        stompClient.connect(header,()=>subscribeTopic());}// 订阅主题constsubscribeTopic=()=>{alert("连接成功!");// 订阅广播消息
        stompClient.subscribe('/topic/chat-group',function(message){
                console.log(`Group message received : ${message.body}`);const resp =JSON.parse(message.body);const previousMsg =$("#groupMsgReceived").val();$("#groupMsgReceived").val(`${previousMsg}${resp.content}\n`);});// 订阅单播消息
        stompClient.subscribe('/user/topic/chat-private',message=>{
                console.log(`Private message received : ${message.body}`);const resp =JSON.parse(message.body);const previousMsg =$("#privateMsgReceived").val();$("#privateMsgReceived").val(`${previousMsg}${resp.content}\n`);});// 订阅定时推送的单播消息
        stompClient.subscribe(`/user/topic/chat-push`,message=>{
                console.log(`Private message received : ${message.body}`);const resp =JSON.parse(message.body);const previousMsg =$("#privateMsgReceived").val();$("#privateMsgReceived").val(`${previousMsg}${resp.content}\n`);});};// 断连constdisconnect=()=>{
        stompClient.disconnect(()=>{$("#msgReceived").val('Disconnected from WebSocket server');});}// 发送群聊消息constsendGroupMessage=()=>{const msg ={name:$('#username').val(),content:$('#msgSent').val()};
        stompClient.send('/app/chat/group',{},JSON.stringify(msg));}// 发送私聊消息constsendPrivateMessage=()=>{const msg ={name:$('#username').val(),content:$('#msgSent').val()};
        stompClient.send('/app/chat/private',{},JSON.stringify(msg));}</script></body></html>

四、测试示例

1、群聊、私聊、后台定时推送测试

启动应用程序,日志打印显示系统连接建立成功,如下:

在这里插入图片描述

打开浏览器访问

http://localhost:8080/page/chat

可进入聊天页,同时打开两个窗口访问。
在这里插入图片描述


在这里插入图片描述

2、登录RabbitMQ控制台查看队列信息

在这里插入图片描述
可以看到所有消息都发送到了

amq.topic

交换机上(Topic类型), RabbitMQ会为每个连接的客户端创建3个队列。

因为我们在

ChatController

中定义了三个目的地,Routing Key分别是

/topic/chat-group

/topic/chat-private

/topic/chat-push

。群聊消息目的地

/topic/chat-group

绑定了两个队列,用于实现广播订阅,其它两个Routing Key分别绑定到了不同的队列上,实现唯一订阅。


五、结语

下一节我们将会详细说明RabbitMQ STOMP适配器支持的各种消息目的地类型的区别以及适用场景。

在这里插入图片描述


本文转载自: https://blog.csdn.net/lingbomanbu_lyl/article/details/132716636
版权归原作者 凌波漫步& 所有, 如有侵权,请联系我们删除。

“WebSocket的那些事(5-Spring STOMP支持之连接外部消息代理)”的评论:

还没有评论