0


ZooKeeper实战总结

注意:本文参考 ZooKeeper 实战 | JavaGuide

Curator框架实现Zookeeper基本操作 - 百里浅暮 - 博客园

菜鸟教程 1.0 Zookeeper 教程 | 菜鸟教程

ZooKeeper 安装和使用

使用Docker 安装 zookeeper

a.使用 Docker 下载 ZooKeeper

docker pull zookeeper:3.5.8

b.运行 ZooKeeper

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.5.8

连接 ZooKeeper 服务

a.进入ZooKeeper容器中

先使用 docker ps 查看 ZooKeeper 的 ContainerID,然后使用 docker exec -it ContainerID /bin/bash 命令进入容器中。

b.先进入 bin 目录,然后通过 ./zkCli.sh -server 127.0.0.1:2181命令连接ZooKeeper 服务

root@eaf70fc620cb:/apache-zookeeper-3.5.8-bin cd bin

如果你看到控制台成功打印出如下信息的话,说明你已经成功连接 ZooKeeper 服务。

常用增删改查命令演示

查看常用命令(help 命令)

通过 help 命令查看 ZooKeeper 常用命令

创建节点(create 命令)

create [-s] [-e] path data acl

[-s] [-e]:-s 和 -e 都是可选的,-s 代表顺序节点, -e 代表临时节点,注意其中 -s 和 -e 可以同时使用的,并且临时节点不能再创建子节点。

path:指定要创建节点的路径,比如 /runoob。

data:要在此节点存储的数据。

acl:访问权限相关,默认是 world,相当于全世界都能访问。

通过 create 命令在根目录创建了 node1 节点,与它关联的字符串是"node1"

[zk: 127.0.0.1:2181(CONNECTED) 34] create /node1 “node1”

通过 create 命令在node1 创建了 node1.1节点,与它关联的内容是数字 123

[zk: 127.0.0.1:2181(CONNECTED) 1] create /node1/node1.1 123

Created /node1/node1.1

$ create -s -e /runoob 0

创建的节点既是有序,又是临时节点。

更新节点数据内容(set 命令)

set path data [version]

path:节点路径。

data:需要存储的数据。

[version]:可选项,版本号(可用作乐观锁)。

[zk: 127.0.0.1:2181(CONNECTED) 11] set /node1 "set node1"

以下实例开启两个终端,也可以在同一终端操作:

$ get /runoob

下图可见,只有正确的版本号才能设置成功:

$ set /runoob 0 1
$ set /runoob 0 2
$ set /runoob 0 10
$ set /runoob 0 6

获取节点的数据(get 命令)

get 命令可以获取指定节点的数据内容和节点的状态,可以看出我们通过 set 命令已经将节点数据内容改为 "set node1"。

get /node1

set node1
cZxid = 0x47
ctime = Sun Jan 20 10:22:59 CST 2019
mZxid = 0x4b
mtime = Sun Jan 20 10:41:10 CST 2019
pZxid = 0x4a
cversion = 1
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 1

get path [watch]

path:代表路径。

[watch]:对节点进行事件监听。

以下实例查看同时开启两个终端。

终端一:

$ get /runoob watch

在终端二对此节点进行修改:

$ set /runoob 1

终端一自动显示 NodeDataChanged 事件:

查看某个目录下的子节点(ls 命令)

通过 ls 命令查看根目录下的节点

[zk: 127.0.0.1:2181(CONNECTED) 37] ls /

[dubbo, ZooKeeper, node1]

通过 ls 命令查看 node1 目录下的节点

[zk: 127.0.0.1:2181(CONNECTED) 5] ls /node1

[node1.1]

ZooKeeper 中的 ls 命令和 linux 命令中的 ls 类似, 这个命令将列出绝对路径 path 下的所有子节点信息(列出 1 级,并不递归)

查看节点状态(stat 命令)

通过 stat 命令查看节点状态

stat path [watch]

path:代表路径。

[watch]:对节点进行事件监听。

[zk: 127.0.0.1:2181(CONNECTED) 10] stat /node1
cZxid = 0x47
ctime = Sun Jan 20 10:22:59 CST 2019
mZxid = 0x47
mtime = Sun Jan 20 10:22:59 CST 2019
pZxid = 0x4a
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 1

上面显示的一些信息比如 cversion、aclVersion、numChildren 等等

查看节点信息和状态(ls2 命令)

ls2 命令更像是 ls 命令和 stat 命令的结合。 ls2 命令返回的信息包括 2 部分:

子节点列表

当前节点的 stat 信息。

[zk: 127.0.0.1:2181(CONNECTED) 7] ls2 /node1
[node1.1]
cZxid = 0x47
ctime = Sun Jan 20 10:22:59 CST 2019
mZxid = 0x47
mtime = Sun Jan 20 10:22:59 CST 2019
pZxid = 0x4a
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 1

删除节点(delete 命令)

delete path [version]

path:节点路径。

[version]:可选项,版本号(同 set 命令)。

这个命令很简单,但是需要注意的一点是如果你要删除某一个节点,那么这个节点必须无子节点才行。

[zk: 127.0.0.1:2181(CONNECTED) 3] delete /node1/node1.1

注意:delete 命令只能一层一层删除!新版本可以通过 deleteall 命令递归删除。

Zookeeper 四字命令

zookeeper 支持某些特定的四字命令与其交互,用户获取 zookeeper 服务的当前状态及相关信息,用户在客户端可以通过 telenet 或者 nc(netcat) 向 zookeeper 提交相应的命令。

安装 nc 命令:

$ yum install nc # centos

$ sudo apt install netcat # ubuntu

四字命令格式:

echo [command] | nc [ip] [port]

ZooKeeper 常用四字命令主要如下:
四字命令功能描述conf3.3.0版本引入的。打印出服务相关配置的详细信息。cons3.3.0版本引入的。列出所有连接到这台服务器的客户端全部连接/会话详细信息。包括"接受/发送"的包数量、会话id、操作延迟、最后的操作执行等等信息。crst3.3.0版本引入的。重置所有连接的连接和会话统计信息。dump列出那些比较重要的会话和临时节点。这个命令只能在leader节点上有用。envi打印出服务环境的详细信息。reqs列出未经处理的请求ruok测试服务是否处于正确状态。如果确实如此,那么服务返回"imok",否则不做任何相应。stat输出关于性能和连接的客户端的列表。srst重置服务器的统计。srvr3.3.0版本引入的。列出连接服务器的详细信息wchs3.3.0版本引入的。列出服务器watch的详细信息。wchc3.3.0版本引入的。通过session列出服务器watch的详细信息,它的输出是一个与watch相关的会话的列表。wchp3.3.0版本引入的。通过路径列出服务器watch的详细信息。它输出一个与session相关的路径。mntr3.4.0版本引入的。输出可用于检测集群健康状态的变量列表

stat 命令

stat 命令用于查看 zk 的状态信息,实例如下:

$ echo stat | nc 192.168.3.38 2181

ruok 命令

ruok 命令用于查看当前 zkserver 是否启动,若返回 imok 表示正常。实例如下:

$ echo ruok | nc 192.168.3.38 2181

dump 命令

dump 命令用于列出未经处理的会话和临时节点。实例如下:

$ echo dump | nc 192.168.3.38 2181

conf 命令

conf 命令用于查看服务器配置。实例如下:

$ echo conf | nc 192.168.3.38 2181

cons 命令

cons 命令用于展示连接到服务器的客户端信息。实例如下:

$ echo cons | nc 192.168.3.38 2181

envi 命令

envi 命令用于查看环境变量。实例如下:

$ echo envi | nc 192.168.3.38 2181

ACL命令

ACL 命令行

getAcl 命令:获取某个节点的 acl 权限信息。

setAcl 命令:设置某个节点的 acl 权限信息。

addauth 命令:输入认证授权信息,注册时输入明文密码,加密形式保存。

ACL 构成

zookeeper 的 acl 通过 [scheme:id:permissions] 来构成权限列表。

1、scheme:代表采用的某种权限机制,包括 world、auth、digest、ip、super 几种。

2、id:代表允许访问的用户。

3、permissions:权限组合字符串,由 cdrwa 组成,其中每个字母代表支持不同权限, 创建权限 create(c)、删除权限 delete(d)、读权限 read(r)、写权限 write(w)、管理权限admin(a)。

world 实例

查看默认节点权限,再更新节点 permissions 权限部分为 crwa,结果删除节点失败。其中 world 代表开放式权限。

$ getAcl /runoob/child

$ setAcl /runoob/child world:anyone:crwa

$ delete /runoob/child

auth 实例

auth 用于授予权限,注意需要先创建用户。

$ setAcl /runoob/child auth:user1:123456:cdrwa

$ addauth digest user1:123456

$ setAcl /runoob/child auth:user1:123456:cdrwa

$ getAcl /runoob/child

digest 实例

退出当前用户,重新连接终端,digest 可用于账号密码登录和验证。。

$ ls /runoob

$ create /runoob/child01 runoob

$ getAcl /runoob/child01

$ setAcl /runoob/child01 digest:user1:HYGa7IZRm2PUBFiFFu8xY2pPP/s=:cdra

$ getAcl /runoob/child01

$ addauth digest user1:123456

$ getAcl /runoob/child01

提示:加密密码是上一步创建的。

IP 实例

限制 IP 地址的访问权限,把权限设置给 IP 地址为 192.168.3.7 后,IP 为 192.168.3.38 已经没有访问权限。

$ create /runoob/ip 0

$ getAcl /runoob/ip

$ setAcl /runoob/ip ip:192.168.3.7:cdrwa

$ get /runoob/ip

ZooKeeper Java客户端 Curator简单使用

Curator 是Netflix公司开源的一套 ZooKeeper Java客户端框架,相比于 Zookeeper 自带的客户端 zookeeper 来说,Curator 的封装更加完善,各种 API 都可以比较方便地使用。

下面我们就来简单地演示一下 Curator 的使用吧!

Curator4.0+版本对ZooKeeper 3.5.x支持比较好。开始之前,请先将下面的依赖添加进你的项目。

<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.11</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.8</version>
</dependency>
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-framework</artifactId>
  <version>4.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>4.0.0</version>
</dependency>

Curator 包含了几个包:

curator-framework:对 zookeeper 的底层 api 的一些封装。

curator-client:提供一些客户端的操作,例如重试策略等。

curator-recipes:封装了一些高级特性,如:Cache 事件监听、选举、分布式锁、分布式计数器、分布式 Barrier 等。

连接 ZooKeeper 客户端

通过 CuratorFrameworkFactory 创建 CuratorFramework 对象,然后再调用 CuratorFramework 对象的 start() 方法即可!

private static final int BASE_SLEEP_TIME = 1000;
private static final int MAX_RETRIES = 3;

// Retry strategy. Retry 3 times, and will increase the sleep time between retries.
RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
    // the server to connect to (can be a server list)
    .connectString("127.0.0.1:2181")
    .retryPolicy(retryPolicy)
    .build();
zkClient.start();

对于一些基本参数的说明:

baseSleepTimeMs:重试之间等待的初始时间

maxRetries :最大重试次数

connectString :要连接的服务器列表

retryPolicy :重试策略

说一下retryPolicy,重连策略,建议用其中两种:

//重连3次,每次休息3秒
new RetryNTimes(3,3000);
//重连3次,每次休息大约是1秒
new ExponentialBackoffRetry(1000,3);
//初始化一个大概的等待时间1秒,然后开始重连,最多重连3次,每次最多休息2秒
new ExponentialBackoffRetry(1000,3,2000);

//计算通过这个初始化的大约时间,计算实际需要睡眠多久
long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));

namespace代表命名空间,注意的是,curator会自动创建

数据节点的增删改查

创建节点

我们在 ZooKeeper常见概念解读 中介绍到,我们通常是将 znode 分为 4 大类:

持久(PERSISTENT)节点 :一旦创建就一直存在即使 ZooKeeper 集群宕机,直到将其删除。

临时(EPHEMERAL)节点 :临时节点的生命周期是与 客户端会话(session) 绑定的,会话消失则节点消失 。并且,临时节点 只能做叶子节点 ,不能创建子节点。

持久顺序(PERSISTENT_SEQUENTIAL)节点 :除了具有持久(PERSISTENT)节点的特性之外, 子节点的名称还具有顺序性。比如 /node1/app0000000001 、/node1/app0000000002 。

临时顺序(EPHEMERAL_SEQUENTIAL)节点 :除了具备临时(EPHEMERAL)节点的特性之外,子节点的名称还具有顺序性。

你在使用的ZooKeeper 的时候,会发现 CreateMode 类中实际有 7种 znode 类型 ,但是用的最多的还是上面介绍的 4 种。

a.创建持久化节点

你可以通过下面两种方式创建持久化的节点。

//注意:下面的代码会报错,下文说了具体原因
zkClient.create().forPath("/node1/00001");
zkClient.create().withMode(CreateMode.PERSISTENT).forPath("/node1/00002");

但是,你运行上面的代码会报错,这是因为的父节点node1还未创建。

你可以先创建父节点 node1 ,然后再执行上面的代码就不会报错了。

zkClient.create().forPath("/node1");

更推荐的方式是通过下面这行代码, creatingParentsIfNeeded() 可以保证父节点不存在的时候自动创建父节点,这是非常有用的。

zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/node1/00001");

b.创建临时节点

需要注意,只有叶节点可以做临时节点,所以叶节点的父节点必须是永久节点,也就是creatingParentsIfNeeded这个方法创建的父节点必须是永久节点。

zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/node1/00001");

连续的参考下面

public enum CreateMode {
    PERSISTENT(0, false, false),
    PERSISTENT_SEQUENTIAL(2, false, true),
    EPHEMERAL(1, true, false),
    EPHEMERAL_SEQUENTIAL(3, true, true);
}

c.创建节点并指定数据内容

zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/node1/00001","java".getBytes());
zkClient.getData().forPath("/node1/00001");//获取节点的数据内容,获取到的是 byte数组

d.检测节点是否创建成功

zkClient.checkExists().forPath("/node1/00001");//不为null的话,说明节点创建成功

删除节点

a.删除一个子节点

zkClient.delete().forPath("/node1/00001");

b.删除一个节点以及其下的所有子节点

zkClient.delete().deletingChildrenIfNeeded().forPath("/node1");

c.删除指定版本的节点

client.delete().withVersion(0).forPath("/comm_msg_nd");

d.保证删除,失败后继续执行

client.delete().guaranteed().forPath("/comm_msg_nd");

获取/更新节点数据内容

zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/node1/00001","java".getBytes());
zkClient.getData().forPath("/node1/00001");//获取节点的数据内容
zkClient.setData().forPath("/node1/00001","c++".getBytes());//更新节点数据内容

Stat stat = new Stat();
byte[] ctx = cc.getData().storingStatIn(stat).forPath("/comm_msg_nd");
//获取节点内容为ctx
System.out.println(new String(ctx));
//获取该节点stat
//78,78,1573789366124,1573789366124,0,0,0,0,3,0,78
System.out.println(stat);

Stat stat = new Stat();
stat = cc.setData().forPath("/comm_msg_nd","new ctx".getBytes());
System.out.println(stat);

获取某个节点的所有子节点路径

List<String> childrenPaths = zkClient.getChildren().forPath("/node1");

异步调用

ExecutorService es = Executors.newFixedThreadPool(2);
//带线程池的异步接口
client.create().inBackground((client,event)->{
    //pool-3-thread-1,CuratorEventImpl{type=CREATE, resultCode=0, path='/abc', name='/abc', children=null, context=null,stat=114,114,1573797468244,1573797468244,0,0,0,0,13,0,114, data=null, watchedEvent=null, aclList=null, opResults=null}  
    System.out.println(Thread.currentThread().getName()+","+event);

},es).forPath("/abc");

//不带线程池的异步接口
client.delete().inBackground((client,event)->{

    //main-EventThread,CuratorEventImpl{type=DELETE, resultCode=0, path='/abc', name='null', children=null, context=null, stat=null, data=null, watchedEvent=null, aclList=null, opResults=null}        
    System.out.println(Thread.currentThread().getName()+","+event);

}).forPath("/abc");

Thread.sleep(Integer.MAX_VALUE);

1)inBackground() 该方法就是添加一个异步的回调方法,参数是BackgroundCallback接口,是一个函数式接口。

2)BackgroundCallback的接口参数为client(当前客户端实例)及event(服务端事件)

3)事件类型,CuratorEventType,包含如下信息 {type=DELETE, resultCode=0, path='/abc', name='null', children=null, context=null, stat=null, data=null, watchedEvent=null, aclList=null, opResults=null}。type对应的就是操作类型,比如delete对应的delete(),create对应create()等,resultCode是响应码,0代表成功

4)线程池es的作用,通过名称可以看到,默认情况下都是使用main-EventThread线程来串行执行,如果耗时较长会有影响,可以通过定制线程池来缓解这种情况。

事件监听

zookeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员自己反复注册Watcher,比较繁琐。curator引入了cache来实现zookeeper服务端事件的监听。Cache是Curator中对时间监听的包装,其对事件的监听其实可以近似看作是一个本地缓存视图和远程zookeeper视图的对比过程。

cache分为两类监听类型,节点监听和子节点监听。

NodeCache(监听和缓存根节点变化) 只监听单一个节点(变化 添加,修改,删除)。

PathChildrenCache(监听和缓存子节点变化) 监听这个节点下的所有子节点(变化 添加,修改,删除)。

TreeCache(监听和缓存根节点变化和子节点变化) NodeCache+ PathChildrenCache 监听当前节点及其下的所有子节点的变化。

1、NodeCache

//创建节点数据监听对象
final NodeCache nodeCache = new NodeCache(client, "/hello");
//开始缓存
/**
* start参数为true:可以直接获取监听的节点,System.out.println(nodeCache.getCurrentData());为ChildData{path='/aa', stat=607,765,1580205779732,1580973376268,2,1,0,0,5,1,608
, data=[97, 98, 99, 100, 101]}
* 参数为false:不可以获取监听的节点,System.out.println(nodeCache.getCurrentData());为null
*/
nodeCache.start(true);
System.out.println(nodeCache.getCurrentData());
//添加监听对象
nodeCache.getListenable().addListener(new NodeCacheListener() {
    //如果节点数据有变化,会回调该方法
    public void nodeChanged() throws Exception {
        String data = new String(nodeCache.getCurrentData().getData());
        System.out.println("数据Watcher:路径=" + nodeCache.getCurrentData().getPath()
                + ":data=" + data);
    }
});

2、PathChildrenCache

//监听指定节点的子节点变化情况包括͹新增子节点 子节点数据变更 和子节点删除
//true表示用于配置是否把节点内容缓存起来,如果配置为true,客户端在接收到节点列表变更的同时,也能够获取到节点的数据内容(即:event.getData().getData())ͺ如果为false 则无法取到数据内容(即:event.getData().getData())
PathChildrenCache childrenCache = new PathChildrenCache(client,"/hello",true);
/**
* NORMAL:  普通启动方式, 在启动时缓存子节点数据
* POST_INITIALIZED_EVENT:在启动时缓存子节点数据,提示初始化
* BUILD_INITIAL_CACHE: 在启动时什么都不会输出
*  在官方解释中说是因为这种模式会在start执行执行之前先执行rebuild的方法,而rebuild的方法不会发出任何事件通知。
*/
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
System.out.println(childrenCache.getCurrentData());
//添加监听
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    @Override
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        if(event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED){
            System.out.println("子节点更新");
            System.out.println("节点:"+event.getData().getPath());
            System.out.println("数据" + new String(event.getData().getData()));
        }else if(event.getType() == PathChildrenCacheEvent.Type.INITIALIZED ){
            System.out.println("初始化操作");
        }else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED ){
            System.out.println("删除子节点");
            System.out.println("节点:"+event.getData().getPath());
            System.out.println("数据" + new String(event.getData().getData()));
        }else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ){
            System.out.println("添加子节点");
            System.out.println("节点:"+event.getData().getPath());
            System.out.println("数据" + new String(event.getData().getData()));
        }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED ){
            System.out.println("连接失效");
        }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED ){
            System.out.println("重新连接");
        }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST ){
            System.out.println("连接失效后稍等一会儿执行");
        }
    }
});

3、TreeCache

TreeCache treeCache = new TreeCache(client,"/hello");
treeCache.start();
System.out.println(treeCache.getCurrentData("/hello"));
treeCache.getListenable().addListener(new TreeCacheListener() {
    @Override
    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
            if(event.getType() == TreeCacheEvent.Type.NODE_ADDED){
                System.out.println(event.getData().getPath() + "节点添加");
            }else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED){
                System.out.println(event.getData().getPath() + "节点移除");
            }else if(event.getType() == TreeCacheEvent.Type.NODE_UPDATED){
                System.out.println(event.getData().getPath() + "节点修改");
            }else if(event.getType() == TreeCacheEvent.Type.INITIALIZED){
                System.out.println("初始化完成");
            }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_SUSPENDED){
                System.out.println("连接过时");
            }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_RECONNECTED){
                System.out.println("重新连接");
            }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_LOST){
                System.out.println("连接过时一段时间");
            }
    }
});

注:在一些Curator版本中以上几个类被标识过期,被CuratorCache取代。使用方法也会有所不同。

// 创建CuratorCache实例,基于路径/father/son/grandson1(这里说的路径都是基于命名空间下的路径)
// 缓存构建选项是SINGLE_NODE_CACHE
CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1",
                                        CuratorCache.Options.SINGLE_NODE_CACHE);

// 创建一系列CuratorCache监听器,都是通过lambda表达式指定
CuratorCacheListener listener = CuratorCacheListener.builder()
    // 初始化完成时调用
    .forInitialized(() -> System.out.println("[forInitialized] : Cache initialized"))
    // 添加或更改缓存中的数据时调用
    .forCreatesAndChanges(
    (oldNode, node) -> System.out.printf("[forCreatesAndChanges] : Node changed: Old: [%s] New: [%s]\n",
                                         oldNode, node)
)
    // 添加缓存中的数据时调用
    .forCreates(childData -> System.out.printf("[forCreates] : Node created: [%s]\n", childData))
    // 更改缓存中的数据时调用
    .forChanges(
    (oldNode, node) -> System.out.printf("[forChanges] : Node changed: Old: [%s] New: [%s]\n",
                                         oldNode, node)
)
    // 删除缓存中的数据时调用
    .forDeletes(childData -> System.out.printf("[forDeletes] : Node deleted: data: [%s]\n", childData))
    // 添加、更改或删除缓存中的数据时调用
    .forAll((type, oldData, data) -> System.out.printf("[forAll] : type: [%s] [%s] [%s]\n", type, oldData, data))
    .build();

// 给CuratorCache实例添加监听器
cache.listenable().addListener(listener);

// 启动CuratorCache
cache.start();

Master选举

分布式执行一些不需要同时执行的复杂任务,curator利用zk的特质,实现了这个选举过程。其实就是利用了多个zk客户端在同一个位置建节点,只会有一个客户端建立成功这个特性。来实现同一时间,只会选择一个客户端执行任务

LeaderSelector selector = new LeaderSelector(cc, "/tmp/leader/master", new LeaderSelectorListener() {
    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        //成为leader了
        System.out.println("do leader work");
        Thread.sleep(5000);
        System.out.println("end work");
    }

    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        System.out.println("stateChanged:"+newState);
    }
});
selector.autoRequeue();
selector.start();
Thread.sleep(Integer.MAX_VALUE);

分布式锁

String lockPath = "/123/111";
InterProcessMutex lock = new InterProcessMutex(client,lockPath);
lock.acquire();
//do something
lock.release();

curator直接给出了分布式锁的实现。原理是客户端创建锁节点,执行完毕后再删除锁节点。一个客户端先检查是否有锁节点,如果没有,说明可以执行,则创建锁节点去执行。如果有锁节点,则说明现在锁在别的客户端那里,自己则需要等待。

分布式计数器

分布式计数器的一个典型应用场景是统计在线人数。

指定一个zookeeper节点作为计数器,多个应用实例在分布式锁的控制下,通过更新该数据节点的内容来实现技术功能。

通过类DistributedAtomicInteger来实现。

//分布式计数器
DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client,"/distributed_atomic_counter",new ExponentialBackoffRetry(1000,3));
AtomicValue av = atomicInteger.add(10);
System.out.println(av.succeeded());//true
System.out.println(av.preValue());//0
System.out.println(av.postValue());//10

每个DistributedAtomicXXX里面都有一个AtomicValue,这个是分布式的核心实现类。

AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);
        //尝试下乐观锁
        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            //失败的话再使用排他锁
            tryWithMutex(result, makeValue);
        }

        return result;
    }

分布式Barrier

Barrier是一种用来控制多线程之间同步的经典方式,在JDK中也自带了CyclicBarrier实现。

curator用DistributeBarrier类来实现。

1)分布式的Barrier(主线程触发)

for (int i = 0; i < 5; i++) {
    new Thread(()->{
        System.out.println(Thread.currentThread().getName()+" is ready ");
        CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("ip:port")
            .sessionTimeoutMs(2000)
            .connectionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000,3))
            .namespace("test")
            .build();
        client.start();
        DistributedBarrier barrier = new DistributedBarrier(client,"/distributed_barrier");
        try {
            barrier.setBarrier();
            barrier.waitOnBarrier();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+" run  ");
    }).start();
}
Thread.sleep(5000);
DistributedBarrier barrier = new DistributedBarrier(cc,"/distributed_barrier");
barrier.removeBarrier();

2)分布式的Barrier(根据等待线程数量触发,同时进入 and 同时退出)

for (int i = 0; i < 5; i++) {
    new Thread(()->{
        System.out.println(Thread.currentThread().getName()+" is ready ");
        CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("ip:port")
            .sessionTimeoutMs(2000)
            .connectionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000,3))
            .namespace("test")
            .build();
        client.start();
        DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client,"/distributed_barrier",5);
        try {
            //进入时会等待,5个才会同时进入
            barrier.enter();
            Thread.sleep(3000);
            //退出时依然要等待,5个才会同时退出
            barrier.leave();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+" run  ");
    }).start();
}
标签: zookeeper docker dubbo

本文转载自: https://blog.csdn.net/xushiyu1996818/article/details/127315438
版权归原作者 xushiyu1996818 所有, 如有侵权,请联系我们删除。

“ZooKeeper实战总结”的评论:

还没有评论