前言
在对dubbo有了较为深入的使用和理解后,来尝试从dubbo框架的角度重新认识下它,对照着dubbo官方的这张图进行反复的理解后,我们可以从已有掌握的技术出发,来尝试编写一个简单的dubbo实现。
dubbo技术实现
dubbo详细的说明这里就不再一一详述了,重点理解下面这张图
从这张图,可以得出下面几点能够指导我们编码的要点:
- 服务提供方和服务消费端为两个JVM进程;
- 服务提供方需要将服务注册到某个地方(注册中心),方便消费方找到服务并调用;
- 服务消费方从注册中心找到服务后,像调用本地接口一样调用注册中心的服务;
上面三点的补充说明
1、服务提供方和服务消费端为两个JVM进程
这意味着服务提供者和消费者的代码需要分开
2、服务提供方需要将服务注册到某个地方
需要提供一种方式,可以将服务接口注册上去,并被服务消费方的JVM获取到
3、消费方像调用本地接口一样调用注册中心的服务
意味着需要通过某种机制,能够将服务接口的代理实现进行加载,在上图中注意有个关键字 invoke
基于上面的实现思路,接下来让我们编写代码进行实现吧
一、创建maven工程
导入基本的依赖
<dependencies>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.6</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.16.Final</version>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>9.0.12</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>5.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.8</version>
</dependency>
</dependencies>
二、服务提供方编码实现
按照上面的编写思路,服务提供方要实现的功能点主要包括如下几点
- 提供服务接口实现;
- 作为一个单独的JVM进程,可以考虑 jetty,tomcat,或者netty等;
- 能够解析服务消费方传递过来的请求,并找到服务接口的实现,并返回处理结果;
1、服务接口
public interface HelloService {
String sayHello(String message);
}
2、接口实现
public class HelloServiceImpl implements HelloService {
@Override
public String sayHello(String message) {
return "hello : " + message;
}
}
3、使用内嵌式的tomcat容器发布服务
提供一个HttpServer
import org.apache.catalina.*;
import org.apache.catalina.connector.Connector;
import org.apache.catalina.core.StandardContext;
import org.apache.catalina.core.StandardEngine;
import org.apache.catalina.core.StandardHost;
import org.apache.catalina.startup.Tomcat;
public class HttpServer {
public void start(String hostname, Integer port) {
Tomcat tomcat = new Tomcat();
Server server = tomcat.getServer();
Service service = server.findService("Tomcat");
Connector connector = new Connector();
connector.setPort(port);
Engine engine = new StandardEngine();
engine.setDefaultHost(hostname);
Host host = new StandardHost();
host.setName(hostname);
String contextPath = "";
Context context = new StandardContext();
context.setPath(contextPath);
context.addLifecycleListener(new Tomcat.FixContextListener());
host.addChild(context);
engine.addChild(host);
service.setContainer(engine);
service.addConnector(connector);
//将 Tomcat 接收到的所有请求都交给自定义的 DispatcherServlet 来处理
tomcat.addServlet(contextPath, "dispatcher", new MyServlet());
context.addServletMappingDecoded("/*", "dispatcher");
try {
tomcat.start();
tomcat.getServer().await();
} catch (LifecycleException e) {
e.printStackTrace();
}
}
}
4、自定义的Servlet
使用自定义的Servlet ,可以让程序得到一个的扩展
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
public class MyServlet extends HttpServlet {
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
new HttpServletHandler().handler(req,resp);
}
}
5、自定义HttpServletHandler
该类用于处理来自服务消费方的请求,并返回结果
import com.congge.framework.Invocation;
import com.congge.framework.resgister.LocalRegister;
import org.apache.commons.io.IOUtils;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
public class HttpServletHandler {
public void handler(HttpServletRequest req, HttpServletResponse resp) {
try {
Invocation invocation = (Invocation) new ObjectInputStream(req.getInputStream()).readObject();
String interfaceName = invocation.getInterfaceName();
Class implClass = LocalRegister.get(interfaceName);
try {
Method method = implClass.getMethod(invocation.getMethodName(), invocation.getParamTypes());
try {
String result = (String)method.invoke(implClass.newInstance(), invocation.getParams());
System.out.println("执行的结果:" + result);
IOUtils.write(result,resp.getOutputStream());
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (InstantiationException e) {
e.printStackTrace();
}
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
6、服务注册中心
尽管是一个简单的实现,发布出去的服务接口也需要一个注册中心承载,这里我们先使用一个简单的map结构实现服务注册中心,自定义LocalRegister,简单来说,map的key为接口名,value为实现类
import java.util.HashMap;
import java.util.Map;
public class LocalRegister {
private static Map<String,Class> map = new HashMap<>();
public static void register(String interfaceName,Class implClass){
map.put(interfaceName,implClass);
}
public static Class get(String interfaceName){
return map.get(interfaceName);
}
}
6、服务提供方启动类
public class Provider {
public static void main(String[] args) {
//注册接口
LocalRegister.register(HelloService.class.getName(),HelloServiceImpl.class);
HttpServer httpServer = new HttpServer();
httpServer.start("localhost",8081);
}
}
启动main程序,这样服务生产方的接口就发布出去了,暴露的端口为8081
三、服务消费方编码实现
照上面的编写思路,服务消费方要实现的功能点主要包括如下几点
- 从注册中心获取特定的服务接口并执行调用;
- 作为一个单独的JVM进程,可以考虑 jetty,tomcat,或者netty等;
- 需要有个类,用于集中处理发起接口请求调用
1、提供一个HttpClient
该类用于发起请求,请求特定的服务接口,试想在真实的调用中,比如是一个springboot的应用,服务消费方要调用生产者的服务,也是要走http协议或者通过netty的方式进行调用的;
import com.congge.framework.Invocation;
import org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
public class HttpClient {
public String send(String hostname, Integer port, Invocation invocation) {
String result = null;
try {
URL url = new URL("http", hostname, port, "/");
try {
HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
httpURLConnection.setRequestMethod("POST");
httpURLConnection.setDoOutput(true);
OutputStream outputStream = httpURLConnection.getOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(outputStream);
oos.writeObject(invocation);
oos.flush();
InputStream inputStream = httpURLConnection.getInputStream();
result = IOUtils.toString(inputStream);
return result;
} catch (IOException e) {
e.printStackTrace();
}
} catch (MalformedURLException e) {
e.printStackTrace();
}
return result;
}
}
2、提供一个用于封装请求参数的对象类
该类封装了从服务消费方发出请求的完整参数对象,比如请求的接口名,方法名,参数类型等
public class Invocation implements Serializable {
private String interfaceName; //接口名
private String methodName; //方法名
private Class[] paramTypes; //方法参数类型列表
private Object[] params; //方法参数值列表
public Invocation(String interfaceName, String methodName, Class[] paramTypes, Object[] params) {
this.interfaceName = interfaceName;
this.methodName = methodName;
this.paramTypes = paramTypes;
this.params = params;
}
public String getInterfaceName() {
return interfaceName;
}
public void setInterfaceName(String interfaceName) {
this.interfaceName = interfaceName;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Object[] getParams() {
return params;
}
public void setParams(Object[] params) {
this.params = params;
}
public Class[] getParamTypes() {
return paramTypes;
}
public void setParamTypes(Class[] paramTypes) {
this.paramTypes = paramTypes;
}
}
3、消费方启动类
import com.congge.framework.Invocation;
import com.congge.framework.ProxyFactory;
import com.congge.framework.http.HttpClient;
import com.congge.service.HelloService;
public class Consumer {
public static void main(String[] args) {
HttpClient httpClient = new HttpClient();
Invocation invocation = new Invocation(HelloService.class.getName(),
"sayHello",new Class[]{String.class},new Object[]{"jerry"});
String result = httpClient.send("localhost", 8081, invocation);
System.out.println(result);
}
}
启动消费端main程序后,执行服务调用,成功获取到服务提供方接口的响应结果
四、优化改进点
通过上面的案例代码的演示,简单实现了一个dubbo的调用过程,但这这是一个非常简单的实现,从生产者和消费端来看,均存在一些不太合理的地方,下面做几处简单的优化改进;
1、消费方改进
从上面的这一段调用来说,显得比较麻烦,很明显,这一整段代码可以通过一个类似代理工厂的方式进行封装,然后消费端只需要注入服务接口,调用服务接口的方法即可,这才是我们希望看到的;
自定义一个请求代理工厂
我们知道,在使用dubbo的时候,只需要注入服务接口即可,然后消费方就可以直接调用接口中的方法名了,但接口是不能执行的,最终需要一个代理类去执行接口的方法调用,容易想到的就是使用JDK的动态代理的方式来完成这件事,该类要做的事情就是这些;
这里我们先考虑用最简单的方式实现(先不考虑使用分布式注册中心)
import com.congge.framework.http.HttpClient;
import com.congge.framework.resgister.FileMapRegister;
import com.congge.service.HelloService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
public class ProxyFactory {
@SuppressWarnings("unchecked")
public static <T> T getProxy(Class interfaceClass) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Invocation invocation = new Invocation(
interfaceClass.getName(),
method.getName(),
method.getParameterTypes(),
args);
HttpClient httpClient = new HttpClient();
//端口等信息的获取
String result = httpClient.send("localhost", 8081, invocation);
//聪注册中心获取
//URL url = RemoteMapRegistry.get(interfaceClass.getName()).get(0);
//方式一:Consumer从本地文件获取Provider地址
//URL url = FileMapRegister.getURL(interfaceClass.getName());
//方式二:Consumer从Zookeeper获取Provider地址
//URL url = ZookeeperRegister.getURL(interfaceClass.getName());
//String result = httpClient.send(url.getHostname(), url.getPort(), invocation);
return result;
}
});
}
}
这样改进之后,那么在启动类中就可以像下面这样调用
HelloService helloService = ProxyFactory.getProxy(HelloService.class);
String result = helloService.sayHello("zhangfei");
System.out.println(result);
2、服务提供方改进
从上面的服务提供方一侧来看,存在一个比较明显的问题是,服务注册的时候,服务接口的注册并不是注册到分布式注册中心上面,由于 provider和consumer是两个JVM进程,consumer想要从pr注册中心拿到服务接口信息,这样是行不通的;
基于这个问题,从provider一侧来说,需要将服务注册到分布式注册中心上面去;
这里为了演示方便,我们提供两种改进方式,provider在启动的时候,将服务信息注册到本地的文件中,和注册到zk上;
提供一个URL类,记录provider的host和port信息
import java.io.Serializable;
public class URL implements Serializable {
private String hostname;
private Integer port;
public URL(String hostname, Integer port) {
this.hostname = hostname;
this.port = port;
}
public String getHostname() {
return hostname;
}
public void setHostname(String hostname) {
this.hostname = hostname;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
@Override
public String toString() {
return "URL{" +
"hostname='" + hostname + '\'' +
", port=" + port +
'}';
}
}
提供一个FileMapRegister类,模拟将服务接口写入到本地文件
import java.io.*;
import java.util.HashMap;
import java.util.Map;
public class FileMapRegister {
private static Map<String, String> REGISTER = new HashMap<>();
public static void regist(String interfaceName, String implClass, String url) {
REGISTER.put(interfaceName, implClass + "::" + url);
saveFile();
}
public static URL getURL(String interfaceName) {
REGISTER = getFile();
String[] s = REGISTER.get(interfaceName).split("::")[1].split(":");
URL url = new URL(s[0],Integer.parseInt(s[1]));
return url;
}
public static Class getImplClass(String interfaceName) throws ClassNotFoundException {
REGISTER = getFile();
return Class.forName(REGISTER.get(interfaceName).split("::")[0]);
}
public static void saveFile() {
try {
FileOutputStream fileOutputStream = new FileOutputStream("D:\\temp.txt");
ObjectOutputStream objectOutputStream = new ObjectOutputStream(fileOutputStream);
objectOutputStream.writeObject(REGISTER);
} catch (IOException e) {
e.printStackTrace();
}
}
public static Map<String, String> getFile() {
try {
FileInputStream fileInputStream = new FileInputStream("D:\\temp.txt");
ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream);
return (Map<String, String>) objectInputStream.readObject();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return null;
}
}
提供一个ZookeeperRegister 类,可以将服务信息注册到zk
import com.congge.framework.URL;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.util.HashMap;
import java.util.Map;
public class ZookeeperRegister {
static CuratorFramework client;
static Map<String, String> UrlCache = new HashMap<>();
static {
client = CuratorFrameworkFactory
.newClient("localhost:2181", new RetryNTimes(3, 1000));
client.start();
}
private static Map<String, String> REGISTER = new HashMap<>();
//Provider注册服务
public static void regist(String interfaceName, String implClass, String url) {
try {
Stat stat = client.checkExists().forPath(String.format("/dubbo/service/%s", interfaceName));
if(stat != null){
client.delete().forPath(String.format("/dubbo/service/%s", interfaceName));
}
String result = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(String.format("/dubbo/service/%s", interfaceName),(implClass + "::" + url).getBytes());
System.out.println("Provier服务注册: " + result);
} catch (Exception e) {
e.printStackTrace();
}
}
//获取Provider URL
public static URL getURL(String interfaceName) {
URL url = null;
String urlString = null;
//先查询缓存
if (UrlCache.containsKey(interfaceName)) {
urlString = UrlCache.get(interfaceName);
} else {
try {
byte[] bytes = client.getData().forPath(String.format("/dubbo/service/%s", interfaceName));
urlString = new String(bytes);
} catch (Exception e) {
e.printStackTrace();
}
}
String host = urlString.split("::")[1].split(":")[0];
String port = urlString.split("::")[1].split(":")[1];
return new URL(host,Integer.parseInt(port));
}
//获取Provider实现类
public static Class getImplClass(String interfaceName) throws Exception {
byte[] bytes = client.getData().forPath(String.format("/dubbo/service/%s", interfaceName));
String urlString = new String(bytes);
return Class.forName(urlString.split("::")[0]);
}
}
改造provider的启动类
服务启动时,将服务相关信息注册到本地文件
public class Provider {
public static void main(String[] args) {
//注册接口
LocalRegister.register(HelloService.class.getName(),HelloServiceImpl.class);
//注册服务的端口等信息
//URL url = new URL("localhost",8081);
//RemoteMapRegistry.register(HelloService.class.getName(), url);
FileMapRegister.regist(HelloService.class.getName(),HelloServiceImpl.class.getName(),"localhost:8081");
HttpServer httpServer = new HttpServer();
httpServer.start("localhost",8081);
}
}
改造consumer端的ProxyFactory类
将从本地的文件中获取服务接口相关信息
public class ProxyFactory {
@SuppressWarnings("unchecked")
public static <T> T getProxy(Class interfaceClass) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Invocation invocation = new Invocation(
interfaceClass.getName(),
method.getName(),
method.getParameterTypes(),
args);
HttpClient httpClient = new HttpClient();
//方式一:Consumer从本地文件获取Provider地址
//URL url = FileMapRegister.getURL(interfaceClass.getName());
//方式二:Consumer从Zookeeper获取Provider地址
//URL url = ZookeeperRegister.getURL(interfaceClass.getName());
String result = httpClient.send(url.getHostname(), url.getPort(), invocation);
return result;
}
});
}
}
以上改造完毕后,再次启动provider和consumer,观察运行效果,consumer仍然可以正确拿到结果;
关于改造点,可以延续着这个思路继续进行下去,比如注册中心使用zk或redis,下面提几点以供参考:
- 消费端的服务调用容错处理,假如调用接口超时?或者异常?
- 消费端的服务重试;
- 消费端的负载均衡策略如何指定?
- 将嵌入式tomcat容器改为netty;
版权归原作者 小码农叔叔 所有, 如有侵权,请联系我们删除。