0


ZooKeeper

一、概述

1、为什么使用zk

随着应用规模的迅速扩张,单台机器的部署已经难以支撑用户大规模、高并发的请求了, 因此服务化、集群化、分布式概念应运而生。 针对这种场景,人们通常使用的做法就是将软件按照模块进行拆分,形成独立的子系统,然后在局域网内部署到多台机器上面, 形成了一个集群。 这种方式即可以分滩请求压力,又可以起到灾备的效果。

然而, 集群的维护和多节点应用程序的协作运行远比单机模式复杂,需要顾及到的细节问题实在太多,比如说同一分配置在多台机器上的同步, 客户端程序实时感知服务机状态,应用与应用之间的公共资源的互斥访问等等一系列的问题。 如果这些问题都依靠开发人员或维护人员去解决的话, 非旦消耗人力,而且也达不到实时准确的效果。

所幸的是,zookeeper能够给我们非常完美的解决这些问题,zookeeper天生的就是为解决分布式协调服务这个问题而来, 应用zookeeper,能够非常好的解决如下问题:

  • 配置信息同步
  • 分布式锁控制
  • 消息的发布与订阅(典型的生产者消费者模型)
  • 集群内节点状态的快速感知

2、概念

大数据生态系统里的很多组件的命名都是某种动物或者昆虫, 比如hadoop就是大象,hive是蜜蜂。zookeeper即动物园管理者, 顾名思义就是管理大数据生态系统各组件的管理员

Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。

3、节点模型

每个目录称为一个znode,具有唯一的路径标识

每个znode可以存储1MB数据

每个znode是有版本的,每个znode存储的数据可以设置版本,也就是说一个访问路径可以有多份数据

znode可以被监控包括这个目录节点数据修改以及子节点变化,一旦变化可以通知设置监控的客户端

4、特点

5、应用场景

提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。

统一命名服务

统一配置管理

统一集群管理

服务动态上下线

负载均衡管理

二、安装

1、window安装

官网下载:Apache ZooKeeper

找到页面中的in the achive链接打开

打开选择要下载的版本,这里选择3.7.0,注意从3.5.7开始要下载带bin的tar.gz

即apache-zookeeper-3.7.0-bin.tar.gz

下载下来解压,解压工具解压不了使用window自带的powershell解压即可

进入安装目录下的bin直接执行zkServer.cmd命令启动发现有错误,原因是默认配置是liunx配置,所以需要改配置

打开conf目录,将zoo_sample.cfg文件复制一份,文件名zoo.cfg

并且对zoo.cfg文件中的dataDir修改 dataDir=../data

然后在bin同级目录下创建data文件夹

进入bin目录,cmd中执行zkServer.cmd命令启动zk,然后重新打开一个cmd执行zkCli.cmd命令

查看根节点下的目录ls /

2、linux安装

下载:https://archive.apache.org/dist/zookeeper/

解压到/opt/module

tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/

修改conf

#(1)将/opt/module/zookeeper-3.4.10/conf这个路径下的zoo_sample.cfg修改为zoo.cfg;
[atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg

#(2)打开zoo.cfg文件,修改dataDir路径:
[atguigu@hadoop102 zookeeper-3.4.10]$ vim zoo.cfg
修改如下内容:
dataDir=/opt/module/zookeeper-3.4.10/zkData

#(3)在/opt/module/zookeeper-3.4.10/这个目录上创建zkData文件夹
[atguigu@hadoop102 zookeeper-3.4.10]$ mkdir zkData # 默认不会给你创建

启动

#(1)启动Zookeeper
[atguigu@hadoop102 zookeeper-3.4.10]$  /opt/module/zookeeper-3.4.10/bin/zkServer.sh start

#(2)查看进程是否启动
[atguigu@hadoop102 zookeeper-3.4.10]$ jps
4020 Jps
4001 QuorumPeerMain # 这个是

#(3)查看状态:
[atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh status

ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: standalone

#(4)启动客户端:
[atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkCli.sh

#(5)退出客户端:
[zk: localhost:2181(CONNECTED) 0] quit

#(6)停止Zookeeper
[atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh stop

3、集群安装

1.集群规划
在hadoop102、hadoop103和hadoop104三个节点上部署Zookeeper。

2.解压安装
#(1)解压Zookeeper安装包到/opt/module/目录下
[atguigu@hadoop102 software]$ tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/
#(2)同步/opt/module/zookeeper-3.4.10目录内容到hadoop103、hadoop104
[atguigu@hadoop102 module]$ xsync zookeeper-3.4.10/

3.配置服务器编号
#(1)在/opt/module/zookeeper-3.4.10/这个目录下创建zkData
[atguigu@hadoop102 zookeeper-3.4.10]$ mkdir -p zkData
#(2)在/opt/module/zookeeper-3.4.10/zkData目录下创建一个myid的文件
[atguigu@hadoop102 zkData]$ touch myid
#添加myid文件,注意一定要在linux里面创建,在notepad++里面很可能乱码

#(3)编辑myid文件
[atguigu@hadoop102 zkData]$ vi myid
#在文件中添加与server对应的编号:
2

#(4)拷贝配置好的zookeeper到其他机器上
[atguigu@hadoop102 zkData]$ xsync myid
并分别在hadoop102、hadoop103上修改myid文件中内容为3、4

4.配置zoo.cfg文件
#(1)重命名/opt/module/zookeeper-3.4.10/conf这个目录下的zoo_sample.cfg为zoo.cfg
[atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg
[atguigu@hadoop102 conf]$ vim zoo.cfg
#修改数据存储路径配置
dataDir=/opt/module/zookeeper-3.4.10/zkData
# client端口为2181,即如kafka用2181与zk交互
clientPort=2181
#增加如下配置
#######################cluster##########################
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888
#server.第几号服务器=服务器的ip:服务器与集群leader交互端口2888:选举leader的端口3888
# 这个第几号服务器是上面myid的内容

#(3)同步zoo.cfg配置文件
[atguigu@hadoop102 conf]$ xsync zoo.cfg
#(4)配置参数解读

集群操作

#(1)分别启动Zookeeper
[atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh start
[atguigu@hadoop103 zookeeper-3.4.10]$ bin/zkServer.sh start
[atguigu@hadoop104 zookeeper-3.4.10]$ bin/zkServer.sh start
#(2)查看状态
[atguigu@hadoop102 zookeeper-3.4.10]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower

[atguigu@hadoop103 zookeeper-3.4.10]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: leader

[atguigu@hadoop104 zookeeper-3.4.5]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower

# 也可以通过以下方式加入到集群
./zkCli.sh ‐server 192.168.60.130:2181
./zkCli.sh ‐server 192.168.60.130:2182
./zkCli.sh ‐server 192.168.60.130:2183

注:配置参数

Zookeeper中的配置文件zoo.cfg中参数含义解读如下:

1.tickTime =2000:通信心跳数,Zookeeper服务器与客户端心跳时间,单位毫秒

Zookeeper使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime时间就会发送一个心跳,时间单位为毫秒。

它用于心跳机制,并且设置最小的session超时时间为两倍心跳时间。(session的最小超时时间是2*tickTime)

2.initLimit =10:LF初始通信时限

集群中的Follower跟随者服务器与Leader领导者服务器之间初始连接时能容忍的最多心跳数(tickTime的数量),用它来限定集群中的Zookeeper服务器连接到Leader的时限。

3.syncLimit =5:LF同步通信时限

集群中Leader与Follower之间的最大响应时间单位,假如响应超过syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除Follwer。

4.dataDir:数据文件目录+数据持久化路径

主要用于保存Zookeeper中的数据。

5.clientPort =2181:客户端连接端口,即如kafka用2181与zk交互

监听客户端连接的端口。

6 、server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888

server.A=B:C:D。
A是一个数字,表示这个是第几号服务器;
集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
B是这个服务器的ip地址;
C是这个服务器与集群中的Leader服务器交换信息的端口;
D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

4、zk客户端可视化工具

4.1、ZooInspector

下载地址

https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip
解压后进入目录ZooInspector\build,运行zookeeper-dev-ZooInspector.jar

4.2、taokeeper

基于zookeeper的监控管理工具taokeeper,由淘宝团队开源的zk管理中间件,
安装前要求服务前先配置nc 和 sshd
1.下载数据库脚本

wget https://github.com/downloads/alibaba/taokeeper/taokeeper.sql

三、命令行操作

1、节点类型

zookeeper中的节点有两种, 分别为临时节点和永久节点。 节点的类型在创建时即被确定, 并且不能改变。

PERSISTENT–持久化目录节点:客户端与zookeeper断开连接后,该节点依旧存在;
PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点:客户端与zookeeper断开连接后,该节点依旧存在,允许重复创建相同key,只是Zookeeper给该节点名称进行顺序编号;
EPHEMERAL-临时目录节点:客户端与zookeeper断开连接后,该节点被删除。。 虽然每个临时的Znode都会绑定到一个客户端会话, 但他们对所有的客户端还是可见的。另外, ZooKeeper的临时节点不允许拥有子节点。;
EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点:客户端与zookeeper断开连接后(一旦会话(Session)结束),该节点被删除,允许重复创建相同key,只是Zookeeper给该节点名称进行顺序编号;

2、zookeeper常用shell命令

2.1、命令概述

2.2、查看当前znode信息

#查看当前znode信息
[zk: localhost:2181(CONNECTED) 3] ls /
[dubbo, sanguo0000000003, services, zookeeper]

2.3、node详情

ls -s /命令或者stat / 命令都行

[zk: localhost:2181(CONNECTED) 2] ls -s /
[dubbo, sanguo0000000003, services, zookeeper]
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x66
cversion = 4
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 4

1)czxid-创建节点的事务zxid

每次修改ZooKeeper状态都会收到一个zxid形式的时间戳,也就是ZooKeeper事务ID。

事务ID是ZooKeeper中所有修改总的次序。每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生。

2)ctime - znode被创建的毫秒数(从1970年开始)

3)mzxid - znode最后更新的事务zxid

4)mtime - znode最后修改的毫秒数(从1970年开始)

5)pZxid-znode最后更新的子节点zxid

6)cversion - znode子节点变化号,znode子节点修改次数

7)dataversion - znode数据变化号

8)aclVersion - znode访问控制列表的变化号

9)ephemeralOwner- 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。

10)dataLength- znode的数据长度

11)numChildren - znode子节点数量

2.4、创建无序永久节点

[zk: localhost:2181(CONNECTED) 3] create /sanguo/shuguo "liubei"

2.5、创建有序永久节点

[zk: localhost:2181(CONNECTED) 2] create -s /sanguo/weiguo/zhangliao "zhangliao"
Created /sanguo/weiguo/zhangliao0000000000
[zk: localhost:2181(CONNECTED) 3] create -s /sanguo/weiguo/zhangliao "zhangliao"
Created /sanguo/weiguo/zhangliao0000000001
[zk: localhost:2181(CONNECTED) 4] create -s /sanguo/weiguo/xuchu "xuchu"
Created /sanguo/weiguo/xuchu0000000002

如果原来没有序号节点,序号从 0 开始依次递增。如果原节点下已有 2 个节点,则再排序时从 2 开始,以此类推。

2.6、创建 无序号临时节点

[zk: localhost:2181(CONNECTED) 7] create -e /sanguo/wuguo "zhouyu"

当前客户端可以查看值 ,退出客户端重新进入值删除了

2.7、创建有序号临时节点

[zk: localhost:2181(CONNECTED) 2] create -e -s /sanguo/wuguo "zhouyu"

2.8、修改节点值

[zk: localhost:2181(CONNECTED) 6] set /sanguo/weiguo "simayi"

2.9、删除节点

[zk: localhost:2181(CONNECTED) 4] delete /sanguo/jin

2.10、递归删除节点

[zk: localhost:2181(CONNECTED) 15] deleteall /sanguo/shuguo

2.11、节点监听

查看监听原理节

四、javaApi操作

1、原生api

创建maven引入依赖

<dependencies>
      <!-- https://mvnrepository.com/artifact/junit/junit -->
      <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>RELEASE</version>
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
      <dependency>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>log4j-core</artifactId>
          <version>2.14.1</version>
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
      <dependency>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
          <version>3.7.0</version>
      </dependency>
  </dependencies>

需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”

log4j.rootLogger=INFO, stdout 

log4j.appender.stdout=org.apache.log4j.ConsoleAppender 

log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 

log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n 

log4j.appender.logfile=org.apache.log4j.FileAppender 

log4j.appender.logfile.File=target/spring.log 

log4j.appender.logfile.layout=org.apache.log4j.PatternLayout 

log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n 

1.1、创建zk客户端

public class TestZookeeper {

    private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
    private int sessionTimeout = 2000;
    private ZooKeeper zooKeeper;

    @Test
    public void init() throws IOException {
        zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }
}

1.2、创建子节点

public class TestZookeeper {

//    private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
    private String connectionString = "127.0.0.1:2181";
    private int sessionTimeout = 2000;
    private ZooKeeper zkClient;

    @Before
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                
            }
        });
    }

    //创建节点
    @Test
    public void create() throws InterruptedException, KeeperException {
        // 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型 
        String path = zkClient.create("/atguigu", "chenchen".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(path);
    }
}

1.3、获取子节点并监听节点变化

public class TestZookeeper {

//    private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
   private String connectionString = "127.0.0.1:2181";
   private int sessionTimeout = 2000;
   private ZooKeeper zkClient;

   @Before
   public void init() throws IOException {
       zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
           @Override
           public void process(WatchedEvent watchedEvent) {//监听节点路径的变化
               List<String> children = null;
               try {
                   System.out.println("********************************");
                   children = zkClient.getChildren("/", true);
                   for (int i = 0; i < children.size(); i++) {
                       System.out.println(children.get(i));
                   }
                   System.out.println("********************************");
               } catch (KeeperException e) {
                   e.printStackTrace();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       });
   }

   //获取子节点并监控节点的变化
   @Test
   public void getDataAndWatch() throws InterruptedException, KeeperException, IOException {
       List<String> children = zkClient.getChildren("/", true);
       for (int i = 0; i < children.size(); i++) {
           System.out.println(children.get(i));
       }
       //让程序不要结束
       System.in.read();
   }

}

1.4、判断znode是否存在

public class TestZookeeper {

//    private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
  private String connectionString = "127.0.0.1:2181";
  private int sessionTimeout = 2000;
  private ZooKeeper zkClient;

  @Before
  public void init() throws IOException {
      zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
          @Override
          public void process(WatchedEvent watchedEvent) {
              List<String> children = null;
              try {
                  System.out.println("********************************");
                  children = zkClient.getChildren("/", true);
                  for (int i = 0; i < children.size(); i++) {
                      System.out.println(children.get(i));
                  }
                  System.out.println("********************************");
              } catch (KeeperException e) {
                  e.printStackTrace();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      });
  }

  //判断节点是否存在
  @Test
  public void exist() throws InterruptedException, KeeperException {
      Stat exists = zkClient.exists("/dubbo", true);
      if (exists == null) {
          System.out.println("没有该节点");
      }else{
          System.out.println("该节点存在");
      }
  }
}

1.5、删除znode

public class TestZookeeper {

//    private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
    private String connectionString = "127.0.0.1:2181";
    private int sessionTimeout = 2000;
    private ZooKeeper zkClient;

    @Before
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {//监听节点路径的变化
                List<String> children = null;
                try {
                    System.out.println("********************************");
                    children = zkClient.getChildren("/", true);
                    for (int i = 0; i < children.size(); i++) {
                        System.out.println(children.get(i));
                    }
                    System.out.println("********************************");
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    //删除节点
    @Test
    public void delete() throws InterruptedException, KeeperException {
        zkClient.delete("/atguigu2",-1);
    }
}

2、zookeeper 开源客户端curator

Curator 是 Apache ZooKeeper 的Java客户端库。Curator 项目的目标是简化 ZooKeeper 客户端的使用。

Apache Curator –

2.1、创建连接

创建maven项目引入依赖和日志文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>curator-zk</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>

        <!--curator-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>
        <!--日志-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

log4j.properties

log4j.rootLogger=off,stdout

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%d{yyyy-MM-dd HH/:mm/:ss}]%-5p %c(line/:%L) %x-%m%n

连接zk

@Before
public void testConnect() {
    //重试策略
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
    //2.第二种方式
    //CuratorFrameworkFactory.builder();
    client = CuratorFrameworkFactory.builder()
        .connectString("192.168.200.130:2181")
        .sessionTimeoutMs(60 * 1000)
        .connectionTimeoutMs(15 * 1000)
        .retryPolicy(retryPolicy)
        .namespace("test")
        .build();
    //开启连接
    client.start();
}

2.2、节点创建

/**
* 创建节点:create 持久 临时 顺序 数据
* 1. 基本创建 :create().forPath("")
* 2. 创建节点 带有数据:create().forPath("",data)
* 3. 设置节点的类型:create().withMode().forPath("",data)
* 4. 创建多级节点  /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)
*/
@Test
public void testCreate() throws Exception {
    //2. 创建节点 带有数据
    //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
    String path = client.create().forPath("/app2", "hehe".getBytes());
    System.out.println(path);
}

@Test
public void testCreate2() throws Exception {
    //1. 基本创建
    //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
    String path = client.create().forPath("/app1");
    System.out.println(path);
}
@Test
public void testCreate3() throws Exception {
    //3. 设置节点的类型
    //默认类型:持久化
    String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
    System.out.println(path);
}
@Test
public void testCreate4() throws Exception {
    //4. 创建多级节点  /app1/p1
    //creatingParentsIfNeeded():如果父节点不存在,则创建父节点
    String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
    System.out.println(path);
}

2.3、节点查看

  /**
    * 查询节点:
    * 1. 查询数据:get: getData().forPath()
    * 2. 查询子节点: ls: getChildren().forPath()
    * 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
    */
    @Test
    public void testGet1() throws Exception {
        //1. 查询数据:get
        byte[] data = client.getData().forPath("/app1");
        System.out.println(new String(data));
    }

    @Test
    public void testGet2() throws Exception {
        // 2. 查询子节点: ls
        List<String> path = client.getChildren().forPath("/");
        System.out.println(path);
    }
    @Test
    public void testGet3() throws Exception {
        Stat status = new Stat();
        System.out.println(status);
        //3. 查询节点状态信息:ls -s
        client.getData().storingStatIn(status).forPath("/app1");
        System.out.println(status);
    }

2.4、节点删除

/**
* 删除节点: delete deleteall
* 1. 删除单个节点:delete().forPath("/app1");
* 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
* 3. 必须成功的删除:为了防止网络抖动。本质就是重试。  client.delete().guaranteed().forPath("/app2");
* 4. 回调:inBackground
* @throws Exception
*/
@Test
public void testDelete() throws Exception {
    // 1. 删除单个节点
    client.delete().forPath("/app1");
}
@Test
public void testDelete2() throws Exception {
    //2. 删除带有子节点的节点
    client.delete().deletingChildrenIfNeeded().forPath("/app4");
}

@Test
public void testDelete3() throws Exception {
    //3. 必须成功的删除
    client.delete().guaranteed().forPath("/app2");
}
@Test
public void testDelete4() throws Exception {
    //4. 回调
    client.delete().guaranteed().inBackground(new BackgroundCallback(){
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            System.out.println("我被删除了~");
            System.out.println(event);
        }
    }).forPath("/app1");
}

2.5、节点修改

   /**
    * 修改数据
    * 1. 基本修改数据:setData().forPath()
    * 2. 根据版本修改: setData().withVersion().forPath()
    * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
    *
    * @throws Exception
    */
    @Test
    public void testSet() throws Exception {
        client.setData().forPath("/app1", "itcast".getBytes());
    }

    @Test
    public void testSetForVersion() throws Exception {
        Stat status = new Stat();
        //3. 查询节点状态信息:ls -s
        client.getData().storingStatIn(status).forPath("/app1");
        int version = status.getVersion();//查询出来的 3
        System.out.println(version);
        client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());
    }

2.6、节点监听

ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。

ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。

ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便

需要开发人员自己反复注册Watcher,比较繁琐。

Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。

ZooKeeper提供了三种Watcher:

NodeCache : 只是监听某一个特定的节点

PathChildrenCache : 监控一个ZNode的子节点.

TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

/**
* 演示 NodeCache:给指定一个节点注册监听器
*/
@Test
public void testNodeCache() throws Exception {
    //1. 创建NodeCache对象
    final NodeCache nodeCache = new NodeCache(client,"/app1");
    //2. 注册监听
       nodeCache.getListenable().addListener(new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            System.out.println("节点变化了~");
            //获取修改节点后的数据
            byte[] data = nodeCache.getCurrentData().getData();
            System.out.println(new String(data));
            }
        });
        //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
        nodeCache.start(true);
        while (true){
    }
}

/**演示PathChildrenCache
*/
@Test
public void testPathChildrenCache() throws Exception {
    //1.创建监听对象
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
    //2. 绑定监听器
    pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {                @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            System.out.println("子节点变化了~");
            System.out.println(event);
            //监听子节点的数据变更,并且拿到变更后的数据
            //1.获取类型
            PathChildrenCacheEvent.Type type = event.getType();
            //2.判断类型是否是update
            if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                System.out.println("数据变了!!!");
                byte[] data = event.getData().getData();
                System.out.println(new String(data));
            }
        }
    });
    //3. 开启
    pathChildrenCache.start();
    while (true){
    }
}

   /**
    * 演示 TreeCache:监听某个节点自己和所有子节点们
    */
    @Test
    public void testTreeCache() throws Exception {
        //1. 创建监听器
        TreeCache treeCache = new TreeCache(client,"/app2");
        //2. 注册监听
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                System.out.println("节点变化了");
                System.out.println(event);
            }
        });
        //3. 开启
        treeCache.start();
        while (true){
        }
    }

2.7、分布式锁

在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。

但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。

那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁
在这里插入图片描述

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。

客户端获取锁时,在lock节点下创建临时顺序节点。

然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。

如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。

如果发现比自己小的那个节点被删除,则客户端的

Watcher会收到相应通知,此时再次判断自己创建的节点

是否是lock子节点中序号最小的,如果是则获取到了锁,

如果不是则重复以上步骤继续获取到比自己小的一个节点

并注册监听。
在这里插入图片描述

分布式锁-模拟12306售票案例

Curator实现分布式锁API

在Curator中有五种锁方案:

  • InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
  • InterProcessMutex:分布式可重入排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器
  • InterProcessSemaphoreV2:共享信号量

在这里插入图片描述

创建线程进行加锁设置

public class Ticket12306 implements Runnable{
    private int tickets = 10;//数据库的票数
    private InterProcessMutex lock ;
    @Override
    public void run() {
        while(true){
            //获取锁
            try {
            lock.acquire(3, TimeUnit.SECONDS);
                if(tickets > 0){
                    System.out.println(Thread.currentThread()+":"+tickets);
                    Thread.sleep(100);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                //释放锁
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }
        }
    }
}

创建连接,并且初始化锁

 public Ticket12306(){
        //重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        //2.第二种方式
        //CuratorFrameworkFactory.builder();
        CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.149.135:2181")
            .sessionTimeoutMs(60 * 1000)
            .connectionTimeoutMs(15 * 1000)
            .retryPolicy(retryPolicy)
            .build();
        //开启连接
        client.start();
        lock = new InterProcessMutex(client,"/lock");
    }

运行多个线程进行测试

  public class LockTest {
        public static void main(String[] args) {
            Ticket12306 ticket12306 = new Ticket12306();
            //创建客户端
            Thread t1 = new Thread(ticket12306,"携程");
            Thread t2 = new Thread(ticket12306,"飞猪");
            t1.start();
            t2.start();
        }
    }

五、内部原理(面试题)

1、选举机制

1)半数机制:集群中半数以上机器存活,集群可用。所以Zookeeper适合安装奇数台服务器。

2)Zookeeper虽然在配置文件中并没有指定Master和Slave。但是,Zookeeper工作时,是有一个节点为Leader,其他则为Follower,Leader是通过内部的选举机制临时产生的。

3)以一个简单的例子来说明整个选举的过程。

假设有五台服务器组成的Zookeeper集群,它们的id从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动,来看看会发生什么,

在这里插入图片描述

(1)服务器1启动,此时只有它一台服务器启动了,它发出去的报文没有任何响应,所以它的选举状态一直是LOOKING状态。

(2)服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1、2还是继续保持LOOKING状态。

(3)服务器3启动,根据前面的理论分析,服务器3成为服务器1、2、3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的Leader。

(4)服务器4启动,根据前面的分析,理论上服务器4应该是服务器1、2、3、4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能接收当小弟的命了。

(5)服务器5启动,同4一样当小弟。

2、ZooKeeper的监听原理是什么?

2.1、节点值变化监听

在 hadoop104 主机上注册监听/sanguo 节点数据变化

[zk: localhost:2181(CONNECTED) 26] get -w /sanguo

可能因为版本问题上面的命令会报错,可改用:

[zk: localhost:2181(CONNECTED) 26] get /sanguo [watch]

在 hadoop103 主机上修改/sanguo 节点的数据

[zk: localhost:2181(CONNECTED) 1] set /sanguo "xisi"

观察 hadoop104 主机收到数据变化的监听

WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged 
path:/sanguo

注意:在hadoop103再多次修改/sanguo的值,hadoop104上不会再收到监听。因为注册一次,只能监听一次。想再次监听,需要再次注册。

2.2、节点的子节点变化监听(路径变化)

在 hadoop104 主机上注册监听/sanguo 节点的子节点变化

[zk: localhost:2181(CONNECTED) 1] ls -w /sanguo
[shuguo, weiguo]

在 hadoop103 主机/sanguo 节点上创建子节点

[zk: localhost:2181(CONNECTED) 2] create /sanguo/jin "simayi"
Created /sanguo/jin

观察 hadoop104 主机收到子节点变化的监听

WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged 
path:/sanguo

注意:节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。

  • 监控数据变化用 set
  • 监控子节点变化用 ls

3、写数据流程

4、ZooKeeper的部署方式有哪几种?

集群中的角色有哪些?集群最少需要几台机器?

(1)部署方式单机模式、集群模式

(2)角色:Leader和Follower

(3)集群最少需要机器数:3

六、实战案例

1、服务器动态上下线

需求

现在集群创建server节点

create /servers "servers"

服务器端向Zookeeper注册代码

public class DistributeServer {

    private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
    private int sessionTimeout = 2000;
    private ZooKeeper zkClient;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        DistributeServer server = new DistributeServer();

        //1 连接zookeeper集群
        server.getConnect();

        //2 注册节点
        server.regist("test2");

        //3 业务逻辑处理
        server.business();
    }

    //业务逻辑处理
    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    //注册节点
    private void regist(String hostName) throws InterruptedException, KeeperException {
        String path = zkClient.create
                ("/servers/server", hostName.getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(path);
    }

    //连接zookeeper集群
    private void getConnect() throws IOException {

        zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                List<String> children = null;
                try {
                    System.out.println("********************************");
                    children = zkClient.getChildren("/", true);
                    for (int i = 0; i < children.size(); i++) {
                        System.out.println(children.get(i));
                    }
                    System.out.println("********************************");
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

客户端代码

public class DistributeClient {
    private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
    private int sessionTimeout = 2000;
    private ZooKeeper zkClient;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        DistributeClient server = new DistributeClient();

        //1 连接zookeeper集群
        server.getConnect();

        //2 注册节点
        server.getChildren();

        //3 业务逻辑处理
        server.business();
    }

    //注册监听
    private void getChildren() throws InterruptedException, KeeperException {
        List<String> children = zkClient.getChildren("/servers", true);
        //储存服务器节点主机名称集合
        ArrayList<String> hosts = new ArrayList<>();

        for (String child : children) {
            byte[] data = zkClient.getData("/servers/" + child, false, null);
            hosts.add(String.valueOf(data));
        }

        //将所有在线主机的名称打印出来
        System.out.println(hosts);
    }

    //业务逻辑处理
    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    //连接zookeeper集群
    private void getConnect() throws IOException {

        zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    getChildren();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

2、分布式锁案例

2.1、原生zk实现

锁实现

package com.herobin.flink.debug_local.zk;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributedLock {

    private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";

    // 超时时间
    private int sessionTimeout = 2000;
    private ZooKeeper zk;
    private String rootNode = "locks";
    private String subNode = "seq-";

    // 当前 client 等待的子节点
    private String waitPath;

    //ZooKeeper 连接
    private CountDownLatch connectLatch = new CountDownLatch(1);

    //ZooKeeper 节点等待
    private CountDownLatch waitLatch = new CountDownLatch(1);

    // 当前 client 创建的子节点
    private String currentNode;

    // 和 zk 服务建立连接,并创建根节点
    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
                if (event.getState() ==
                        Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }
                // 发生了 waitPath 的删除事件
                if (event.getType() ==
                        Event.EventType.NodeDeleted && event.getPath().equals(waitPath))
                {
                    waitLatch.countDown();
                } }
        });

        // 等待连接建立
        connectLatch.await();
        //获取根节点状态
        Stat stat = zk.exists("/" + rootNode, false);
        //如果根节点不存在,则创建根节点,根节点类型为永久节点
        if (stat == null) {
            System.out.println("根节点不存在");
            zk.create("/" + rootNode, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    // 加锁方法
    public void zkLock() {
        try {
            //在根节点下创建临时顺序节点,返回值为创建的节点路径
            currentNode = zk.create("/" + rootNode + "/" + subNode,
                    null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            // wait 一小会, 让结果更清晰一些
            Thread.sleep(10);
            // 注意, 没有必要监听"/locks"的子节点的变化情况
            List<String> childrenNodes = zk.getChildren("/" +
                    rootNode, false);
            // 列表中只有一个子节点, 那肯定就是 currentNode , 说明 client 获得锁
            if (childrenNodes.size() == 1) {
                return;
            } else {
                //对根节点下的所有临时顺序节点进行从小到大排序
                Collections.sort(childrenNodes);
                //当前节点名称
                String thisNode = currentNode.substring(("/" + rootNode + "/").length());
                //获取当前节点的位置
                int index = childrenNodes.indexOf(thisNode);
                if (index == -1) {
                    System.out.println("数据异常");
                } else if (index == 0) {
                    // index == 0, 说明 thisNode 在列表中最小, 当前 client 获得锁
                    return;
                } else {
                    // 获得排名比 currentNode 前 1 位的节点
                    this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
                    // 在 waitPath 上注册监听器, 当 waitPath 被删除时, zookeeper 会回调监听器的 process 方法
                    zk.getData(waitPath, true, new Stat());
                    //进入等待锁状态
                    waitLatch.await();
                    return;
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 解锁方法
    public void zkUnlock() {
        try {
            zk.delete(this.currentNode, -1);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }
}

锁测试(创建两个线程)

package com.herobin.flink.debug_local.zk;

import org.apache.zookeeper.KeeperException;
import java.io.IOException;

public class DistributedLockTest {

    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {

        // 创建分布式锁 1
        final DistributedLock lock1 = new DistributedLock();

        // 创建分布式锁 2
        final DistributedLock lock2 = new DistributedLock();

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    lock1.zkLock();
                    System.out.println("线程 1 获取锁");
                    Thread.sleep(5 * 1000);
                    lock1.zkUnlock();
                    System.out.println("线程 1 释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                } }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    lock2.zkLock();
                    System.out.println("线程 2 获取锁");
                    Thread.sleep(5 * 1000);
                    lock2.zkUnlock();
                    System.out.println("线程 2 释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                } }
        }).start();
    }
}

观察运行

线程 1 获取锁
线程 1 释放锁
线程 2 获取锁
线程 2 释放锁

2.2、Curator 框架实现分布式锁案例

查看API Curator章节


本文转载自: https://blog.csdn.net/qq_34491508/article/details/125373957
版权归原作者 程序三两行 所有, 如有侵权,请联系我们删除。

“ZooKeeper”的评论:

还没有评论