0


【手写一个RPC框架】simpleRPC-06

在这里插入图片描述

本项目所有代码可见:https://github.com/weiyu-zeng/SimpleRPC

前言

本次改进我们将引入zookeeper作为RPC框架的注册中心,服务端在zookeeper上注册自己的服务,而客户端调用服务,回去zookeeper上根据服务名寻找调用的服务器地址,使得我们RPC支持集群调度通信的能力。

实现

zookeeper安装与使用

zookeeper安装请见:

【zookeeper】windows版zookeeper安装与启动 可能遇到的各种问题

安装好之后,我们打开zookeeper的server:

在这里插入图片描述
server启动如下:
在这里插入图片描述

开启zookeeper的client:

在这里插入图片描述
如下,说明成功启动了
在这里插入图片描述

按回车:
在这里插入图片描述

输入

  1. ls /

我们查看目录:

在这里插入图片描述

到此为止,先放在这不要关,我们写代码去。

项目创建

创建一个名为simpleRPC-06的module:

在这里插入图片描述

创建com.rpc的package:

在这里插入图片描述

依赖配置

pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>SimpleRPC</artifactId>
  7. <groupId>org.example</groupId>
  8. <version>1.0-SNAPSHOT</version>
  9. </parent>
  10. <modelVersion>4.0.0</modelVersion>
  11. <artifactId>simpleRPC-06</artifactId>
  12. <properties>
  13. <maven.compiler.source>8</maven.compiler.source>
  14. <maven.compiler.target>8</maven.compiler.target>
  15. </properties>
  16. <dependencies>
  17. <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
  18. <dependency>
  19. <groupId>org.projectlombok</groupId>
  20. <artifactId>lombok</artifactId>
  21. <version>1.18.12</version>
  22. <scope>provided</scope>
  23. </dependency>
  24. <dependency>
  25. <groupId>io.netty</groupId>
  26. <artifactId>netty-all</artifactId>
  27. <version>4.1.51.Final</version>
  28. </dependency>
  29. <!-- 阿里的fastjson序列化框架 -->
  30. <dependency>
  31. <groupId>com.alibaba</groupId>
  32. <artifactId>fastjson</artifactId>
  33. <version>1.2.67</version>
  34. </dependency>
  35. <!--这个jar包应该依赖log4j,不引入log4j会有控制台会有warn,但不影响正常使用-->
  36. <dependency>
  37. <groupId>org.apache.curator</groupId>
  38. <artifactId>curator-recipes</artifactId>
  39. <version>2.13.0</version>
  40. </dependency>
  41. <dependency>
  42. <groupId>org.slf4j</groupId>
  43. <artifactId>slf4j-nop</artifactId>
  44. <version>1.7.30</version>
  45. </dependency>
  46. </dependencies>
  47. </project>

请注意一下,curator必须要和zookeeper版本适配,如果curator版本太高,项目将无法运行。

我们在resources目录下配置一下 log4j的配置,文件名为 log4j.properties:

在这里插入图片描述

log4j.properties

  1. log4j.rootLogger=INFO, stdout
  2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  4. log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
  5. log4j.appender.logfile=org.apache.log4j.FileAppender
  6. log4j.appender.logfile.File=target/spring.log
  7. log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
  8. log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

register

我们创建一个名为register的package:

在这里插入图片描述

创建注册中心的注册服务接口ServiceRegister.java:

  1. package com.rpc.register;
  2. import java.net.InetSocketAddress;
  3. /**
  4. * @author weiyu_zeng
  5. *
  6. * 服务注册接口,两大基本功能,注册:保存服务与地址。 查询:根据服务名查找地址
  7. */
  8. public interface ServiceRegister {
  9. void register(String serviceName, InetSocketAddress serverAddress);
  10. InetSocketAddress serviceDiscovery(String serviceName);
  11. }

然后创建服务注册实现类 ZkServiceRegister.java:

  1. package com.rpc.register;
  2. import org.apache.curator.RetryPolicy;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.CuratorFrameworkFactory;
  5. import org.apache.curator.retry.ExponentialBackoffRetry;
  6. import org.apache.zookeeper.CreateMode;
  7. import org.apache.zookeeper.data.Stat;
  8. import java.net.InetSocketAddress;
  9. import java.util.List;
  10. /**
  11. * @author weiyu_zeng
  12. *
  13. * Curator:是Zookeeper开源的客户端框架,封装了很多API,使用起来非常的方便
  14. * CuratorFramework:连接zookeeper服务的框架,客户端创建使用静态工厂方式CuratorFrameworkFactory进行创建
  15. * tickTime:zk的心跳间隔(heartbeat interval),也是session timeout基本单位.单位为毫秒.
  16. * minSessionTimeout:最小超时时间,zk设置的默认值为2*tickTime.
  17. * maxSessionTimeout:最大超时时间,zk设置的默认值为20*tickTime.
  18. * retryPolicy()重连策略:
  19. * Curator 四种重连策略:
  20. * 1.RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
  21. * 以sleepMsBetweenRetries的间隔重连,直到超过maxElapsedTimeMs的时间设置
  22. *
  23. * 2.RetryNTimes(int n, int sleepMsBetweenRetries)
  24. * 指定重连次数
  25. *
  26. * 3.RetryOneTime(int sleepMsBetweenRetry)
  27. * 重连一次,简单粗暴
  28. *
  29. * 4.ExponentialBackoffRetry
  30. * ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
  31. * ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
  32. * 时间间隔 = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
  33. *
  34. * namespace(): 为了避免各个应用的zk patch冲突, Curator Framework内部会给每一个Curator Framework实例分配一个namespace(可选).
  35. * 这样你在create ZNode的时候都会自动加上这个namespace作为这个node path的root.
  36. * CuratorFramework.create():开始创建操作,可以调用额外的方法(比如方式mode 或者后台执行background) 并在最后调用forPath()
  37. * 指定要操作的ZNode
  38. * CuratorFramework.checkExists(): 开始检查ZNode是否存在的操作. 可以调用额外的方法(监控或者后台处理)并在最后调用forPath()
  39. * 指定要操作的ZNode
  40. * CuratorFramework.start() / close():启动和关闭客户端
  41. * CuratorFramework(client).create().withMode(CreateMode.EPHEMERAL):这将使用给定的数据创建临时结点 EPHEMERAL ZNode
  42. * CuratorFramework.getChildren():开始获得ZNode的子节点列表。 以调用额外的方法(监控、后台处理或者获取状态watch,
  43. * background or get stat)并在最后调用forPath()指定要操作的ZNode
  44. *
  45. * InetSocketAddress:该类实现了可序列化接口,直接继承自java.net.SocketAddress类。实现 IP 套接字地址(IP 地址 + 端口号)。
  46. * 它还可以是一个对(主机名 + 端口号),在此情况下,将尝试解析主机名。如果解析失败,则该地址将被视为未解析
  47. * 地址,但是其在某些情形下仍然可以使用,比如通过代理连接。
  48. * 构造方法:InetSocketAddress(InetAddress addr, int port) 根据 IP 地址和端口号创建套接字地址。
  49. * InetSocketAddress(String hostname, int port) 根据主机名(IP地址指代)和端口号创建套接字地址。
  50. * InetSocketAddress.getHostName():获取 hostname。即地址的主机名部分。
  51. * InetSocketAddress.getPort() 获取端口号。
  52. */
  53. public class ZkServiceRegister implements ServiceRegister {
  54. // curator 提供的zookeeper客户端
  55. private CuratorFramework client;
  56. // zookeeper根路径结点
  57. private static final String ROOT_PATH = "MyRPC";
  58. // 构造方法
  59. // 这里负责zookeeper客户端的初始化,并与zookeeper服务端建立连接。
  60. // 初始化包括指定重连策略,指定连接zookeeper的端口,指定超时时间,指定命名空间
  61. // 初始化完成之后start()开启zookeeper客户端。
  62. public ZkServiceRegister() {
  63. // 重连策略:指数时间重试
  64. RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
  65. // zookeeper的地址固定,不管是服务提供者还是消费者,都要与之建立连接
  66. // sessionTimeoutMs 与 zoo.cfg中的tickTime 有关系,
  67. // zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值。默认分别为tickTime 的2倍和20倍
  68. // 使用心跳监听状态
  69. this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
  70. .sessionTimeoutMs(40000)
  71. .retryPolicy(policy)
  72. .namespace(ROOT_PATH)
  73. .build();
  74. this.client.start();
  75. System.out.println("zookeeper 连接成功");
  76. }
  77. // 注册:传入服务方法名(String),传入主机名和端口号的套接字地址(InetSocketAddress)
  78. @Override
  79. public void register(String serviceName, InetSocketAddress serverAddress) {
  80. try {
  81. // serviceName创建成永久节点,服务提供者下线时,不删服务名,只删地址
  82. Stat stat = client.checkExists().forPath("/" + serviceName);
  83. if (stat == null) {
  84. client.create()
  85. .creatingParentsIfNeeded()
  86. .withMode(CreateMode.PERSISTENT)
  87. .forPath("/" + serviceName);
  88. }
  89. // 路径地址,一个/代表一个节点
  90. String path = "/" + serviceName + "/" + getServiceAddress(serverAddress);
  91. // 临时节点,服务器下线就删除节点
  92. client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
  93. } catch (Exception e) {
  94. System.out.println("此服务已存在");
  95. }
  96. }
  97. // 根据服务名返回地址
  98. @Override
  99. public InetSocketAddress serviceDiscovery(String serviceName) {
  100. try {
  101. List<String> strings = client.getChildren().forPath("/" + serviceName);
  102. // 这里默认用的第一个,后面加负载均衡
  103. String string = strings.get(0);
  104. return parseAddress(string);
  105. } catch (Exception e) {
  106. e.printStackTrace();
  107. }
  108. return null;
  109. }
  110. // 地址 -> XXX.XXX.XXX.XXX:port 字符串
  111. private String getServiceAddress(InetSocketAddress serverAddress) {
  112. return serverAddress.getHostName() + ":" + serverAddress.getPort();
  113. }
  114. // 字符串解析为地址:按照":"切分开,前半是host(String),后半是port(int)
  115. private InetSocketAddress parseAddress(String address) {
  116. String[] result = address.split(":");
  117. return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
  118. }
  119. }

接下来可以对service,client和server进行修改。

client

NettyRPCClient.java 做一点修改:

  1. package com.rpc.client;
  2. import com.rpc.common.RPCRequest;
  3. import com.rpc.common.RPCResponse;
  4. import com.rpc.register.ServiceRegister;
  5. import com.rpc.register.ZkServiceRegister;
  6. import io.netty.bootstrap.Bootstrap;
  7. import io.netty.channel.Channel;
  8. import io.netty.channel.ChannelFuture;
  9. import io.netty.channel.EventLoopGroup;
  10. import io.netty.channel.nio.NioEventLoopGroup;
  11. import io.netty.channel.socket.nio.NioSocketChannel;
  12. import io.netty.util.AttributeKey;
  13. import java.net.InetSocketAddress;
  14. /**
  15. * @author zwy
  16. *
  17. * 实现RPCClient接口
  18. */
  19. public class NettyRPCClient implements RPCClient {
  20. private static final Bootstrap bootstrap;
  21. private static final EventLoopGroup eventLoopGroup;
  22. private String host;
  23. private int port;
  24. private ServiceRegister serviceRegister; // ServiceRegister接口类class
  25. // 构造函数:初始化zookeeper
  26. public NettyRPCClient() {
  27. this.serviceRegister = new ZkServiceRegister();
  28. }
  29. // netty客户端初始化,重复使用
  30. static {
  31. eventLoopGroup = new NioEventLoopGroup();
  32. bootstrap = new Bootstrap();
  33. bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
  34. .handler(new NettyClientInitializer());
  35. }
  36. /**
  37. * 这里需要操作一下,因为netty的传输都是异步的,你发送request,会立刻返回一个值, 而不是想要的相应的response
  38. */
  39. @Override
  40. public RPCResponse sendRequest(RPCRequest request) {
  41. InetSocketAddress address = serviceRegister.serviceDiscovery(request.getInterfaceName());
  42. host = address.getHostName();
  43. port = address.getPort();
  44. try {
  45. ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
  46. Channel channel = channelFuture.channel();
  47. // 发送数据
  48. channel.writeAndFlush(request);
  49. channel.closeFuture().sync();
  50. // 阻塞的获得结果,通过给channel设计别名,获取特定名字下的channel中的内容(这个在hanlder中设置)
  51. // AttributeKey是,线程隔离的,不会由线程安全问题。
  52. // 实际上不应通过阻塞,可通过回调函数,后面可以再进行优化
  53. AttributeKey<RPCResponse> key = AttributeKey.valueOf("RPCResponse");
  54. RPCResponse response = channel.attr(key).get();
  55. System.out.println(response);
  56. return response;
  57. } catch (InterruptedException e) {
  58. e.printStackTrace();
  59. }
  60. return null;
  61. }
  62. }

TestClient.java 也做相应的修改:

  1. package com.rpc.client;
  2. import com.rpc.common.Blog;
  3. import com.rpc.common.User;
  4. import com.rpc.service.BlogService;
  5. import com.rpc.service.UserService;
  6. /**
  7. * @author zwy
  8. */
  9. public class TestClient {
  10. public static void main(String[] args) {
  11. // 不需传host,port
  12. RPCClient rpcClient = new NettyRPCClient();
  13. // 把这个客户端传入代理客户端
  14. RPCClientProxy rpcClientProxy = new RPCClientProxy(rpcClient);
  15. // 代理客户端根据不同的服务,获得一个代理类, 并且这个代理类的方法以或者增强(封装数据,发送请求)
  16. UserService userService = rpcClientProxy.getProxy(UserService.class);
  17. // 服务的方法1
  18. User userByUserId = userService.getUserByUserId(10);
  19. System.out.println("从服务器端得到的user为:" + userByUserId);
  20. // 服务的方法2
  21. User user = User.builder().userName("张三").id(100).sex(true).build();
  22. Integer integer = userService.insertUserId(user);
  23. System.out.println("向服务器端插入数据" + integer);
  24. // 服务的方法3
  25. BlogService blogService = rpcClientProxy.getProxy(BlogService.class);
  26. Blog blogById = blogService.getBlogById(10000);
  27. System.out.println("从服务端得到的blog为:" + blogById);
  28. }
  29. }

client中的其他代码和simpleRPC-05一样,可以直接从simpleRPC-05复制粘贴过来,为了完整,我还是把代码放下面:

RPCClient.java

  1. package com.rpc.client;
  2. import com.rpc.common.RPCRequest;
  3. import com.rpc.common.RPCResponse;
  4. /**
  5. * @author zwy
  6. *
  7. * RPC客户端:发送请求,获得response
  8. */
  9. public interface RPCClient {
  10. RPCResponse sendRequest(RPCRequest request);
  11. }

RPCClientProxy.java

  1. package com.rpc.client;
  2. import com.rpc.common.RPCRequest;
  3. import com.rpc.common.RPCResponse;
  4. import lombok.AllArgsConstructor;
  5. import java.lang.reflect.InvocationHandler;
  6. import java.lang.reflect.Method;
  7. import java.lang.reflect.Proxy;
  8. /**
  9. * @author zwy
  10. *
  11. * 客户端代理:把动态代理封装request对象(这里和simpleRPC-02的ClientProxy函数一样,保留了动态代理的设计)
  12. */
  13. @AllArgsConstructor
  14. public class RPCClientProxy implements InvocationHandler {
  15. private RPCClient client;
  16. // jdk动态代理,每一次代理对象调用方法,会经过此方法增强(反射获取request对象,socket发送至客户端)
  17. @Override
  18. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  19. // request的构建,使用了lombok中的builder,更加简洁
  20. RPCRequest request = RPCRequest.builder().interfaceName(method.getDeclaringClass().getName())
  21. .methodName(method.getName())
  22. .params(args)
  23. .paramsTypes(method.getParameterTypes())
  24. .build();
  25. // 数据传输
  26. RPCResponse response = client.sendRequest(request);
  27. // System.out.println(response);
  28. return response.getData();
  29. }
  30. <T> T getProxy(Class<T> clazz) {
  31. Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
  32. return (T)o;
  33. }
  34. }

NettyClientInitializer.java

  1. package com.rpc.client;
  2. import com.rpc.codec.JsonSerializer;
  3. import com.rpc.codec.MyDecode;
  4. import com.rpc.codec.MyEncode;
  5. import io.netty.channel.ChannelInitializer;
  6. import io.netty.channel.ChannelPipeline;
  7. import io.netty.channel.socket.SocketChannel;
  8. /**
  9. * @author zwy
  10. *
  11. * 同样的与服务端解码和编码格式
  12. */
  13. public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
  14. @Override
  15. protected void initChannel(SocketChannel ch) throws Exception {
  16. ChannelPipeline pipeline = ch.pipeline();
  17. // 使用自定义的编解码器
  18. pipeline.addLast(new MyDecode());
  19. // 编码需要传入序列化器,这里是json,还支持ObjectSerializer,也可以自己实现其他的
  20. pipeline.addLast(new MyEncode(new JsonSerializer()));
  21. pipeline.addLast(new NettyClientHandler());
  22. }
  23. }

NettyClientHandler.java

  1. package com.rpc.client;
  2. import com.rpc.common.RPCResponse;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.SimpleChannelInboundHandler;
  5. import io.netty.util.AttributeKey;
  6. /**
  7. * @author zwy
  8. */
  9. public class NettyClientHandler extends SimpleChannelInboundHandler<RPCResponse> {
  10. @Override
  11. protected void channelRead0(ChannelHandlerContext ctx, RPCResponse msg) throws Exception {
  12. // 接收到response, 给channel设计别名,让sendRequest里读取response
  13. AttributeKey<RPCResponse> key = AttributeKey.valueOf("RPCResponse");
  14. ctx.channel().attr(key).set(msg);
  15. ctx.channel().close();
  16. }
  17. // 跟NettyRPCServerHandler一样
  18. @Override
  19. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  20. cause.printStackTrace();
  21. ctx.close();
  22. }
  23. }

service

服务暴露类加入注册的功能,ServiceProvider.java 做相应的修改:

  1. package com.rpc.service;
  2. import com.rpc.register.ServiceRegister;
  3. import com.rpc.register.ZkServiceRegister;
  4. import java.net.InetSocketAddress;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. /**
  8. * @author zwy
  9. */
  10. public class ServiceProvider {
  11. /**
  12. * 一个实现类可能实现多个服务接口,
  13. */
  14. private Map<String, Object> interfaceProvider;
  15. private ServiceRegister serviceRegister;
  16. private String host;
  17. private int port;
  18. public ServiceProvider(String host, int port){
  19. // 需要传入服务端自身的服务的网络地址
  20. this.host = host;
  21. this.port = port;
  22. this.interfaceProvider = new HashMap<>();
  23. this.serviceRegister = new ZkServiceRegister();
  24. }
  25. public void provideServiceInterface(Object service) throws Exception {
  26. Class<?>[] interfaces = service.getClass().getInterfaces();
  27. for(Class clazz : interfaces){
  28. // 本机的映射表
  29. interfaceProvider.put(clazz.getName(),service);
  30. // 在注册中心注册服务
  31. serviceRegister.register(clazz.getName(), new InetSocketAddress(host, port));
  32. }
  33. }
  34. public Object getService(String interfaceName){
  35. return interfaceProvider.get(interfaceName);
  36. }
  37. }

service中的其他代码和simpleRPC-05一样,可以直接从simpleRPC-05复制粘贴过来,为了完整,我还是把代码放下面:

BlogService.java

  1. package com.rpc.service;
  2. import com.rpc.common.Blog;
  3. public interface BlogService {
  4. Blog getBlogById(Integer id);
  5. }

BlogServiceImpl.java

  1. package com.rpc.service;
  2. import com.rpc.common.Blog;
  3. public class BlogServiceImpl implements BlogService {
  4. @Override
  5. public Blog getBlogById(Integer id) {
  6. Blog blog = Blog.builder()
  7. .id(id)
  8. .title("我的博客")
  9. .useId(22).build();
  10. System.out.println("客户端查询了" + id + "博客");
  11. return blog;
  12. }
  13. }

UserService.java

  1. package com.rpc.service;
  2. import com.rpc.common.User;
  3. /**
  4. * @author zwy
  5. */
  6. public interface UserService {
  7. // 客户端通过这个接口调用服务端的实现类
  8. User getUserByUserId(Integer id);
  9. // 给这个服务增加一个功能
  10. Integer insertUserId(User user);
  11. }

UserServiceImpl.java

  1. package com.rpc.service;
  2. import com.rpc.common.User;
  3. /**
  4. * @author zwy
  5. */
  6. public class UserServiceImpl implements UserService {
  7. @Override
  8. public User getUserByUserId(Integer id) {
  9. // 模拟从数据库中取用户的行为
  10. User user = User.builder()
  11. .id(id)
  12. .userName("he2121")
  13. .sex(true).build();
  14. System.out.println("客户端查询了" + id + "的用户");
  15. return user;
  16. }
  17. @Override
  18. public Integer insertUserId(User user) {
  19. System.out.println("插入数据成功: " + user);
  20. return 1;
  21. }
  22. }

server

TestServer.java 做相应的修改:

  1. package com.rpc.server;
  2. import com.rpc.service.*;
  3. public class TestServer {
  4. public static void main(String[] args) throws Exception {
  5. UserService userService = new UserServiceImpl();
  6. BlogService blogService = new BlogServiceImpl();
  7. // 这里重用了服务暴露类,顺便在注册中心注册,实际上应分开,每个类做各自独立的事
  8. ServiceProvider serviceProvider = new ServiceProvider("127.0.0.1", 8899); // 8899
  9. serviceProvider.provideServiceInterface(userService);
  10. serviceProvider.provideServiceInterface(blogService);
  11. RPCServer RPCServer = new NettyRPCServer(serviceProvider);
  12. RPCServer.start(8899);
  13. }
  14. }

server中的其他代码和simpleRPC-05一样,可以直接从simpleRPC-05复制粘贴过来,为了完整,我还是把代码放下面:

RPCServer.java

  1. package com.rpc.server;
  2. /**
  3. * @author zwy
  4. */
  5. public interface RPCServer {
  6. void start(int port);
  7. void stop();
  8. }

NettyRPCServer.java

  1. package com.rpc.server;
  2. import com.rpc.service.ServiceProvider;
  3. import io.netty.bootstrap.ServerBootstrap;
  4. import io.netty.channel.ChannelFuture;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. import lombok.AllArgsConstructor;
  8. /**
  9. * @author zwy
  10. */
  11. @AllArgsConstructor
  12. public class NettyRPCServer implements RPCServer {
  13. private ServiceProvider serviceProvider;
  14. @Override
  15. public void start(int port) {
  16. // netty服务线程组负责建立连接(TCP/IP连接),work负责具体的请求
  17. NioEventLoopGroup bossGroup = new NioEventLoopGroup();
  18. NioEventLoopGroup workGroup = new NioEventLoopGroup();
  19. System.out.println("Netty服务端启动了");
  20. try {
  21. // 启动Netty服务器
  22. ServerBootstrap serverBootstrap = new ServerBootstrap();
  23. // 初始化
  24. serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
  25. .childHandler(new NettyServerInitializer(serviceProvider));
  26. // 同步阻塞
  27. ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
  28. // 死循环监听
  29. channelFuture.channel().closeFuture().sync();
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. } finally {
  33. bossGroup.shutdownGracefully();
  34. workGroup.shutdownGracefully();
  35. }
  36. }
  37. @Override
  38. public void stop() {
  39. }
  40. }

NettyServerInitializer.java

  1. package com.rpc.server;
  2. import com.rpc.codec.JsonSerializer;
  3. import com.rpc.codec.MyDecode;
  4. import com.rpc.codec.MyEncode;
  5. import com.rpc.service.ServiceProvider;
  6. import io.netty.channel.ChannelInitializer;
  7. import io.netty.channel.ChannelPipeline;
  8. import io.netty.channel.socket.SocketChannel;
  9. import lombok.AllArgsConstructor;
  10. /**
  11. * @author zwy
  12. */
  13. @AllArgsConstructor
  14. public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
  15. private ServiceProvider serviceProvider;
  16. @Override
  17. protected void initChannel(SocketChannel ch) throws Exception {
  18. ChannelPipeline pipeline = ch.pipeline();
  19. // 使用自定义的解码器
  20. pipeline.addLast(new MyDecode());
  21. // 使用自定义的编码器,而且解码器需要传入序列化器,这里是json,还支持ObjectSerializer,也可以自己实现其他的
  22. pipeline.addLast(new MyEncode(new JsonSerializer()));
  23. pipeline.addLast(new NettyRPCServerHandler(serviceProvider));
  24. }
  25. }

NettyRPCServerHandler.java

  1. package com.rpc.server;
  2. import com.rpc.common.RPCRequest;
  3. import com.rpc.common.RPCResponse;
  4. import com.rpc.service.ServiceProvider;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.channel.SimpleChannelInboundHandler;
  7. import lombok.AllArgsConstructor;
  8. import java.lang.reflect.InvocationTargetException;
  9. import java.lang.reflect.Method;
  10. /**
  11. * @author zwy
  12. */
  13. @AllArgsConstructor
  14. public class NettyRPCServerHandler extends SimpleChannelInboundHandler<RPCRequest> {
  15. private ServiceProvider serviceProvider;
  16. @Override
  17. protected void channelRead0(ChannelHandlerContext ctx, RPCRequest msg) throws Exception {
  18. // System.out.println(msg);
  19. RPCResponse response = getResponse(msg);
  20. ctx.writeAndFlush(response);
  21. ctx.close();
  22. }
  23. @Override
  24. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  25. cause.printStackTrace();
  26. ctx.close();
  27. }
  28. // 这里和WorkThread里的getResponse差不多
  29. RPCResponse getResponse(RPCRequest request) {
  30. // 得到服务名
  31. String interfaceName = request.getInterfaceName();
  32. // 得到服务器相应类
  33. Object service = serviceProvider.getService(interfaceName);
  34. // 反射调用方法
  35. Method method = null;
  36. try {
  37. method = service.getClass().getMethod(request.getMethodName(), request.getParamsTypes());
  38. Object invoke = method.invoke(service, request.getParams());
  39. return RPCResponse.success(invoke);
  40. } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
  41. e.printStackTrace();
  42. System.out.println("方法执行错误");
  43. return RPCResponse.fail();
  44. }
  45. }
  46. }

common

和simpleRPC-05一样,可以直接复制过来。

在这里插入图片描述

codec

和simpleRPC-05一样,可以直接复制过来。

在这里插入图片描述

文件结构

simpleRPC-06的文件结构如下:

在这里插入图片描述

在这里插入图片描述

运行

启动TestServer.java :
在这里插入图片描述
然后启动TestClient.java:

在这里插入图片描述
在这里插入图片描述

我们来看看我们最开始开的zookeeper客户端:

现在输入

  1. ls /

在这里插入图片描述

发现我们多了一个结点 MyRPC:

输入

  1. ls /MyRPC

在这里插入图片描述
可以看到我们注册的服务都在这里,成功!

标签: rpc 网络 网络协议

本文转载自: https://blog.csdn.net/fisherish/article/details/122277235
版权归原作者 锥栗 所有, 如有侵权,请联系我们删除。

“【手写一个RPC框架】simpleRPC-06”的评论:

还没有评论