0


springboot 整合netty

增加netty依赖

        <!-- netty 心跳 -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.28.Final</version>
    </dependency>

1.创建netty init 类

/**
* @author jxj
*/

public class WsServerInitializer extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {

            ChannelPipeline pipeline = ch.pipeline();

            //websocket基于http协议,所以需要http编解码器
            pipeline.addLast(new HttpServerCodec());
            
            //添加对于读写大数据流的支持
            pipeline.addLast(new ChunkedWriteHandler());
            
            //对httpMessage进行聚合
            pipeline.addLast(new HttpObjectAggregator(1024*64));

            // ================= 上述是用于支持http协议的 ==============

            //websocket 服务器处理的协议,用于给指定的客户端进行连接访问的路由地址
            //比如处理一些握手动作(ping,pong) 等同于访问路劲
            pipeline.addLast(new WebSocketServerProtocolHandler("/heart/api/v1"));
            //自定义handler
            pipeline.addLast(new ChatHandler());
    }
}

2.netty 启动类

/**
* @author jxj
* 服务端基本配置,通过一个静态单例类,保证启动时候只被加载一次
*/
@Component
@Slf4j
public class WsServer {

        /**
         *单例静态内部类
         */
        public static class SingletonServer{
            static final WsServer INSTANCE = new WsServer();
        }

        public static WsServer getInstance(){
            return SingletonServer.INSTANCE;
        }

        private EventLoopGroup mainGroup ;
        private EventLoopGroup subGroup;
        private ServerBootstrap server;
        private ChannelFuture future;

        public WsServer(){
            mainGroup = new NioEventLoopGroup();
            subGroup = new NioEventLoopGroup();
            //添加自定义初始化处理器
            server = new ServerBootstrap();
            server.group(mainGroup, subGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new WsServerInitializer());
        }

        public void start(Integer port){

            future = this.server.bind(port);
        }
}

3.启动netty

/**
* 启动程序
*
* @author jxj
*/
@SpringBootApplication
public class Application{
    public static void main(String[] args){
        SpringApplication.run(Application.class,args);
        //7888 是netty 占用的端口
        WsServer.getInstance().start(7888);
    }
}

4.接收代码


/**
 * @author jxj
 */
@Component
@Slf4j
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /**
     * 保存channel端
     */
    private static ChannelGroup CLIENTS =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 客户端创建的时候触发,当客户端连接上服务端之后,就可以获取该channel,然后放到channelGroup中进行统一管理
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        CLIENTS.add(ctx.channel());
    }

    /**
     * 客户端销毁的时候触发,
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
          CLIENTS.remove(ctx.channel());
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
                //接收客户端发送的信息
                System.out.println(msg.text());
        //发送客户端的消息
         ctx.channel().writeAndFlush(new TextWebSocketFrame("发送心跳"));
    }

    /**
     * 出现异常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("心跳异常:",cause);
        CLIENTS.remove(ctx.channel());
    }

}

netty 自带定时器 这里只举个例子 定时器不在spring管理中举读取的定时发送


/**
 * @author jxj
 */
@Component
@Slf4j
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /**
     * 保存channel端
     */
    private static ChannelGroup CLIENTS =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
       @Autowired
    private RestTemplate restTemplate;

    /**
     * 客户端创建的时候触发,当客户端连接上服务端之后,就可以获取该channel,然后放到channelGroup中进行统一管理
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        CLIENTS.add(ctx.channel());
    }

    /**
     * 客户端销毁的时候触发,
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
          CLIENTS.remove(ctx.channel());
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
                //接收客户端发送的信息
                System.out.println(msg.text());
        ctx.channel().eventLoop().scheduleWithFixedDelay(new Runnable() 
 
{   
        ChannelHandlerContext ctx;
        RestTemplate restTemplate;
         @Override
            public void run() {}
//对自身属性进行赋值
            public Runnable accept(ChannelHandlerContext c, RestTemplate restTemplate) {
                this.ctx = c;
                this.restTemplate= restTemplate;
                return this;
   //0 是延迟多少秒执行 后面两个参数时多长时间执行一次
            }}.accept(ctx,restTemplate),0,1, TimeUnit.MINUTES);
    }

    /**
     * 出现异常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("心跳异常:",cause);
        CLIENTS.remove(ctx.channel());
    }

}
标签: spring boot java 后端

本文转载自: https://blog.csdn.net/weixin_56576835/article/details/129992578
版权归原作者 Surperw985 所有, 如有侵权,请联系我们删除。

“springboot 整合netty”的评论:

还没有评论