1. 创建SpringBoot父工程
创建一个SpringBoot工程,然后创建三个子模块
整体工程目录:一个server服务(netty服务器),两个client服务(netty客户端)
pom文件引入netty依赖,springboot依赖
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><packaging>pom</packaging><modules><module>server</module><module>client1</module><module>client2</module></modules><!--继承 SpringBoot 框架的一个父项目,所有自己开发的 Spring Boot 都必须的继承--><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.1.RELEASE</version><relativePath/><!-- lookup parent from repository --></parent><groupId>org.example</groupId><artifactId>nettyTest</artifactId><version>1.0-SNAPSHOT</version><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build><dependencies><!--SpringBoot 框架 web 项目起步依赖,通过该依赖自动关联其它依赖,不需要我们一个一个去添加--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.34.Final</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.23</version></dependency></dependencies></project>
2. 创建Netty服务器
NettySpringBootApplication
packageserver;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublicclassNettySpringBootApplication{publicstaticvoidmain(String[] args){SpringApplication.run(NettySpringBootApplication.class, args);}}
NettyServiceHandler
packageserver.netty;importio.netty.channel.Channel;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.SimpleChannelInboundHandler;importio.netty.channel.group.ChannelGroup;importio.netty.channel.group.DefaultChannelGroup;importio.netty.util.concurrent.GlobalEventExecutor;publicclassNettyServiceHandlerextendsSimpleChannelInboundHandler<String>{privatestaticfinalChannelGroup group =newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);@OverridepublicvoidchannelActive(ChannelHandlerContext ctx)throwsException{// 获取到当前与服务器连接成功的channelChannel channel = ctx.channel();
group.add(channel);System.out.println(channel.remoteAddress()+" 上线,"+"在线数量:"+ group.size());}@OverridepublicvoidchannelInactive(ChannelHandlerContext ctx)throwsException{// 获取到当前要断开连接的ChannelChannel channel = ctx.channel();System.out.println(channel.remoteAddress()+"下线,"+"在线数量:"+ group.size());}@OverrideprotectedvoidchannelRead0(ChannelHandlerContext ctx,String msg)throwsException{Channel channel = ctx.channel();System.out.println("netty客户端"+ channel.remoteAddress()+"发送过来的消息:"+ msg);
group.forEach(ch ->{// JDK8 提供的lambda表达式if(ch != channel){
ch.writeAndFlush(channel.remoteAddress()+":"+ msg +"\n");}});}publicvoidexceptionCaught(ChannelHandlerContext channelHandlerContext,Throwable throwable)throwsException{
throwable.printStackTrace();
channelHandlerContext.close();}}
SocketInitializer
packageserver.netty;importio.netty.channel.ChannelInitializer;importio.netty.channel.ChannelPipeline;importio.netty.channel.socket.SocketChannel;importio.netty.handler.codec.LineBasedFrameDecoder;importio.netty.handler.codec.bytes.ByteArrayDecoder;importio.netty.handler.codec.bytes.ByteArrayEncoder;importio.netty.handler.codec.string.StringDecoder;importio.netty.handler.codec.string.StringEncoder;importorg.springframework.stereotype.Component;@ComponentpublicclassSocketInitializerextendsChannelInitializer<SocketChannel>{@OverrideprotectedvoidinitChannel(SocketChannel socketChannel)throwsException{ChannelPipeline pipeline = socketChannel.pipeline();// // 添加对byte数组的编解码,netty提供了很多编解码器,你们可以根据需要选择// pipeline.addLast(new ByteArrayDecoder());// pipeline.addLast(new ByteArrayEncoder());//添加一个基于行的解码器
pipeline.addLast(newLineBasedFrameDecoder(2048));
pipeline.addLast(newStringDecoder());
pipeline.addLast(newStringEncoder());// 添加上自己的处理器
pipeline.addLast(newNettyServiceHandler());}}
NettyServer
packageserver.netty;importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.ChannelOption;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.nio.NioServerSocketChannel;importlombok.Getter;importlombok.extern.slf4j.Slf4j;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;@Component@Slf4jpublicclassNettyServer{privatefinalstaticLogger logger =LoggerFactory.getLogger(NettyServer.class);@ResourceprivateSocketInitializer socketInitializer;@GetterprivateServerBootstrap serverBootstrap;/**
* netty服务监听端口
*/@Value("${netty.port:6666}")privateint port;/**
* 主线程组数量
*/@Value("${netty.bossThread:1}")privateint bossThread;/**
* 启动netty服务器
*/publicvoidstart(){this.init();this.serverBootstrap.bind(this.port);
logger.info("Netty started on port: {} (TCP) with boss thread {}",this.port,this.bossThread);}/**
* 初始化netty配置
*/privatevoidinit(){NioEventLoopGroup bossGroup =newNioEventLoopGroup(this.bossThread);NioEventLoopGroup workerGroup =newNioEventLoopGroup();this.serverBootstrap =newServerBootstrap();this.serverBootstrap.group(bossGroup, workerGroup)// 两个线程组加入进来.channel(NioServerSocketChannel.class)// 配置为nio类型.option(ChannelOption.SO_BACKLOG,128)//设置线程队列等待连接个数.childOption(ChannelOption.SO_KEEPALIVE,true).childHandler(this.socketInitializer);// 加入自己的初始化器}}
NettyStartListener
packageserver.netty;importorg.springframework.boot.ApplicationArguments;importorg.springframework.boot.ApplicationRunner;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;/**
* 监听Spring容器启动完成,完成后启动Netty服务器
* @author Gjing
**/@ComponentpublicclassNettyStartListenerimplementsApplicationRunner{@ResourceprivateNettyServer nettyServer;@Overridepublicvoidrun(ApplicationArguments args)throwsException{this.nettyServer.start();}}
application.yml
server:port:8000netty:port:6666bossThread:1
3. 创建Netty客户端
Client1
packageclient;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublicclassClient1{publicstaticvoidmain(String[] args){SpringApplication.run(Client1.class, args);}}
NettyClientHandler
packageclient.netty;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.SimpleChannelInboundHandler;publicclassNettyClientHandlerextendsSimpleChannelInboundHandler<String>{publicvoidchannelRead0(ChannelHandlerContext channelHandlerContext,String msg){System.out.println("收到服务端消息:"+ channelHandlerContext.channel().remoteAddress()+"的消息:"+ msg);}publicvoidexceptionCaught(ChannelHandlerContext channelHandlerContext,Throwable throwable)throwsException{
channelHandlerContext.close();}}
SocketInitializer
packageclient.netty;importio.netty.channel.ChannelInitializer;importio.netty.channel.ChannelPipeline;importio.netty.channel.socket.SocketChannel;importio.netty.handler.codec.LineBasedFrameDecoder;importio.netty.handler.codec.string.StringDecoder;importio.netty.handler.codec.string.StringEncoder;importorg.springframework.stereotype.Component;@ComponentpublicclassSocketInitializerextendsChannelInitializer<SocketChannel>{@OverrideprotectedvoidinitChannel(SocketChannel socketChannel)throwsException{ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(newLineBasedFrameDecoder(2048));
pipeline.addLast(newStringDecoder());
pipeline.addLast(newStringEncoder());
pipeline.addLast(newNettyClientHandler());//加入自己的处理器}}
NettyClient
packageclient.netty;importio.netty.bootstrap.Bootstrap;importio.netty.channel.Channel;importio.netty.channel.ChannelFuture;importio.netty.channel.EventLoopGroup;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.nio.NioSocketChannel;importlombok.Getter;importlombok.extern.slf4j.Slf4j;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;importjava.io.BufferedReader;importjava.io.IOException;importjava.io.InputStreamReader;importjava.nio.charset.StandardCharsets;@Component@Slf4jpublicclassNettyClient{privatefinalstaticLogger logger =LoggerFactory.getLogger(NettyClient.class);@ResourceprivateSocketInitializer socketInitializer;@GetterprivateBootstrap bootstrap;@GetterprivateChannel channel;/**
* netty服务监听端口
*/@Value("${netty.port:6666}")privateint port;@Value("${netty.host:127.0.0.1}")privateString host;/**
* 启动netty
*/publicvoidstart(){this.init();this.channel =this.bootstrap.connect(host, port).channel();
logger.info("Netty connect on port: {}, the host {}, the channel {}",this.port,this.host,this.channel);try{InputStreamReader is =newInputStreamReader(System.in,StandardCharsets.UTF_8);BufferedReader br =newBufferedReader(is);while(true){System.out.println("输入:");this.channel.writeAndFlush(br.readLine()+"\r\n");}}catch(IOException e){
e.printStackTrace();}}/**
* 初始化netty配置
*/privatevoidinit(){EventLoopGroup group =newNioEventLoopGroup();this.bootstrap =newBootstrap();//设置线程组
bootstrap.group(group).channel(NioSocketChannel.class)//设置客户端的通道实现类型.handler(this.socketInitializer);}}
NettyStartListener
packageclient.netty;importorg.springframework.boot.ApplicationArguments;importorg.springframework.boot.ApplicationRunner;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;@ComponentpublicclassNettyStartListenerimplementsApplicationRunner{@ResourceprivateNettyClient nettyClient;@Overridepublicvoidrun(ApplicationArguments args)throwsException{this.nettyClient.start();}}
application.yml
server:
port: 8001
netty:
port: 6666
host: "127.0.0.1"
然后按照相同的方法创建client2
4. 测试
版权归原作者 青铜爱码士 所有, 如有侵权,请联系我们删除。