0


Netty系列(三):Netty服务端发送消息到客户端

通常客户端只会主动发送心跳消息,目的是为了保持与服务端连接,而其他消息往往需要服务端发送消息至客户端调取。

实现步骤

  1. 客户端在第一次与服务端建立连接时,将此连接的通道在 Map 中保存下来,为了保证线程安全,可以使用线程安全的 ConcurrentHashMap
  2. 在发送消息给客户端时,通过设备标识遍历 ConcurrentHashMap 找到目标客户端连接通道。找到后先判断通道是否存活,如果连接是存活状态,就通过此通道发送消息给客户端,如果不是存活状态,就从 Map 中删除此通道信息。
  3. 将消息发送至客户端后,服务端正常接收客户端传回的信息。

实现代码

前两篇文章中已经提供了

netty

的整体框架代码,这里只提供一些核心的关键代码,其余代码不再赘述。

指路:

  1. Netty系列(一):Springboot整合Netty,自定义协议实现
  2. Netty系列(二):Netty拆包/沾包问题的解决方案

新建一个

ChannelMap

类,在客户端第一次连接时保存

channel

连接。后续服务端向客户端发送消息时,先从

Map

中找到对应的客户端消息通道连接,再向通道中写入消息进行发送。

/**
 * @Author 鳄鱼儿
 * @Description 连接通道保存MAP
 * @date 2022/11/27 16:30
 * @Version 1.0
 */publicclassChannelMap{/**
     * 存放客户端标识ID(消息ID)与channel的对应关系
     */privatestaticvolatileConcurrentHashMap<String,Channel> channelMap =null;privateChannelMap(){}publicstaticConcurrentHashMap<String,Channel>getChannelMap(){if(null== channelMap){synchronized(ChannelMap.class){if(null== channelMap){
                    channelMap =newConcurrentHashMap<>();}}}return channelMap;}publicstaticChannelgetChannel(String id){returngetChannelMap().get(id);}}

在客户端建立连接(服务端收到心跳消息)时,将channel加入map中。

publicclassServerListenerHandlerextendsSimpleChannelInboundHandler<Message>{privatestaticfinalLogger log =LoggerFactory.getLogger(ServerListenerHandler.class);/**
     * 设备接入连接时处理
     *
     * @param ctx
     */@OverridepublicvoidhandlerAdded(ChannelHandlerContext ctx){
        log.info("有新的连接:[{}]", ctx.channel().id().asLongText());}/**
     * 数据处理
     *
     * @param ctx
     * @param msg
     */@OverrideprotectedvoidchannelRead0(ChannelHandlerContext ctx,Message msg){// 获取消息实例中的消息体String content = msg.getContent();// 对不同消息类型进行处理MessageEnum type =MessageEnum.getStructureEnum(msg);switch(type){caseCONNECT:// 将通道加入ChannelMapChannelMap.getChannelMap().put(msg.getId(), ctx.channel());// 将客户端ID作为自定义属性加入到channel中,方便随时channel中获取用户IDAttributeKey<String> key =AttributeKey.valueOf("id");
                ctx.channel().attr(key).setIfAbsent(msg.getId());// TODO 心跳消息处理caseSTATE:// TODO 设备状态default:System.out.println(type.content +"消息内容"+ content);}}/**
     * 设备下线处理
     *
     * @param ctx
     */@OverridepublicvoidhandlerRemoved(ChannelHandlerContext ctx){
        log.info("设备下线了:{}", ctx.channel().id().asLongText());// map中移除channelremoveId(ctx);}/**
     * 设备连接异常处理
     *
     * @param ctx
     * @param cause
     */@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx,Throwable cause){// 打印异常
        log.info("异常:{}", cause.getMessage());// map中移除channelremoveId(ctx);// 关闭连接
        ctx.close();}privatevoidremoveId(ChannelHandlerContext ctx){AttributeKey<String> key =AttributeKey.valueOf("id");// 获取channel中idString id = ctx.channel().attr(key).get();// map移除channelChannelMap.getChannelMap().remove(id);}}

写一个服务端发送消息的业务层类,并通过客户端id在map中获取到channel通道,将消息转化成json字符串后,通过

writeAndFlush

发送至客户端。

/**
 * @Author 鳄鱼儿
 * @Description 向客户端发送消息
 * @date 2022/11/27 17:29
 * @Version 1.0
 */@ServicepublicclassPushMsgServiceImplimplementsPushMsgService{/**
     * 向一个客户端发送消息
     *
     * @param msg
     */@Overridepublicvoidpush(Message msg){// 客户端IDString id = msg.getId();Channel channel =ChannelMap.getChannel(id);if(null== channel){thrownewRuntimeException("客户端已离线");}
        channel.writeAndFlush(msg);}}

注意:

writeAndFlush

参数是自定义编码的泛型对象实例。如本文自定义的

Message

消息解析类。

publicclassMessageEncodeHandlerextendsMessageToByteEncoder<Message>{privatestaticString delimiter;publicMessageEncodeHandler(String delimiter){this.delimiter = delimiter;}@Overrideprotectedvoidencode(ChannelHandlerContext channelHandlerContext,Message message,ByteBuf out)throwsException{
        out.writeBytes((message.toJsonString()+ delimiter).getBytes(CharsetUtil.UTF_8));}}

之后再编写一个

Controller

类(这里省略),在

Controller

类中调用

PushMsgService

中pushff,就可以完成对客户端的消息发送。


本文转载自: https://blog.csdn.net/Ber_Bai/article/details/128155253
版权归原作者 鳄鱼儿 所有, 如有侵权,请联系我们删除。

“Netty系列(三):Netty服务端发送消息到客户端”的评论:

还没有评论