0


zookeeper节点类型,整合代码实现服务器动态监听

节点类型

持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除

可分为无序号和有序号的,顺序号可以被用于 为所有的事件进行全局排序,这样客户端可以通 过顺序号推断事件的顺序

短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除

也可分为无序号和有序号的

Linux中创建节点

创建永久节点名称为jiedian1,值为dadada,无序号

create /jiedian1 "dadada"

创建子节点

create /jiedian1/zijiedian "dadada"

获取节点值

get -s /jiedian1

创建带序号的节点 只需要加-s

create -s /jiedian1/zijiedian "xuhao"

创建临时

create -e /jiedian2 "linshi"

临时带序号

create -e -s /jiedian2 "linshixuhao"

此时 quit 退出后,临时节点会消失

修改节点

set /jiedian1 "123"

监听器原理

监听那个节点数据变化,若变化通知客户端

1)首先要有一个main()线程
2)在main线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。
3)通过connect线程将注册的监听事件发送给Zookeeper。
4)在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中。
5)Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。
6)listener线程内部调用了process()方法。

使用命令行操作,注册一次监听一次

get -w /jiedian1

此时用其他机器对节点进行修改,此时就会通知当前客户端

整合代码

创建maven工程,添加依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>junit</groupId>
  4. <artifactId>junit</artifactId>
  5. <version>RELEASE</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>ch.qos.logback</groupId>
  9. <artifactId>logback-core</artifactId>
  10. <version>1.2.5</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.apache.zookeeper</groupId>
  14. <artifactId>zookeeper</artifactId>
  15. <version>3.5.7</version>
  16. </dependency>
  17. </dependencies>

配置日志文件,在resource下创建log4j.properties

  1. log4j.rootLogger=INFO, stdout
  2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  4. log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
  5. log4j.appender.logfile=org.apache.log4j.FileAppender
  6. log4j.appender.logfile.File=target/spring.log
  7. log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
  8. log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

编写测试类

  1. public class zkClient {
  2. //不能有空格
  3. private static String connectString = "192.168.6.100:2181,192.168.6.101:2181,192.168.6.102:2181";
  4. //超时时间
  5. private static int sessionTimeout = 2000;
  6. private ZooKeeper zkClient = null;
  7. //接连集群
  8. @Before
  9. public void init() throws Exception {
  10. zkClient = new ZooKeeper(connectString, sessionTimeout, new
  11. Watcher() {
  12. @Override
  13. public void process(WatchedEvent watchedEvent) {
  14. // 收到事件通知后的回调函数(用户的业务逻辑)
  15. System.out.println(watchedEvent.getType() + "--"
  16. + watchedEvent.getPath());
  17. // 再次启动监听
  18. try {
  19. List<String> children = zkClient.getChildren("/",
  20. true);
  21. for (String child : children) {
  22. System.out.println(child);
  23. }
  24. } catch (Exception e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. });
  29. }
  30. // 创建子节点
  31. @Test
  32. public void create() throws Exception {
  33. // 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权限 ;参数 4:节点的类型
  34. String nodeCreated = zkClient.create("/lzq",
  35. "jiedain".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
  36. CreateMode.PERSISTENT);
  37. }
  38. // 获取子节点并监听 注册一次生效一次
  39. @Test
  40. public void getChildren() throws Exception {
  41. List<String> children = zkClient.getChildren("/", true);
  42. for (String child : children) {
  43. System.out.println(child);
  44. }
  45. // 延时阻塞
  46. Thread.sleep(Long.MAX_VALUE);
  47. }
  48. }

写数据流程

写流程之写入请求直接发送给Leader节点,直接写,然后同步到从节点,当超过半数写完,返回给客户端

写流程之写入请求发送给follower节点,通知主节点写。。。。

服务器动态上下线监听

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知 到主节点服务器的上下线

先在集群上创建/servers 节点

create /servers "servers"

IDEA中,服务器端向 Zookeeper 注册代码

  1. import java.io.IOException;
  2. import org.apache.zookeeper.CreateMode;
  3. import org.apache.zookeeper.WatchedEvent;
  4. import org.apache.zookeeper.Watcher;
  5. import org.apache.zookeeper.ZooKeeper;
  6. import org.apache.zookeeper.ZooDefs.Ids;
  7. public class DistributeServer {
  8. private static String connectString =
  9. "192.168.6.100:2181,192.168.6.101:2181,192.168.6.102:2181";
  10. private static int sessionTimeout = 2000;
  11. private ZooKeeper zk = null;
  12. private String parentNode = "/servers";
  13. // 创建到 zk 的客户端连接
  14. public void getConnect() throws IOException{
  15. zk = new ZooKeeper(connectString, sessionTimeout, new
  16. Watcher() {
  17. @Override
  18. public void process(WatchedEvent event) {
  19. }
  20. });
  21. }
  22. // 注册服务器
  23. public void registServer(String hostname) throws Exception{
  24. String create = zk.create(parentNode + "/server",
  25. hostname.getBytes(), Ids.OPEN_ACL_UNSAFE,
  26. CreateMode.EPHEMERAL_SEQUENTIAL);
  27. System.out.println(hostname +" is online "+ create);
  28. }
  29. // 业务功能
  30. public void business(String hostname) throws Exception{
  31. System.out.println(hostname + " is working ...");
  32. Thread.sleep(Long.MAX_VALUE);
  33. }
  34. public static void main(String[] args) throws Exception {
  35. // 1 获取 zk 连接
  36. DistributeServer server = new DistributeServer();
  37. server.getConnect();
  38. // 2 利用 zk 连接注册服务器信息
  39. server.registServer(args[0]);
  40. // 3 启动业务功能
  41. server.business(args[0]);
  42. }
  43. }

客户端

  1. import java.io.IOException;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import org.apache.zookeeper.WatchedEvent;
  5. import org.apache.zookeeper.Watcher;
  6. import org.apache.zookeeper.ZooKeeper;
  7. public class DistributeClient {
  8. private static String connectString =
  9. "192.168.6.100:2181,192.168.6.101:2181,192.168.6.102:2181";
  10. private static int sessionTimeout = 2000;
  11. private ZooKeeper zk = null;
  12. private String parentNode = "/servers";
  13. // 创建到 zk 的客户端连接
  14. public void getConnect() throws IOException {
  15. zk = new ZooKeeper(connectString, sessionTimeout, new
  16. Watcher() {
  17. @Override
  18. public void process(WatchedEvent event) {
  19. // 再次启动监听
  20. try {
  21. getServerList();
  22. } catch (Exception e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. });
  27. }
  28. // 获取服务器列表信息
  29. public void getServerList() throws Exception {
  30. // 1 获取服务器子节点信息,并且对父节点进行监听
  31. List<String> children = zk.getChildren(parentNode, true);
  32. // 2 存储服务器信息列表
  33. ArrayList<String> servers = new ArrayList<>();
  34. // 3 遍历所有节点,获取节点中的主机名称信息
  35. for (String child : children) {
  36. byte[] data = zk.getData(parentNode + "/" + child,
  37. false, null);
  38. servers.add(new String(data));
  39. }
  40. // 4 打印服务器列表信息
  41. System.out.println(servers);
  42. }
  43. // 业务功能
  44. public void business() throws Exception{
  45. System.out.println("client is working ...");
  46. Thread.sleep(Long.MAX_VALUE);
  47. }
  48. public static void main(String[] args) throws Exception {
  49. // 1 获取 zk 连接
  50. DistributeClient client = new DistributeClient();
  51. client.getConnect();
  52. // 2 获取 servers 的子节点信息,从中获取服务器信息列表
  53. client.getServerList();
  54. // 3 业务进程启动
  55. client.business();
  56. }
  57. }

此时修改或创建节点就会在控制台输出


本文转载自: https://blog.csdn.net/weixin_52210557/article/details/123437492
版权归原作者 远走于梦游 所有, 如有侵权,请联系我们删除。

“zookeeper节点类型,整合代码实现服务器动态监听”的评论:

还没有评论