0


Zookeeper实现服务注册/发现

what that?

Zookeeper在分布式开发中使用频繁,但许多框架都对其进行了封装,初学者可能无法较好的理解其工作原理,该文章演示了使用Zookeeper实现服务注册,服务发现的简单demo,希望能达到抛砖引玉的效果;

why need RegisterCenter?

之所以需要访问注册和服务发现是因为分布式系统中,服务之间需要相互调用,但若每个服务自己维护一份依赖的服务信息的话,就显得很麻烦,且自身维护的数据无法保证其实时性,当依赖的服务信息发生变更时,无法及时获取更新,解决方案就是引入一个注册中心,服务提供方将自己的信息写入到注册中心,服务使用方从注册中心来获取服务信息; 如下图:

client表示服务使用方,server表示服务提供方

实现的效果: 客户端可自动发现服务信息,当服务状态发生变化时(上线,下线,更换地址),客户端可以及时响应变化,效果如下图:

效果演示

实现

  1. 首先保证Zookeeper以安装启动,且可以正常访问
  2. 创建Maven项目并添加Zookeeper的Java客户端依赖(注意版本号需>3.6) <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.1</version> </dependency>
  3. 编写服务提供方package com.jerry;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.ACL;import java.io.IOException;import java.io.InputStream;import java.net.*;import java.nio.charset.StandardCharsets;import java.util.ArrayList;import java.util.Enumeration;import static java.net.InetAddress.getLocalHost;public class UserService { public static void main(String[] args) throws IOException, InterruptedException, KeeperException { new UserService().serving(); } public void serving() throws IOException, KeeperException, InterruptedException { //获取本机ip地址 String ip = null; Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces(); while (networkInterfaces.hasMoreElements()) { NetworkInterface ni = (NetworkInterface) networkInterfaces.nextElement(); Enumeration<InetAddress> nias = ni.getInetAddresses(); while (nias.hasMoreElements()) { InetAddress ia = (InetAddress) nias.nextElement(); if (!ia.isLinkLocalAddress() && !ia.isLoopbackAddress() && ia instanceof Inet4Address) { ip = ia.getHostAddress(); } } } int port = 8988; //启动服务 ServerSocket socket = new ServerSocket(port); System.out.println("服务器已启动..."); //注册服务 serverRegister(ip, port); //处理请求 clientHandler(socket); } private void clientHandler(ServerSocket socket) throws IOException { while (true) { Socket accept = socket.accept(); InputStream inputStream = accept.getInputStream(); byte[] barr = new byte[1024]; while (true) { int size = inputStream.read(barr); if (size == -1) { //System.out.println("客户端已关闭.."); accept.close(); break; } String s = new String(barr, 0, size); //输出客户端消息 System.out.println(accept.getInetAddress().getHostAddress() + ": " + s); } } } private void serverRegister(String ip, int port) throws IOException, KeeperException, InterruptedException { //注册服务 ZooKeeper zooKeeper = new ZooKeeper("10.211.55.4: 2181",3000, null); try { ArrayList<ACL> acl = new ArrayList<>(); acl.add(new ACL(31, ZooDefs.Ids.ANYONE_ID_UNSAFE)); zooKeeper.create("/userServer", (ip + ":" + port).getBytes(StandardCharsets.UTF_8), acl, CreateMode.EPHEMERAL); System.out.println("服务发布成功!"); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); throw e; } }}
  4. 编写服务服务使用方package com.yyh;import org.apache.zookeeper.*;import java.io.IOException;import java.io.OutputStream;import java.net.InetSocketAddress;import java.net.Socket;import java.util.Scanner;public class UserClient implements Watcher { String node = "/userServer"; //服务信息所在的节点 服务提供方和服务消费方一致 private ZooKeeper zooKeeper; String server_ip; int server_port; public static void main(String[] args) throws Exception { //开始服务监听 UserClient userClient = new UserClient(); userClient.run(); //当访问可用时与服务交互 Scanner scanner = new Scanner(System.in); while (true){ System.out.println("输入要发送的信息(e:退出)"); String text = scanner.next(); if (text.equals("e"))System.exit(-1); if (userClient.server_ip == null){ System.err.println("没有可用的服务..."); }else { userClient.sendToServer(text); } } } private void run() throws Exception { //连接zookeeper zooKeeper = new ZooKeeper("10.211.55.4:2181", 3000, null); //尝试获取服务信息 getServerInfo(); //添加对服务信息的永久监听 zooKeeper.addWatch(node,this,AddWatchMode.PERSISTENT); } //获取服务信息 private void getServerInfo() { try { byte[] data = zooKeeper.getData(node, false, null); String[] infos = new String(data).split(":"); server_ip = infos[0]; server_port = Integer.parseInt(infos[1]); System.out.println("获取服务信息成功!"); System.out.println(server_ip+":"+ server_port); } catch (KeeperException e) { System.err.println("服务信息不存在! 等待服务上线........"); } catch (InterruptedException e) { e.printStackTrace(); } } //当节点状态发送变化时将执行该方法(通知处理) @Override public void process(WatchedEvent event) { if (event.getPath().equals(node)) { //根据具体逻辑处理不同的事件类型,此处只关心节点的创建删除和更新 if (event.getType() == Event.EventType.NodeCreated) { System.err.println("服务上线了"); getServerInfo(); } else if (event.getType() == Event.EventType.NodeDataChanged) { System.err.println("服务更新了"); getServerInfo(); }else if (event.getType()== Event.EventType.NodeDeleted){ server_ip = null; server_port = 0; System.err.println("服务下线了"); } } } public void sendToServer(String text) { InetSocketAddress server_address = new InetSocketAddress(server_ip, server_port); Socket socket = new Socket(); try { socket.connect(server_address); //System.out.println("连接服务器成功!"); OutputStream outputStream = socket.getOutputStream(); outputStream.write(text.getBytes()); System.out.println("消息发送成功!"); } catch (IOException e) { e.printStackTrace(); } try { socket.close(); } catch (IOException e) { e.printStackTrace(); } }}
  5. 打包服务端代码,该步骤可忽略,仅为了测试客户端正确性, 为了在打包时附带其全部依赖,此处借助Spring的打包插件,在pom中添加以下内容: <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>1.5.6.RELEASE</version> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </build>注意:Spring-boot打包插件会自动获取项目中的主函数,必须保证主函数只有一个,所以需要暂时注释客户端的主函数,最后执行maven的package,得到jar包
  6. 将jar上传至虚拟机并运行java -jar ZookeeperTest-1.0-SNAPSHOT.jar若没有其他问题则客户端依然可以正常连接服务器发送消息;

以上便是使用Zookeeper实现服务注册和服务发现的具体步骤,在实际开发中,我们可能还会将提供的服务部署为集群,这时可将集群中的各个服务信息作为子节点注册到指定节点下,客户端监听该节点变化,获取子节点列表从而获取到服务列表,还可以在此基础上加上负载均衡算法实现对服务列表的合理访问; 如图:

喜欢  0   

本文转载自: https://blog.csdn.net/weixin_30705125/article/details/135959292
版权归原作者 prosoo 所有, 如有侵权,请联系我们删除。

“Zookeeper实现服务注册/发现”的评论:

还没有评论