方案思路:
- 用户连接和角色管理: 维护一个映射关系,存储每个用户的连接信息和角色信息。可以使用
ConcurrentHashMap
存储,key 为ChannelHandlerContext
(Netty 连接上下文),value 为用户角色。 - 消息分组: 根据用户角色将用户分组,以便实现向特定角色用户推送消息。
- 消息推送: 当需要推送消息时,根据目标角色获取对应的用户连接,并通过 Netty 发送消息。
- 心跳机制: 使用心跳机制检测断开连接的用户,及时清理资源。
实现步骤:
1. 项目搭建和依赖引入:
- 创建一个 Spring Boot 项目。
- 在
pom.xml
中添加 Netty 和 Guava 依赖:
复制代码
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.87.Final</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
</dependencies>
2. 创建 Netty 服务端:
- 创建一个
NettyServer
类,使用@Component
注解将其注册为 Spring Bean。 - 在
@PostConstruct
方法中初始化 Netty 服务端,并在@PreDestroy
方法中关闭服务端资源。
复制代码
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class NettyServer {
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private Channel channel;
// 存储用户连接和角色信息
private static final Map<ChannelHandlerContext, String> userChannelMap = new ConcurrentHashMap<>();
@PostConstruct
public void start() throws InterruptedException {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加 WebSocket 协议处理器
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 添加自定义处理器
pipeline.addLast(new WebSocketFrameHandler());
}
});
ChannelFuture future = bootstrap.bind(8081).sync();
if (future.isSuccess()) {
channel = future.channel();
System.out.println("Netty server started on port 8081");
}
}
@PreDestroy
public void stop() {
if (channel != null) {
channel.close();
}
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
System.out.println("Netty server stopped");
}
// ... 其他代码 ...
}
3. 创建 WebSocket 处理器:
- 创建一个
WebSocketFrameHandler
类,继承SimpleChannelInboundHandler<TextWebSocketFrame>
处理 WebSocket 文本帧。
复制代码
// ... 其他代码 ...
class WebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("New client connected: " + ctx.channel().remoteAddress());
// TODO: 获取用户角色并存储到 userChannelMap 中
// 例如:userChannelMap.put(ctx, getUserRoleFromToken(ctx));
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client disconnected: " + ctx.channel().remoteAddress());
// TODO: 从 userChannelMap 中移除用户
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
String message = msg.text();
System.out.println("Received message from client: " + message);
// TODO: 处理接收到的消息,例如广播给其他用户
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
4. 实现消息推送方法:
- 在
NettyServer
中添加sendMessage
方法,根据目标角色发送消息:
复制代码
// ... 其他代码 ...
public void sendMessage(String message, String role) {
userChannelMap.entrySet().stream()
.filter(entry -> role == null || role.equals(entry.getValue())) // 根据角色过滤
.forEach(entry -> {
ChannelHandlerContext ctx = entry.getKey();
if (ctx.channel().isActive()) {
ctx.writeAndFlush(new TextWebSocketFrame(message));
} else {
// 处理失效连接
userChannelMap.remove(ctx);
}
});
}
5. 创建 Controller 提供发送消息接口:
- 创建一个
MessageController
,提供 RESTful 接口发送消息:
复制代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
@Autowired
private NettyServer nettyServer;
@GetMapping("/send")
public String sendMessage(@RequestParam String message, @RequestParam(required = false) String role) {
nettyServer.sendMessage(message, role);
return "Message sent successfully.";
}
}
6. 处理用户认证和角色获取:
- 在
WebSocketFrameHandler
的channelActive
方法中,需要添加逻辑从连接信息中获取用户角色。这部分逻辑取决于你的认证方式,例如 JWT 认证。
复制代码
// ... 其他代码 ...
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("New client connected: " + ctx.channel().remoteAddress());
// 从 WebSocket 连接信息中获取 token
String token = getJwtTokenFromHandshake(ctx.channel());
// 验证 token 并获取用户角色
String role = validateTokenAndGetRole(token);
if (role != null) {
userChannelMap.put(ctx, role);
} else {
// 处理未授权连接
ctx.close();
}
}
7. (可选) 实现心跳机制:
- 可以使用 Netty 的
IdleStateHandler
检测空闲连接,并定期发送心跳包。 - 客户端需要响应心跳包,以保持连接活跃。
8. 客户端实现:
- 使用 JavaScript 或其他 WebSocket 客户端库连接到 Netty 服务端。
- 处理服务端推送的消息。
代码示例:
- 服务端 (SpringBoot + Netty): 参考上面步骤的代码片段。
- 客户端 (JavaScript):
复制代码
var websocket = new WebSocket("ws://localhost:8081/ws");
websocket.onopen = function(event) {
console.log("WebSocket connection opened");
};
websocket.onmessage = function(event) {
console.log("Received message: " + event.data);
// 处理接收到的消息
};
websocket.onerror = function(event) {
console.error("WebSocket error:", event);
};
function sendMessage() {
var message = document.getElementById("message").value;
websocket.send(message);
}
总结:
- 使用 Netty 可以高效地实现消息推送功能,并灵活控制消息发送目标。
- 需要根据实际需求,结合用户认证、心跳机制等功能,完善消息推送系统。
- 以上代码示例提供了一个基本框架,你可以根据自己的业务逻辑进行修改和扩展。
本文转载自: https://blog.csdn.net/waiter456/article/details/140836097
版权归原作者 双普拉斯 所有, 如有侵权,请联系我们删除。
版权归原作者 双普拉斯 所有, 如有侵权,请联系我们删除。