0


ZooKeeper 实战:ZooKeeper Java API

在这里插入图片描述

ZooKeeper

API 的核心部分是

ZooKeeper

类。在构造函数中提供一些参数来连接

ZooKeeper

,并提供如下方法:

  • connect − 连接 ZooKeeper 服务器。
  • create − 创建一个 ZNode 节点。
  • exists − 检查指定的节点是否存在。
  • getData − 从指定的节点获取数据。
  • setData − 为指定的节点设置数据。
  • getChildren − 获取指定节点的所有子节点。
  • delete − 删除指定节点以及子节点。
  • close − 关闭连接。

ZooKeeper 大部分 API 都提供了同步和异步方法。同步方法一般会有返回值,并且会抛出相应的异常。异步方法没有返回值,也不会抛出异常。此外异步方法参数在同步方法参数的基础上,会增加

cb

ctx

两个参数。

1. 开发环境

在工程的

pom.xml

文件中加入下面的依赖:

<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.5</version></dependency>

截止目前最新版本为 3.5.5 版本。

2. 连接服务器

构造

ZooKeeper

类对象的过程就是与

ZooKeeper

服务器建立连接的过程。

2.1 语法
ZooKeeper

通过构造函数连接服务器。构造函数如下所示:

ZooKeeper(String connectionString,int sessionTimeout,Watcher watcher)throwsIOException
ZooKeeper

构造函数一共有三个参数:

  • connectionString: 第一个参数是 ZooKeeper 服务器地址(可以指定端口,默认端口号为2181)。
  • sessionTimeout: 第二个参数是以毫秒为单位的会话超时时间。表示 ZooKeeper 等待客户端通信的最长时间,之后会声明会话结束。例如,我们设置为5000,即5s,这就是说如果 ZooKeeper 与客户端有5s的时间无法进行通信,ZooKeeper 就会终止客户端的会话。ZooKeeper 会话一般设置超时时间为5-10s。
  • watcher: 第三个参数是 Watcher 对象实例。Watcher 对象接收来自于 ZooKeeper 的回调,以获得各种事件的通知。这个对象需要我们自己创建,因为 Watcher 定义为接口,所以需要我们自己实现一个类,然后初始化这个类的实例并传入 ZooKeeper 的构造函数中。客户端使用 Watcher 接口来监控与 ZooKeeper 之间会话的健康情况。与 ZooKeeper 服务器之间建立或者断开连接时会产生事件。
2.2 Example

下面会创建一个

ZooKeeperConnection

类并实现一个

connect

方法。

connect

方法创建一个

ZooKeeper

对象,连接到

ZooKeeper

集群,然后返回该对象。

CountDownLatch

阻塞主进程,直到客户端连接到

ZooKeeper

集群。

ZooKeeper

通过

Watcher

回调返回连接状态。一旦客户端与

ZooKeeper

集群连接,

Watcher

回调函数会被调用,

Watcher

回调调用

CountDownLatch

countDown

方法释放锁。

packagecom.zookeeper.example;importorg.apache.zookeeper.WatchedEvent;importorg.apache.zookeeper.Watcher;importorg.apache.zookeeper.ZooKeeper;importjava.io.IOException;importjava.util.concurrent.CountDownLatch;// 连接ZooKeeper服务器publicclassZooKeeperConnection{// ZooKeeper实例privateZooKeeper zoo;privatefinalCountDownLatch connectedSignal =newCountDownLatch(1);// 连接服务器publicZooKeeperconnect(String host)throwsIOException,InterruptedException{
        zoo =newZooKeeper(host,5000,newWatcher(){publicvoidprocess(WatchedEvent event){Event.KeeperState state = event.getState();// 连接成功if(state ==Event.KeeperState.SyncConnected){System.out.println("与ZooKeeper服务器连接成功");
                    connectedSignal.countDown();}// 断开连接elseif(state ==Event.KeeperState.Disconnected){System.out.println("与ZooKeeper服务器断开连接");}}});
        connectedSignal.await();return zoo;}// 断开与服务器的连接publicvoidclose()throwsInterruptedException{
        zoo.close();}}

3. 创建Znode节点

3.1 语法
ZooKeeper

类提供了

create

方法,用于在

ZooKeeper

集群中创建新的

Znode

create

方法如下所示:

// 同步方式Stringcreate(String path,byte[] data,List<ACL> acl,CreateMode createMode)throwsKeeperException,InterruptedException// 异步方式voidcreate(String path,byte[] data,List<ACL> acl,CreateMode createMode,StringCallback cb,Object ctx)

同步

create()

方法一共有四个参数:

  • path: 第一个参数是要创建的节点路径。
  • data: 第二个参数是存储在指定创建节点中的数据。参数类型是字节数组。
  • acl: 第三个参数是创建节点的访问权限列表。ZooKeeper API提供了一个静态接口 ZooDefs.Ids 来获取一些基本的 acl 列表。
  • createMode: 第四个参数是创建节点的类型,可以是临时节点,也可以是顺序节点。CreateMode 是一个枚举,它有如下取值: PERSISTENTPERSISTENT_SEQUENTIALEPHEMERALEPHEMERAL_SEQUENTIALCONTAINERPERSISTENT_WITH_TTL 以及 PERSISTENT_SEQUENTIAL_WITH_TTL

除了同步

create

方法中的四个参数以外,异步模式的

create

方法还增加了

cb

ctx

两个参数。

3.2 Example

同步方法会返回创建节点的路径,并且会抛出相应的异常。异步方法没有返回值,也不会抛出异常。

StringCallback

接口中的

processResult

方法会在节点创建好之后被调用,它有四个参数:

  • resultCode: 第一个参数为创建节点的结果码,成功创建节点时,resultCode 的值为0。
  • path: 第二个参数是创建节点的路径。
  • ctx: 第三个参数是上下文名称,当一个 StringCallback 对象作为多个 create 方法的参数时,这个参数就很有用了。
  • name: 第四个参数是创建节点的名字,其实与path参数相同。
packagecom.zookeeper.example;importorg.apache.zookeeper.*;importjava.io.IOException;// 创建Znode节点publicclassZKCreate{// 创建ZooKeeper实例privatestaticZooKeeper zk;// 创建ZooKeeperConnection实例privatestaticZooKeeperConnection conn;// 同步方式创建Znode节点publicstaticvoidcreateNodeSync(String path,byte[] data)throwsKeeperException,InterruptedException{String nodePath = zk.create(path, data,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);System.out.println("NodePath: "+ nodePath);}// 异步方式创建Znode节点publicstaticvoidcreateNodeAsync(String path,byte[] data){
        zk.create(path, data,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,newAsyncCallback.StringCallback(){publicvoidprocessResult(int resultCode,String path,Object ctx,String name){System.out.println("ResultCode: "+ resultCode);System.out.println("Path: "+ path);System.out.println("Ctx: "+ ctx);System.out.println("Name: "+ name);}},"create_node_async");}publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,KeeperException{// 路径String path1 ="/demo/node1";String path2 ="/demo/node2";// 数据byte[] data1 ="zookeeper node1".getBytes();byte[] data2 ="zookeeper node2".getBytes();// 建立连接
        conn =newZooKeeperConnection();
        zk = conn.connect("localhost");// 同步方式创建节点createNodeSync(path1, data1);// 异步方式创建节点createNodeAsync(path2, data2);// 断开连接
        conn.close();}}

编译并执行应用程序后,将在

ZooKeeper

集群中创建具有指定数据的

Znode

。你可以使用

zkCli.sh

进行检查:

[zk: 127.0.0.1:2181(CONNECTED) 10] ls /demo
[node2, node1]

4. 判断Znode是否存在

4.1 语法
ZooKeeper

类提供了检查

Znode

是否存在的

exists

方法。如果指定的

Znode

存在,则返回

Znode

的元数据。

exists

方法如下所示:

// 同步方式Statexists(String path,Watcher watcher)throwsKeeperException,InterruptedExceptionStatexists(String path,boolean watch)throwsKeeperException,InterruptedException// 异步方式voidexists(String path,Watcher watcher,StatCallback cb,Object ctx)voidexists(String path,boolean watch,StatCallback cb,Object ctx)

同步

exists

方法一共有两个参数:

  • path: 第一个参数是 Znode 路径。
  • watch或者watcher: 第二个参数是一个布尔类型的 watch或者是一个自定义的 Watcher 对象。同步模式和异步模式分别有两个方法,第一个方法传递一个新的 Watcher 对象(我们需要自定义实现)。 第二个方法使用默认 Watcher,如果开启 Watcher 只需要将第二个参数设置为 true(不需要我们自己实现)。用来监控节点数据变化以及节点的创建和删除。

除了同步

exists

方法中的两个参数以外,异步模式的

exists

方法还增加了

cb

ctx

两个参数。

4.2 Example
exists

方法的

watch

参数比较特别,如果将其指定为

true

,那么代表你对该节点的创建、删除以及数据内容变更都感兴趣,所以会响应三种事件类型。

packagecom.zookeeper.example;importorg.apache.zookeeper.AsyncCallback;importorg.apache.zookeeper.KeeperException;importorg.apache.zookeeper.ZooKeeper;importorg.apache.zookeeper.data.Stat;importjava.io.IOException;// 判断指定路径节点是否存在publicclassZKExists{privatestaticZooKeeper zk;privatestaticZooKeeperConnection conn;// 同步方式判断指定路径的节点是否存在publicstaticStatexistSync(String path)throwsKeeperException,InterruptedException{return zk.exists(path,true);}// 异步方式判断指定路径的节点是否存在publicstaticvoidexistAsync(String path){
        zk.exists(path,true,newAsyncCallback.StatCallback(){publicvoidprocessResult(int resultCode,String path,Object ctx,Stat stat){System.out.println("ResultCode:"+ resultCode);System.out.println("Path: "+ path);System.out.println("Ctx: "+ ctx);System.out.println("Stat: "+ stat);}},"exist_async");}publicstaticvoidmain(String[] args)throwsInterruptedException,KeeperException,IOException{String path1 ="/demo/node1";String path2 ="/demo/node2";
        conn =newZooKeeperConnection();
        zk = conn.connect("localhost");// 同步方式判断指定路径的节点是否存在Stat stat =existSync(path1);if(stat !=null){System.out.println("Node exists and the node version is "+ stat.getVersion());}else{System.out.println("Node does not exists");}// 同步方式判断指定路径的节点是否存在existAsync(path2);// 断开连接
        zk.close();}}

5. 获取节点数据

5.1 语法
ZooKeeper

类提供了

getData

方法来获取指定

Znode

中的数据以及状态。

getData

方法的如下所示:

// 同步方式byte[]getData(String path,Watcher watcher,Stat stat)throwsKeeperException,InterruptedExceptionbyte[]getData(String path,boolean watch,Stat stat)throwsKeeperException,InterruptedException// 异步方式voidgetData(String path,Watcher watcher,DataCallback cb,Object ctx)voidgetData(String path,boolean watch,DataCallback cb,Object ctx)

同步

getData

方法返回值就是节点中存储的数据值,它有三个参数:

  • path: 第一个参数是节点的路径,用于表示要获取哪个节点中的数据。
  • watch或者watcher: 第二个参数是一个布尔类型的 watch或者是一个自定义的 Watcher 对象。用来监控节点数据变化以及节点是否被删除。
  • stat: 第三个参数用于存储节点的状态信息。

除了同步

getData

方法中的三个参数以外,异步模式的

getData

方法还增加了

cb

ctx

两个参数。

5.2 Example

在调用

getData

方法前,会先构造一个空的

Stat

类型对象作为参数传给

getData

方法,当

getData

方法调用返回后,节点的状态信息会被填充到

stat

对象中。

值得注意的是,

zooKeeper

设置的监听只生效一次,如果在接收到事件后还想继续对该节点的数据内容改变进行监听,需要在事件处理逻辑中重新调用

getData

方法并将

watch

参数设置为

true

packagecom.zookeeper.example;importorg.apache.zookeeper.AsyncCallback;importorg.apache.zookeeper.KeeperException;importorg.apache.zookeeper.ZooKeeper;importorg.apache.zookeeper.data.Stat;importjava.io.IOException;// 获取指定节点中的数据publicclassZKGetData{privatestaticZooKeeper zk;privatestaticZooKeeperConnection conn;// 同步方式获取数据publicstaticvoidgetDataSync(String path)throwsKeeperException,InterruptedException{Stat stat =newStat();byte[] data = zk.getData(path,true, stat);System.out.println("Data: "+newString(data));System.out.println("Stat: "+ stat);}// 异步方式获取数据publicstaticvoidgetDataAsync(String path){
        zk.getData(path,true,newAsyncCallback.DataCallback(){publicvoidprocessResult(int resultCode,String path,Object ctx,byte[] data,Stat stat){System.out.println("ResultCode: "+ resultCode);System.out.println("Path: "+ path);System.out.println("Ctx: "+ ctx);System.out.println("Data: "+newString(data));System.out.println("Stat: "+ stat);}},"get_data_async");}publicstaticvoidmain(String[] args)throwsInterruptedException,KeeperException,IOException{String path1 ="/demo/node1";String path2 ="/demo/node2";// 连接服务器
        conn =newZooKeeperConnection();
        zk = conn.connect("localhost");// 同步方式获取数据getDataSync(path1);// 异步方式获取数据getDataAsync(path2);// 断开连接
        conn.close();}}

6. 修改节点的数据内容

6.1 语法
ZooKeeper

类提供了

setData

方法来修改指定

Znode

中的数据。

setData

方法如下所示:

// 同步方法StatsetData(String path,byte[] data,int version)throwsKeeperException,InterruptedException// 异步方法voidsetData(String path,byte[] data,int version,StatCallback cb,Object ctx)

同步

setData

方法有三个参数:

  • path: 第一个参数是节点的路径。
  • data: 第二个参数是修改的数据值。
  • version: 最后一个参数当前 Znode 版本。每当数据发生变化时,ZooKeeper 都会更新 Znode 的版本号。

除了同步

setData

方法中的三个参数以外,异步模式的

setData

方法还增加了

cb

ctx

两个参数。

6.2 Example

在调用

setData

方法修改节点数据内容时,只有当

version

参数的值与节点状态信息中的

dataVersion

值相等时,数据才能修改,否则会抛出

BadVersion

异常。主要是为了防止丢失数据的更新,在

ZooKeeper

提供的API中,所有的写操作都必有

version

参数。

packagecom.zookeeper.example;importorg.apache.zookeeper.AsyncCallback;importorg.apache.zookeeper.KeeperException;importorg.apache.zookeeper.ZooKeeper;importorg.apache.zookeeper.data.Stat;importjava.io.IOException;// 修改节点的数据内容publicclassZKSetData{privatestaticZooKeeper zk;privatestaticZooKeeperConnection conn;// 同步方式修改节点数据内容publicstaticvoidsetDataSync(String path,byte[] data)throwsKeeperException,InterruptedException{
        zk.setData(path, data, zk.exists(path,true).getVersion());}// 异步方式修改节点数据内容publicstaticvoidsetDataAsync(String path,byte[] data)throwsKeeperException,InterruptedException{
        zk.setData(path, data, zk.exists(path,true).getVersion(),newAsyncCallback.StatCallback(){publicvoidprocessResult(int resultCode,String path,Object ctx,Stat stat){System.out.println("ResultCode: "+ resultCode);System.out.println("Path: "+ path);System.out.println("Ctx: "+ ctx);System.out.println("Stat: "+ stat);}},"set_data_async");}publicstaticvoidmain(String[] args)throwsInterruptedException,KeeperException,IOException{String path1 ="/demo/node1";String path2 ="/demo/node2";byte[] data1 ="zookeeper node1 update success".getBytes();byte[] data2 ="zookeeper node2 update success".getBytes();// 连接服务器
        conn =newZooKeeperConnection();
        zk = conn.connect("localhost");// 同步方式修改节点数据内容setDataSync(path1, data1);// 异步方式修改节点数据内容setDataAsync(path2, data2);// 断开连接
        conn.close();}}

7. 获取节点的子节点列表

7.1 语法
ZooKeeper

类提供

getChildren

方法来获取指定

Znode

下的所有子节点。

getChildren

方法如下所示:

// 同步方式List<String>getChildren(String path,Watcher watcher)throwsKeeperException,InterruptedExceptionList<String>getChildren(String path,boolean watch)throwsKeeperException,InterruptedException// 异步方式voidgetChildren(String path,Watcher watcher,ChildrenCallback cb,Object ctx)voidgetChildren(String path,boolean watch,Children2Callback cb,Object ctx)

同步

getChildren

方法有两个参数:

  • path: 第一个参数是节点路径。
  • watch或者watcher: 第二个参数是一个布尔类型的 watch或者是一个自定义的 Watcher 对象。用来监控节点数据变化以及节点是否被删除。用于监控节点的删除以及子节点的创建与删除操作。

除了同步

getChildren

方法中的三个参数以外,异步模式的

getChildren

方法还增加了

cb

ctx

两个参数。

7.2 Example
packagecom.zookeeper.example;importorg.apache.zookeeper.AsyncCallback;importorg.apache.zookeeper.KeeperException;importorg.apache.zookeeper.ZooKeeper;importorg.apache.zookeeper.data.Stat;importjava.io.IOException;importjava.util.List;// 获取节点的子节点列表publicclassZKGetChildren{privatestaticZooKeeper zk;privatestaticZooKeeperConnection conn;// 同步方式获取节点的子节点列表publicstaticvoidgetChildrenSync(String path)throwsKeeperException,InterruptedException{List<String> children = zk.getChildren(path,true);for(int i =0; i < children.size(); i++){System.out.println("Child: "+ children.get(i));}}// 异步方式获取节点的子节点列表publicstaticvoidgetChildrenAsync(String path){
        zk.getChildren(path,true,newAsyncCallback.Children2Callback(){publicvoidprocessResult(int resultCode,String path,Object ctx,List<String> children,Stat stat){System.out.println("ResultCode: "+ resultCode);System.out.println("Path: "+ path);System.out.println("Ctx: "+ ctx);System.out.println("Stat: "+ stat);for(String child : children){System.out.println("Child: "+ child);}}},"get_children_async");}publicstaticvoidmain(String[] args)throwsInterruptedException,KeeperException,IOException{String path ="/demo";// 连接服务器
        conn =newZooKeeperConnection();
        zk = conn.connect("localhost");// 同步方式获取节点的子节点列表getChildrenSync(path);// 异步方式获取节点的子节点列表getChildrenAsync(path);// 断开连接
        conn.close();}}

8. 删除节点

8.1 语法
ZooKeeper

类提供了

delete

方法来删除指定的

Znode

delete

方法如下所示:

// 同步方式voiddelete(String path,int version)throwsInterruptedException,KeeperException// 异步方式voiddelete(String path,int version,VoidCallback cb,Object ctx)

同步

delete

方法有两个参数:

  • path: 第一个参数是节点路径。
  • version: 最后一个参数是当前 Znode 版本。每当数据发生变化时,ZooKeeper 都会更新 Znode 的版本号。

除了同步

delete

方法中的两个参数以外,异步模式的

delete

方法还增加了

cb

ctx

两个参数。

8.2 Example
packagecom.zookeeper.example;importorg.apache.zookeeper.AsyncCallback;importorg.apache.zookeeper.KeeperException;importorg.apache.zookeeper.ZooKeeper;importjava.io.IOException;// 删除节点publicclassZKDelete{privatestaticZooKeeper zk;privatestaticZooKeeperConnection conn;// 同步方式删除节点publicstaticvoiddeleteSync(String path)throwsKeeperException,InterruptedException{
        zk.delete(path, zk.exists(path,true).getVersion());}// 异步方式删除节点privatestaticvoiddeleteAsync(String path)throwsKeeperException,InterruptedException{
        zk.delete(path, zk.exists(path,true).getVersion(),newAsyncCallback.VoidCallback(){publicvoidprocessResult(int resultCode,String path,Object ctx){System.out.println("ResultCode: "+ resultCode);System.out.println("Path: "+ path);System.out.println("Ctx: "+ ctx);}},"delete_async");}publicstaticvoidmain(String[] args)throwsInterruptedException,KeeperException,IOException{String path1 ="/test/node1";String path2 ="/test/node2";// 连接服务器
        conn =newZooKeeperConnection();
        zk = conn.connect("localhost");// 同步方式删除节点deleteSync(path1);// 异步方式删除节点deleteAsync(path2);// 断开连接
        conn.close();}}

参考:

  • zookeeper java api介绍
  • Zookeeper - API

本文转载自: https://blog.csdn.net/SunnyYoona/article/details/138549183
版权归原作者 @SmartSi 所有, 如有侵权,请联系我们删除。

“ZooKeeper 实战:ZooKeeper Java API”的评论:

还没有评论