0


【微服务】Java手写dubbo框架

前言

在对dubbo有了较为深入的使用和理解后,来尝试从dubbo框架的角度重新认识下它,对照着dubbo官方的这张图进行反复的理解后,我们可以从已有掌握的技术出发,来尝试编写一个简单的dubbo实现。

dubbo技术实现

dubbo详细的说明这里就不再一一详述了,重点理解下面这张图

从这张图,可以得出下面几点能够指导我们编码的要点:

  • 服务提供方和服务消费端为两个JVM进程;
  • 服务提供方需要将服务注册到某个地方(注册中心),方便消费方找到服务并调用;
  • 服务消费方从注册中心找到服务后,像调用本地接口一样调用注册中心的服务;

上面三点的补充说明

1、服务提供方和服务消费端为两个JVM进程

这意味着服务提供者和消费者的代码需要分开

2、服务提供方需要将服务注册到某个地方

需要提供一种方式,可以将服务接口注册上去,并被服务消费方的JVM获取到

3、消费方像调用本地接口一样调用注册中心的服务

意味着需要通过某种机制,能够将服务接口的代理实现进行加载,在上图中注意有个关键字 invoke

基于上面的实现思路,接下来让我们编写代码进行实现吧

一、创建maven工程

导入基本的依赖

    <dependencies>

        <dependency>
            <groupId>commons-httpclient</groupId>
            <artifactId>commons-httpclient</artifactId>
            <version>3.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.6</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.3.2.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.16.Final</version>
        </dependency>

        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-core</artifactId>
            <version>9.0.12</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-io</artifactId>
            <version>1.3.2</version>
        </dependency>

        
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>provided</scope>
        </dependency>

        
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.51</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-x-discovery</artifactId>
            <version>5.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.8</version>
        </dependency>

    </dependencies>

二、服务提供方编码实现

按照上面的编写思路,服务提供方要实现的功能点主要包括如下几点

  1. 提供服务接口实现;
  2. 作为一个单独的JVM进程,可以考虑 jetty,tomcat,或者netty等;
  3. 能够解析服务消费方传递过来的请求,并找到服务接口的实现,并返回处理结果;

1、服务接口

public interface HelloService {

    String sayHello(String message);

}

2、接口实现

public class HelloServiceImpl implements HelloService {

    @Override
    public String sayHello(String message) {
        return "hello : " + message;
    }
}

3、使用内嵌式的tomcat容器发布服务

提供一个HttpServer

import org.apache.catalina.*;
import org.apache.catalina.connector.Connector;
import org.apache.catalina.core.StandardContext;
import org.apache.catalina.core.StandardEngine;
import org.apache.catalina.core.StandardHost;
import org.apache.catalina.startup.Tomcat;

public class HttpServer {

    public void start(String hostname, Integer port) {

        Tomcat tomcat = new Tomcat();

        Server server = tomcat.getServer();
        Service service = server.findService("Tomcat");

        Connector connector = new Connector();
        connector.setPort(port);

        Engine engine = new StandardEngine();
        engine.setDefaultHost(hostname);

        Host host = new StandardHost();
        host.setName(hostname);

        String contextPath = "";
        Context context = new StandardContext();
        context.setPath(contextPath);
        context.addLifecycleListener(new Tomcat.FixContextListener());

        host.addChild(context);
        engine.addChild(host);

        service.setContainer(engine);
        service.addConnector(connector);

        //将 Tomcat 接收到的所有请求都交给自定义的 DispatcherServlet 来处理
        tomcat.addServlet(contextPath, "dispatcher", new MyServlet());
        context.addServletMappingDecoded("/*", "dispatcher");

        try {
            tomcat.start();
            tomcat.getServer().await();
        } catch (LifecycleException e) {
            e.printStackTrace();
        }

    }

}

4、自定义的Servlet

使用自定义的Servlet ,可以让程序得到一个的扩展

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

public class MyServlet extends HttpServlet {

    @Override
    protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        new HttpServletHandler().handler(req,resp);
    }
}

5、自定义HttpServletHandler

该类用于处理来自服务消费方的请求,并返回结果

import com.congge.framework.Invocation;
import com.congge.framework.resgister.LocalRegister;
import org.apache.commons.io.IOUtils;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class HttpServletHandler {

    public void handler(HttpServletRequest req, HttpServletResponse resp) {

        try {
            Invocation invocation = (Invocation) new ObjectInputStream(req.getInputStream()).readObject();
            String interfaceName = invocation.getInterfaceName();
            Class implClass = LocalRegister.get(interfaceName);
            try {
                Method method = implClass.getMethod(invocation.getMethodName(), invocation.getParamTypes());
                try {
                    String result = (String)method.invoke(implClass.newInstance(), invocation.getParams());
                    System.out.println("执行的结果:" + result);
                    IOUtils.write(result,resp.getOutputStream());
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                } catch (InstantiationException e) {
                    e.printStackTrace();
                }
            } catch (NoSuchMethodException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

}

6、服务注册中心

尽管是一个简单的实现,发布出去的服务接口也需要一个注册中心承载,这里我们先使用一个简单的map结构实现服务注册中心,自定义LocalRegister,简单来说,map的key为接口名,value为实现类

import java.util.HashMap;
import java.util.Map;

public class LocalRegister {

    private static Map<String,Class> map = new HashMap<>();

    public static void register(String interfaceName,Class implClass){
        map.put(interfaceName,implClass);
    }

    public static Class get(String interfaceName){
       return map.get(interfaceName);
    }

}

6、服务提供方启动类


public class Provider {

    public static void main(String[] args) {

        //注册接口
        LocalRegister.register(HelloService.class.getName(),HelloServiceImpl.class);
        HttpServer httpServer = new HttpServer();
        httpServer.start("localhost",8081);

     }

}

启动main程序,这样服务生产方的接口就发布出去了,暴露的端口为8081

三、服务消费方编码实现

照上面的编写思路,服务消费方要实现的功能点主要包括如下几点

  1. 从注册中心获取特定的服务接口并执行调用;
  2. 作为一个单独的JVM进程,可以考虑 jetty,tomcat,或者netty等;
  3. 需要有个类,用于集中处理发起接口请求调用

1、提供一个HttpClient

该类用于发起请求,请求特定的服务接口,试想在真实的调用中,比如是一个springboot的应用,服务消费方要调用生产者的服务,也是要走http协议或者通过netty的方式进行调用的;

import com.congge.framework.Invocation;
import org.apache.commons.io.IOUtils;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

public class HttpClient {

    public String send(String hostname, Integer port, Invocation invocation) {
        String result = null;
        try {
            URL url = new URL("http", hostname, port, "/");
            try {
                HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();

                httpURLConnection.setRequestMethod("POST");
                httpURLConnection.setDoOutput(true);

                OutputStream outputStream = httpURLConnection.getOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(outputStream);
                oos.writeObject(invocation);
                oos.flush();
                InputStream inputStream = httpURLConnection.getInputStream();
                result = IOUtils.toString(inputStream);
                return result;
            } catch (IOException e) {
                e.printStackTrace();
            }
        } catch (MalformedURLException e) {
            e.printStackTrace();
        }
        return result;
    }
}

2、提供一个用于封装请求参数的对象类

该类封装了从服务消费方发出请求的完整参数对象,比如请求的接口名,方法名,参数类型等

public class Invocation implements Serializable {

    private String interfaceName;  //接口名
    private String methodName;   //方法名
    private Class[] paramTypes;  //方法参数类型列表
    private Object[] params;     //方法参数值列表

    public Invocation(String interfaceName, String methodName, Class[] paramTypes, Object[] params) {
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.paramTypes = paramTypes;
        this.params = params;
    }

    public String getInterfaceName() {
        return interfaceName;
    }

    public void setInterfaceName(String interfaceName) {
        this.interfaceName = interfaceName;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Object[] getParams() {
        return params;
    }

    public void setParams(Object[] params) {
        this.params = params;
    }

    public Class[] getParamTypes() {
        return paramTypes;
    }

    public void setParamTypes(Class[] paramTypes) {
        this.paramTypes = paramTypes;
    }
}

3、消费方启动类

import com.congge.framework.Invocation;
import com.congge.framework.ProxyFactory;
import com.congge.framework.http.HttpClient;
import com.congge.service.HelloService;

public class Consumer {

    public static void main(String[] args) {
        HttpClient httpClient = new HttpClient();
        Invocation invocation = new Invocation(HelloService.class.getName(),
                "sayHello",new Class[]{String.class},new Object[]{"jerry"});
        String result = httpClient.send("localhost", 8081, invocation);
        System.out.println(result);

    }

}

启动消费端main程序后,执行服务调用,成功获取到服务提供方接口的响应结果

四、优化改进点

通过上面的案例代码的演示,简单实现了一个dubbo的调用过程,但这这是一个非常简单的实现,从生产者和消费端来看,均存在一些不太合理的地方,下面做几处简单的优化改进;

1、消费方改进

从上面的这一段调用来说,显得比较麻烦,很明显,这一整段代码可以通过一个类似代理工厂的方式进行封装,然后消费端只需要注入服务接口,调用服务接口的方法即可,这才是我们希望看到的;

自定义一个请求代理工厂

我们知道,在使用dubbo的时候,只需要注入服务接口即可,然后消费方就可以直接调用接口中的方法名了,但接口是不能执行的,最终需要一个代理类去执行接口的方法调用,容易想到的就是使用JDK的动态代理的方式来完成这件事,该类要做的事情就是这些;

这里我们先考虑用最简单的方式实现(先不考虑使用分布式注册中心)

import com.congge.framework.http.HttpClient;
import com.congge.framework.resgister.FileMapRegister;
import com.congge.service.HelloService;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;

public class ProxyFactory {

    @SuppressWarnings("unchecked")
    public static <T> T getProxy(Class interfaceClass) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Invocation invocation = new Invocation(
                        interfaceClass.getName(),
                        method.getName(),
                        method.getParameterTypes(),
                        args);
                HttpClient httpClient = new HttpClient();
                //端口等信息的获取
                String result = httpClient.send("localhost", 8081, invocation);

                //聪注册中心获取
                //URL url = RemoteMapRegistry.get(interfaceClass.getName()).get(0);

                //方式一:Consumer从本地文件获取Provider地址
                //URL url = FileMapRegister.getURL(interfaceClass.getName());

                //方式二:Consumer从Zookeeper获取Provider地址
                //URL url = ZookeeperRegister.getURL(interfaceClass.getName());

                //String result = httpClient.send(url.getHostname(), url.getPort(), invocation);
                return result;
            }
        });
    }

}

这样改进之后,那么在启动类中就可以像下面这样调用

        HelloService helloService = ProxyFactory.getProxy(HelloService.class);
        String result = helloService.sayHello("zhangfei");
        System.out.println(result);

2、服务提供方改进

从上面的服务提供方一侧来看,存在一个比较明显的问题是,服务注册的时候,服务接口的注册并不是注册到分布式注册中心上面,由于 provider和consumer是两个JVM进程,consumer想要从pr注册中心拿到服务接口信息,这样是行不通的;

基于这个问题,从provider一侧来说,需要将服务注册到分布式注册中心上面去;

这里为了演示方便,我们提供两种改进方式,provider在启动的时候,将服务信息注册到本地的文件中,和注册到zk上;

提供一个URL类,记录provider的host和port信息

import java.io.Serializable;

public class URL implements Serializable {

    private String hostname;
    private Integer port;

    public URL(String hostname, Integer port) {
        this.hostname = hostname;
        this.port = port;
    }

    public String getHostname() {
        return hostname;
    }

    public void setHostname(String hostname) {
        this.hostname = hostname;
    }

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    @Override
    public String toString() {
        return "URL{" +
                "hostname='" + hostname + '\'' +
                ", port=" + port +
                '}';
    }

}

提供一个FileMapRegister类,模拟将服务接口写入到本地文件

import java.io.*;
import java.util.HashMap;
import java.util.Map;

public class FileMapRegister {

    private static Map<String, String> REGISTER = new HashMap<>();

    public static void regist(String interfaceName, String implClass, String url) {
        REGISTER.put(interfaceName, implClass + "::" + url);
        saveFile();
    }

    public static URL getURL(String interfaceName) {
        REGISTER = getFile();
        String[] s = REGISTER.get(interfaceName).split("::")[1].split(":");
        URL url = new URL(s[0],Integer.parseInt(s[1]));
        return url;
    }

    public static Class getImplClass(String interfaceName) throws ClassNotFoundException {
        REGISTER = getFile();
        return Class.forName(REGISTER.get(interfaceName).split("::")[0]);
    }

    public static void saveFile() {
        try {
            FileOutputStream fileOutputStream = new FileOutputStream("D:\\temp.txt");
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(fileOutputStream);
            objectOutputStream.writeObject(REGISTER);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static Map<String, String> getFile() {
        try {
            FileInputStream fileInputStream = new FileInputStream("D:\\temp.txt");
            ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream);
            return (Map<String, String>) objectInputStream.readObject();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return null;
    }

}

提供一个ZookeeperRegister 类,可以将服务信息注册到zk

import com.congge.framework.URL;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.util.HashMap;
import java.util.Map;
public class ZookeeperRegister {

    static CuratorFramework client;

    static Map<String, String> UrlCache = new HashMap<>();

    static {
        client = CuratorFrameworkFactory
                .newClient("localhost:2181", new RetryNTimes(3, 1000));
        client.start();

    }

    private static Map<String, String> REGISTER = new HashMap<>();

    //Provider注册服务
    public static void regist(String interfaceName, String implClass, String url) {
        try {
            Stat stat = client.checkExists().forPath(String.format("/dubbo/service/%s", interfaceName));
            if(stat != null){
                client.delete().forPath(String.format("/dubbo/service/%s", interfaceName));
            }
            String result = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(String.format("/dubbo/service/%s", interfaceName),(implClass + "::" + url).getBytes());
            System.out.println("Provier服务注册: " + result);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    //获取Provider URL
    public static URL getURL(String interfaceName) {
        URL url = null;
        String urlString = null;
        //先查询缓存
        if (UrlCache.containsKey(interfaceName)) {
            urlString = UrlCache.get(interfaceName);

        } else {
            try {
                byte[] bytes = client.getData().forPath(String.format("/dubbo/service/%s", interfaceName));
                urlString = new String(bytes);

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        String host = urlString.split("::")[1].split(":")[0];
        String port = urlString.split("::")[1].split(":")[1];
        return new URL(host,Integer.parseInt(port));
    }

    //获取Provider实现类
    public static Class getImplClass(String interfaceName) throws Exception {
        byte[] bytes = client.getData().forPath(String.format("/dubbo/service/%s", interfaceName));
        String urlString = new String(bytes);
        return Class.forName(urlString.split("::")[0]);
    }
}

改造provider的启动类

服务启动时,将服务相关信息注册到本地文件

public class Provider {

    public static void main(String[] args) {

        //注册接口
        LocalRegister.register(HelloService.class.getName(),HelloServiceImpl.class);

        //注册服务的端口等信息
        //URL url = new URL("localhost",8081);
        //RemoteMapRegistry.register(HelloService.class.getName(), url);

        FileMapRegister.regist(HelloService.class.getName(),HelloServiceImpl.class.getName(),"localhost:8081");

        HttpServer httpServer = new HttpServer();
        httpServer.start("localhost",8081);

     }

}

改造consumer端的ProxyFactory类

将从本地的文件中获取服务接口相关信息

public class ProxyFactory {

    @SuppressWarnings("unchecked")
    public static <T> T getProxy(Class interfaceClass) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Invocation invocation = new Invocation(
                        interfaceClass.getName(),
                        method.getName(),
                        method.getParameterTypes(),
                        args);
                HttpClient httpClient = new HttpClient();
                

                //方式一:Consumer从本地文件获取Provider地址
                //URL url = FileMapRegister.getURL(interfaceClass.getName());

                //方式二:Consumer从Zookeeper获取Provider地址
                //URL url = ZookeeperRegister.getURL(interfaceClass.getName());

                String result = httpClient.send(url.getHostname(), url.getPort(), invocation);
                return result;
            }
        });
    }

}

以上改造完毕后,再次启动provider和consumer,观察运行效果,consumer仍然可以正确拿到结果;

关于改造点,可以延续着这个思路继续进行下去,比如注册中心使用zk或redis,下面提几点以供参考:

  • 消费端的服务调用容错处理,假如调用接口超时?或者异常?
  • 消费端的服务重试;
  • 消费端的负载均衡策略如何指定?
  • 将嵌入式tomcat容器改为netty;

本文转载自: https://blog.csdn.net/zhangcongyi420/article/details/127027398
版权归原作者 小码农叔叔 所有, 如有侵权,请联系我们删除。

“【微服务】Java手写dubbo框架”的评论:

还没有评论