ZooKeeper
API 的核心部分是
ZooKeeper
类。在构造函数中提供一些参数来连接
ZooKeeper
,并提供如下方法:
connect
− 连接ZooKeeper
服务器。create
− 创建一个ZNode
节点。exists
− 检查指定的节点是否存在。getData
− 从指定的节点获取数据。setData
− 为指定的节点设置数据。getChildren
− 获取指定节点的所有子节点。delete
− 删除指定节点以及子节点。close
− 关闭连接。
ZooKeeper 大部分 API 都提供了同步和异步方法。同步方法一般会有返回值,并且会抛出相应的异常。异步方法没有返回值,也不会抛出异常。此外异步方法参数在同步方法参数的基础上,会增加
cb
和
ctx
两个参数。
1. 开发环境
在工程的
pom.xml
文件中加入下面的依赖:
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.5</version></dependency>
截止目前最新版本为 3.5.5 版本。
2. 连接服务器
构造
ZooKeeper
类对象的过程就是与
ZooKeeper
服务器建立连接的过程。
2.1 语法
ZooKeeper
通过构造函数连接服务器。构造函数如下所示:
ZooKeeper(String connectionString,int sessionTimeout,Watcher watcher)throwsIOException
ZooKeeper
构造函数一共有三个参数:
connectionString
: 第一个参数是ZooKeeper
服务器地址(可以指定端口,默认端口号为2181)。sessionTimeout
: 第二个参数是以毫秒为单位的会话超时时间。表示ZooKeeper
等待客户端通信的最长时间,之后会声明会话结束。例如,我们设置为5000,即5s,这就是说如果ZooKeeper
与客户端有5s的时间无法进行通信,ZooKeeper
就会终止客户端的会话。ZooKeeper
会话一般设置超时时间为5-10s。watcher
: 第三个参数是Watcher
对象实例。Watcher
对象接收来自于ZooKeeper
的回调,以获得各种事件的通知。这个对象需要我们自己创建,因为Watcher
定义为接口,所以需要我们自己实现一个类,然后初始化这个类的实例并传入ZooKeeper
的构造函数中。客户端使用Watcher
接口来监控与ZooKeeper
之间会话的健康情况。与ZooKeeper
服务器之间建立或者断开连接时会产生事件。
2.2 Example
下面会创建一个
ZooKeeperConnection
类并实现一个
connect
方法。
connect
方法创建一个
ZooKeeper
对象,连接到
ZooKeeper
集群,然后返回该对象。
CountDownLatch
阻塞主进程,直到客户端连接到
ZooKeeper
集群。
ZooKeeper
通过
Watcher
回调返回连接状态。一旦客户端与
ZooKeeper
集群连接,
Watcher
回调函数会被调用,
Watcher
回调调用
CountDownLatch
的
countDown
方法释放锁。
packagecom.zookeeper.example;importorg.apache.zookeeper.WatchedEvent;importorg.apache.zookeeper.Watcher;importorg.apache.zookeeper.ZooKeeper;importjava.io.IOException;importjava.util.concurrent.CountDownLatch;// 连接ZooKeeper服务器publicclassZooKeeperConnection{// ZooKeeper实例privateZooKeeper zoo;privatefinalCountDownLatch connectedSignal =newCountDownLatch(1);// 连接服务器publicZooKeeperconnect(String host)throwsIOException,InterruptedException{
zoo =newZooKeeper(host,5000,newWatcher(){publicvoidprocess(WatchedEvent event){Event.KeeperState state = event.getState();// 连接成功if(state ==Event.KeeperState.SyncConnected){System.out.println("与ZooKeeper服务器连接成功");
connectedSignal.countDown();}// 断开连接elseif(state ==Event.KeeperState.Disconnected){System.out.println("与ZooKeeper服务器断开连接");}}});
connectedSignal.await();return zoo;}// 断开与服务器的连接publicvoidclose()throwsInterruptedException{
zoo.close();}}
3. 创建Znode节点
3.1 语法
ZooKeeper
类提供了
create
方法,用于在
ZooKeeper
集群中创建新的
Znode
。
create
方法如下所示:
// 同步方式Stringcreate(String path,byte[] data,List<ACL> acl,CreateMode createMode)throwsKeeperException,InterruptedException// 异步方式voidcreate(String path,byte[] data,List<ACL> acl,CreateMode createMode,StringCallback cb,Object ctx)
同步
create()
方法一共有四个参数:
path
: 第一个参数是要创建的节点路径。data
: 第二个参数是存储在指定创建节点中的数据。参数类型是字节数组。acl
: 第三个参数是创建节点的访问权限列表。ZooKeeper
API提供了一个静态接口ZooDefs.Ids
来获取一些基本的acl
列表。createMode
: 第四个参数是创建节点的类型,可以是临时节点,也可以是顺序节点。CreateMode
是一个枚举,它有如下取值:PERSISTENT
、PERSISTENT_SEQUENTIAL
、EPHEMERAL
、EPHEMERAL_SEQUENTIAL
、CONTAINER
、PERSISTENT_WITH_TTL
以及PERSISTENT_SEQUENTIAL_WITH_TTL
。
除了同步
create
方法中的四个参数以外,异步模式的
create
方法还增加了
cb
和
ctx
两个参数。
3.2 Example
同步方法会返回创建节点的路径,并且会抛出相应的异常。异步方法没有返回值,也不会抛出异常。
StringCallback
接口中的
processResult
方法会在节点创建好之后被调用,它有四个参数:
resultCode
: 第一个参数为创建节点的结果码,成功创建节点时,resultCode
的值为0。path
: 第二个参数是创建节点的路径。ctx
: 第三个参数是上下文名称,当一个StringCallback
对象作为多个create
方法的参数时,这个参数就很有用了。name
: 第四个参数是创建节点的名字,其实与path参数相同。
packagecom.zookeeper.example;importorg.apache.zookeeper.*;importjava.io.IOException;// 创建Znode节点publicclassZKCreate{// 创建ZooKeeper实例privatestaticZooKeeper zk;// 创建ZooKeeperConnection实例privatestaticZooKeeperConnection conn;// 同步方式创建Znode节点publicstaticvoidcreateNodeSync(String path,byte[] data)throwsKeeperException,InterruptedException{String nodePath = zk.create(path, data,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);System.out.println("NodePath: "+ nodePath);}// 异步方式创建Znode节点publicstaticvoidcreateNodeAsync(String path,byte[] data){
zk.create(path, data,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,newAsyncCallback.StringCallback(){publicvoidprocessResult(int resultCode,String path,Object ctx,String name){System.out.println("ResultCode: "+ resultCode);System.out.println("Path: "+ path);System.out.println("Ctx: "+ ctx);System.out.println("Name: "+ name);}},"create_node_async");}publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,KeeperException{// 路径String path1 ="/demo/node1";String path2 ="/demo/node2";// 数据byte[] data1 ="zookeeper node1".getBytes();byte[] data2 ="zookeeper node2".getBytes();// 建立连接
conn =newZooKeeperConnection();
zk = conn.connect("localhost");// 同步方式创建节点createNodeSync(path1, data1);// 异步方式创建节点createNodeAsync(path2, data2);// 断开连接
conn.close();}}
编译并执行应用程序后,将在
ZooKeeper
集群中创建具有指定数据的
Znode
。你可以使用
zkCli.sh
进行检查:
[zk: 127.0.0.1:2181(CONNECTED) 10] ls /demo
[node2, node1]
4. 判断Znode是否存在
4.1 语法
ZooKeeper
类提供了检查
Znode
是否存在的
exists
方法。如果指定的
Znode
存在,则返回
Znode
的元数据。
exists
方法如下所示:
// 同步方式Statexists(String path,Watcher watcher)throwsKeeperException,InterruptedExceptionStatexists(String path,boolean watch)throwsKeeperException,InterruptedException// 异步方式voidexists(String path,Watcher watcher,StatCallback cb,Object ctx)voidexists(String path,boolean watch,StatCallback cb,Object ctx)
同步
exists
方法一共有两个参数:
path
: 第一个参数是Znode
路径。watch
或者watcher
: 第二个参数是一个布尔类型的watch
或者是一个自定义的Watcher
对象。同步模式和异步模式分别有两个方法,第一个方法传递一个新的Watcher
对象(我们需要自定义实现)。 第二个方法使用默认Watcher
,如果开启Watcher
只需要将第二个参数设置为true
(不需要我们自己实现)。用来监控节点数据变化以及节点的创建和删除。
除了同步
exists
方法中的两个参数以外,异步模式的
exists
方法还增加了
cb
和
ctx
两个参数。
4.2 Example
exists
方法的
watch
参数比较特别,如果将其指定为
true
,那么代表你对该节点的创建、删除以及数据内容变更都感兴趣,所以会响应三种事件类型。
packagecom.zookeeper.example;importorg.apache.zookeeper.AsyncCallback;importorg.apache.zookeeper.KeeperException;importorg.apache.zookeeper.ZooKeeper;importorg.apache.zookeeper.data.Stat;importjava.io.IOException;// 判断指定路径节点是否存在publicclassZKExists{privatestaticZooKeeper zk;privatestaticZooKeeperConnection conn;// 同步方式判断指定路径的节点是否存在publicstaticStatexistSync(String path)throwsKeeperException,InterruptedException{return zk.exists(path,true);}// 异步方式判断指定路径的节点是否存在publicstaticvoidexistAsync(String path){
zk.exists(path,true,newAsyncCallback.StatCallback(){publicvoidprocessResult(int resultCode,String path,Object ctx,Stat stat){System.out.println("ResultCode:"+ resultCode);System.out.println("Path: "+ path);System.out.println("Ctx: "+ ctx);System.out.println("Stat: "+ stat);}},"exist_async");}publicstaticvoidmain(String[] args)throwsInterruptedException,KeeperException,IOException{String path1 ="/demo/node1";String path2 ="/demo/node2";
conn =newZooKeeperConnection();
zk = conn.connect("localhost");// 同步方式判断指定路径的节点是否存在Stat stat =existSync(path1);if(stat !=null){System.out.println("Node exists and the node version is "+ stat.getVersion());}else{System.out.println("Node does not exists");}// 同步方式判断指定路径的节点是否存在existAsync(path2);// 断开连接
zk.close();}}
5. 获取节点数据
5.1 语法
ZooKeeper
类提供了
getData
方法来获取指定
Znode
中的数据以及状态。
getData
方法的如下所示:
// 同步方式byte[]getData(String path,Watcher watcher,Stat stat)throwsKeeperException,InterruptedExceptionbyte[]getData(String path,boolean watch,Stat stat)throwsKeeperException,InterruptedException// 异步方式voidgetData(String path,Watcher watcher,DataCallback cb,Object ctx)voidgetData(String path,boolean watch,DataCallback cb,Object ctx)
同步
getData
方法返回值就是节点中存储的数据值,它有三个参数:
path
: 第一个参数是节点的路径,用于表示要获取哪个节点中的数据。watch
或者watcher
: 第二个参数是一个布尔类型的watch
或者是一个自定义的Watcher
对象。用来监控节点数据变化以及节点是否被删除。stat
: 第三个参数用于存储节点的状态信息。
除了同步
getData
方法中的三个参数以外,异步模式的
getData
方法还增加了
cb
和
ctx
两个参数。
5.2 Example
在调用
getData
方法前,会先构造一个空的
Stat
类型对象作为参数传给
getData
方法,当
getData
方法调用返回后,节点的状态信息会被填充到
stat
对象中。
值得注意的是,
zooKeeper
设置的监听只生效一次,如果在接收到事件后还想继续对该节点的数据内容改变进行监听,需要在事件处理逻辑中重新调用
getData
方法并将
watch
参数设置为
true
。
packagecom.zookeeper.example;importorg.apache.zookeeper.AsyncCallback;importorg.apache.zookeeper.KeeperException;importorg.apache.zookeeper.ZooKeeper;importorg.apache.zookeeper.data.Stat;importjava.io.IOException;// 获取指定节点中的数据publicclassZKGetData{privatestaticZooKeeper zk;privatestaticZooKeeperConnection conn;// 同步方式获取数据publicstaticvoidgetDataSync(String path)throwsKeeperException,InterruptedException{Stat stat =newStat();byte[] data = zk.getData(path,true, stat);System.out.println("Data: "+newString(data));System.out.println("Stat: "+ stat);}// 异步方式获取数据publicstaticvoidgetDataAsync(String path){
zk.getData(path,true,newAsyncCallback.DataCallback(){publicvoidprocessResult(int resultCode,String path,Object ctx,byte[] data,Stat stat){System.out.println("ResultCode: "+ resultCode);System.out.println("Path: "+ path);System.out.println("Ctx: "+ ctx);System.out.println("Data: "+newString(data));System.out.println("Stat: "+ stat);}},"get_data_async");}publicstaticvoidmain(String[] args)throwsInterruptedException,KeeperException,IOException{String path1 ="/demo/node1";String path2 ="/demo/node2";// 连接服务器
conn =newZooKeeperConnection();
zk = conn.connect("localhost");// 同步方式获取数据getDataSync(path1);// 异步方式获取数据getDataAsync(path2);// 断开连接
conn.close();}}
6. 修改节点的数据内容
6.1 语法
ZooKeeper
类提供了
setData
方法来修改指定
Znode
中的数据。
setData
方法如下所示:
// 同步方法StatsetData(String path,byte[] data,int version)throwsKeeperException,InterruptedException// 异步方法voidsetData(String path,byte[] data,int version,StatCallback cb,Object ctx)
同步
setData
方法有三个参数:
path
: 第一个参数是节点的路径。data
: 第二个参数是修改的数据值。version
: 最后一个参数当前Znode
版本。每当数据发生变化时,ZooKeeper
都会更新Znode
的版本号。
除了同步
setData
方法中的三个参数以外,异步模式的
setData
方法还增加了
cb
和
ctx
两个参数。
6.2 Example
在调用
setData
方法修改节点数据内容时,只有当
version
参数的值与节点状态信息中的
dataVersion
值相等时,数据才能修改,否则会抛出
BadVersion
异常。主要是为了防止丢失数据的更新,在
ZooKeeper
提供的API中,所有的写操作都必有
version
参数。
packagecom.zookeeper.example;importorg.apache.zookeeper.AsyncCallback;importorg.apache.zookeeper.KeeperException;importorg.apache.zookeeper.ZooKeeper;importorg.apache.zookeeper.data.Stat;importjava.io.IOException;// 修改节点的数据内容publicclassZKSetData{privatestaticZooKeeper zk;privatestaticZooKeeperConnection conn;// 同步方式修改节点数据内容publicstaticvoidsetDataSync(String path,byte[] data)throwsKeeperException,InterruptedException{
zk.setData(path, data, zk.exists(path,true).getVersion());}// 异步方式修改节点数据内容publicstaticvoidsetDataAsync(String path,byte[] data)throwsKeeperException,InterruptedException{
zk.setData(path, data, zk.exists(path,true).getVersion(),newAsyncCallback.StatCallback(){publicvoidprocessResult(int resultCode,String path,Object ctx,Stat stat){System.out.println("ResultCode: "+ resultCode);System.out.println("Path: "+ path);System.out.println("Ctx: "+ ctx);System.out.println("Stat: "+ stat);}},"set_data_async");}publicstaticvoidmain(String[] args)throwsInterruptedException,KeeperException,IOException{String path1 ="/demo/node1";String path2 ="/demo/node2";byte[] data1 ="zookeeper node1 update success".getBytes();byte[] data2 ="zookeeper node2 update success".getBytes();// 连接服务器
conn =newZooKeeperConnection();
zk = conn.connect("localhost");// 同步方式修改节点数据内容setDataSync(path1, data1);// 异步方式修改节点数据内容setDataAsync(path2, data2);// 断开连接
conn.close();}}
7. 获取节点的子节点列表
7.1 语法
ZooKeeper
类提供
getChildren
方法来获取指定
Znode
下的所有子节点。
getChildren
方法如下所示:
// 同步方式List<String>getChildren(String path,Watcher watcher)throwsKeeperException,InterruptedExceptionList<String>getChildren(String path,boolean watch)throwsKeeperException,InterruptedException// 异步方式voidgetChildren(String path,Watcher watcher,ChildrenCallback cb,Object ctx)voidgetChildren(String path,boolean watch,Children2Callback cb,Object ctx)
同步
getChildren
方法有两个参数:
path
: 第一个参数是节点路径。watch
或者watcher
: 第二个参数是一个布尔类型的watch
或者是一个自定义的Watcher
对象。用来监控节点数据变化以及节点是否被删除。用于监控节点的删除以及子节点的创建与删除操作。
除了同步
getChildren
方法中的三个参数以外,异步模式的
getChildren
方法还增加了
cb
和
ctx
两个参数。
7.2 Example
packagecom.zookeeper.example;importorg.apache.zookeeper.AsyncCallback;importorg.apache.zookeeper.KeeperException;importorg.apache.zookeeper.ZooKeeper;importorg.apache.zookeeper.data.Stat;importjava.io.IOException;importjava.util.List;// 获取节点的子节点列表publicclassZKGetChildren{privatestaticZooKeeper zk;privatestaticZooKeeperConnection conn;// 同步方式获取节点的子节点列表publicstaticvoidgetChildrenSync(String path)throwsKeeperException,InterruptedException{List<String> children = zk.getChildren(path,true);for(int i =0; i < children.size(); i++){System.out.println("Child: "+ children.get(i));}}// 异步方式获取节点的子节点列表publicstaticvoidgetChildrenAsync(String path){
zk.getChildren(path,true,newAsyncCallback.Children2Callback(){publicvoidprocessResult(int resultCode,String path,Object ctx,List<String> children,Stat stat){System.out.println("ResultCode: "+ resultCode);System.out.println("Path: "+ path);System.out.println("Ctx: "+ ctx);System.out.println("Stat: "+ stat);for(String child : children){System.out.println("Child: "+ child);}}},"get_children_async");}publicstaticvoidmain(String[] args)throwsInterruptedException,KeeperException,IOException{String path ="/demo";// 连接服务器
conn =newZooKeeperConnection();
zk = conn.connect("localhost");// 同步方式获取节点的子节点列表getChildrenSync(path);// 异步方式获取节点的子节点列表getChildrenAsync(path);// 断开连接
conn.close();}}
8. 删除节点
8.1 语法
ZooKeeper
类提供了
delete
方法来删除指定的
Znode
。
delete
方法如下所示:
// 同步方式voiddelete(String path,int version)throwsInterruptedException,KeeperException// 异步方式voiddelete(String path,int version,VoidCallback cb,Object ctx)
同步
delete
方法有两个参数:
path
: 第一个参数是节点路径。version
: 最后一个参数是当前Znode
版本。每当数据发生变化时,ZooKeeper
都会更新Znode
的版本号。
除了同步
delete
方法中的两个参数以外,异步模式的
delete
方法还增加了
cb
和
ctx
两个参数。
8.2 Example
packagecom.zookeeper.example;importorg.apache.zookeeper.AsyncCallback;importorg.apache.zookeeper.KeeperException;importorg.apache.zookeeper.ZooKeeper;importjava.io.IOException;// 删除节点publicclassZKDelete{privatestaticZooKeeper zk;privatestaticZooKeeperConnection conn;// 同步方式删除节点publicstaticvoiddeleteSync(String path)throwsKeeperException,InterruptedException{
zk.delete(path, zk.exists(path,true).getVersion());}// 异步方式删除节点privatestaticvoiddeleteAsync(String path)throwsKeeperException,InterruptedException{
zk.delete(path, zk.exists(path,true).getVersion(),newAsyncCallback.VoidCallback(){publicvoidprocessResult(int resultCode,String path,Object ctx){System.out.println("ResultCode: "+ resultCode);System.out.println("Path: "+ path);System.out.println("Ctx: "+ ctx);}},"delete_async");}publicstaticvoidmain(String[] args)throwsInterruptedException,KeeperException,IOException{String path1 ="/test/node1";String path2 ="/test/node2";// 连接服务器
conn =newZooKeeperConnection();
zk = conn.connect("localhost");// 同步方式删除节点deleteSync(path1);// 异步方式删除节点deleteAsync(path2);// 断开连接
conn.close();}}
参考:
- zookeeper java api介绍
- Zookeeper - API
版权归原作者 @SmartSi 所有, 如有侵权,请联系我们删除。