通常客户端只会主动发送心跳消息,目的是为了保持与服务端连接,而其他消息往往需要服务端发送消息至客户端调取。
实现步骤
- 客户端在第一次与服务端建立连接时,将此连接的通道在
Map
中保存下来,为了保证线程安全,可以使用线程安全的ConcurrentHashMap
。 - 在发送消息给客户端时,通过设备标识遍历
ConcurrentHashMap
找到目标客户端连接通道。找到后先判断通道是否存活,如果连接是存活状态,就通过此通道发送消息给客户端,如果不是存活状态,就从Map
中删除此通道信息。 - 将消息发送至客户端后,服务端正常接收客户端传回的信息。
实现代码
前两篇文章中已经提供了
netty
的整体框架代码,这里只提供一些核心的关键代码,其余代码不再赘述。
指路:
- Netty系列(一):Springboot整合Netty,自定义协议实现
- Netty系列(二):Netty拆包/沾包问题的解决方案
新建一个
ChannelMap
类,在客户端第一次连接时保存
channel
连接。后续服务端向客户端发送消息时,先从
Map
中找到对应的客户端消息通道连接,再向通道中写入消息进行发送。
/**
* @Author 鳄鱼儿
* @Description 连接通道保存MAP
* @date 2022/11/27 16:30
* @Version 1.0
*/publicclassChannelMap{/**
* 存放客户端标识ID(消息ID)与channel的对应关系
*/privatestaticvolatileConcurrentHashMap<String,Channel> channelMap =null;privateChannelMap(){}publicstaticConcurrentHashMap<String,Channel>getChannelMap(){if(null== channelMap){synchronized(ChannelMap.class){if(null== channelMap){
channelMap =newConcurrentHashMap<>();}}}return channelMap;}publicstaticChannelgetChannel(String id){returngetChannelMap().get(id);}}
在客户端建立连接(服务端收到心跳消息)时,将channel加入map中。
publicclassServerListenerHandlerextendsSimpleChannelInboundHandler<Message>{privatestaticfinalLogger log =LoggerFactory.getLogger(ServerListenerHandler.class);/**
* 设备接入连接时处理
*
* @param ctx
*/@OverridepublicvoidhandlerAdded(ChannelHandlerContext ctx){
log.info("有新的连接:[{}]", ctx.channel().id().asLongText());}/**
* 数据处理
*
* @param ctx
* @param msg
*/@OverrideprotectedvoidchannelRead0(ChannelHandlerContext ctx,Message msg){// 获取消息实例中的消息体String content = msg.getContent();// 对不同消息类型进行处理MessageEnum type =MessageEnum.getStructureEnum(msg);switch(type){caseCONNECT:// 将通道加入ChannelMapChannelMap.getChannelMap().put(msg.getId(), ctx.channel());// 将客户端ID作为自定义属性加入到channel中,方便随时channel中获取用户IDAttributeKey<String> key =AttributeKey.valueOf("id");
ctx.channel().attr(key).setIfAbsent(msg.getId());// TODO 心跳消息处理caseSTATE:// TODO 设备状态default:System.out.println(type.content +"消息内容"+ content);}}/**
* 设备下线处理
*
* @param ctx
*/@OverridepublicvoidhandlerRemoved(ChannelHandlerContext ctx){
log.info("设备下线了:{}", ctx.channel().id().asLongText());// map中移除channelremoveId(ctx);}/**
* 设备连接异常处理
*
* @param ctx
* @param cause
*/@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx,Throwable cause){// 打印异常
log.info("异常:{}", cause.getMessage());// map中移除channelremoveId(ctx);// 关闭连接
ctx.close();}privatevoidremoveId(ChannelHandlerContext ctx){AttributeKey<String> key =AttributeKey.valueOf("id");// 获取channel中idString id = ctx.channel().attr(key).get();// map移除channelChannelMap.getChannelMap().remove(id);}}
写一个服务端发送消息的业务层类,并通过客户端id在map中获取到channel通道,将消息转化成json字符串后,通过
writeAndFlush
发送至客户端。
/**
* @Author 鳄鱼儿
* @Description 向客户端发送消息
* @date 2022/11/27 17:29
* @Version 1.0
*/@ServicepublicclassPushMsgServiceImplimplementsPushMsgService{/**
* 向一个客户端发送消息
*
* @param msg
*/@Overridepublicvoidpush(Message msg){// 客户端IDString id = msg.getId();Channel channel =ChannelMap.getChannel(id);if(null== channel){thrownewRuntimeException("客户端已离线");}
channel.writeAndFlush(msg);}}
注意:
writeAndFlush
参数是自定义编码的泛型对象实例。如本文自定义的
Message
消息解析类。
publicclassMessageEncodeHandlerextendsMessageToByteEncoder<Message>{privatestaticString delimiter;publicMessageEncodeHandler(String delimiter){this.delimiter = delimiter;}@Overrideprotectedvoidencode(ChannelHandlerContext channelHandlerContext,Message message,ByteBuf out)throwsException{
out.writeBytes((message.toJsonString()+ delimiter).getBytes(CharsetUtil.UTF_8));}}
之后再编写一个
Controller
类(这里省略),在
Controller
类中调用
PushMsgService
中pushff,就可以完成对客户端的消息发送。
版权归原作者 鳄鱼儿 所有, 如有侵权,请联系我们删除。