API
ZooKeeper官方提供了Java API,可以通过Java代码来连接zookeeper服务进行操作。可以连接、创建节点、获取节点数据、监听节点变化等操作,具体有以下几个重要的类:
- ZooKeeper:ZooKeeper类是Java API的核心类,用于与ZooKeeper服务器建立连接,并提供了一系列方法来操作ZooKeeper的节点。
- Watcher:Watcher是ZooKeeper的一个回调接口,当节点发生变化时会调用相应的方法进行通知。
- CreateMode:CreateMode枚举类定义了节点的类型,包括永久节点、临时节点、顺序节点和临时顺序节点。
- Stat:Stat类表示节点的元数据信息,比如修改版本、数据长度、子节点数量等。
添加依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.2</version>
</dependency>
操作例子
String host ="localhost:2181";//建立连接
zooKeeper =newZooKeeper(host,2000,null);String path ="/test";Watcher watcher =newWatcher(){@Overridepublicvoidprocess(WatchedEvent watchedEvent){System.out.println("Node changed: "+ watchedEvent.getPath());System.out.println(watchedEvent);}};//获取节点状态 如果不存在返回nullStat stat = zooKeeper.exists(path,false);if(null!= stat){System.out.println(stat.getCzxid()+"-"+stat.getAversion());}//创建节点 包含版本、时间、数据长度等信息
zooKeeper.create(path,"123".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);//添加watcher
zooKeeper.addWatch(path, watcher,AddWatchMode.PERSISTENT);//获取节点数据byte[] data = zooKeeper.getData(path,false,null);System.out.println(newString(data));
zooKeeper.create(path+"/1","child1".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);//获取子节点List<String> children = zooKeeper.getChildren(path,false);System.out.println("childs size:"+children.size());//删除子节点
zooKeeper.delete(path+"/1",-1);
zooKeeper.close();
zkClient
zkClient封装了zookeeper的官方api,简化了一些繁琐的操作,并提供了一些额外的功能,提高了开发效.
添加依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>
zkclient对节点数据的操作进行了序列化, 这里先准备一个string类型的序列化类。需要实现ZkSerializer接口
public class ZkStringSerializer implements ZkSerializer {
@Override
public byte[] serialize(Object o) throws ZkMarshallingError {
return String.valueOf(o).getBytes();
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
return new String(bytes);
}
}
基本操作
ZkClient zkClient =newZkClient("localhost:2181");//自定义序列化 否则报错
zkClient.setZkSerializer(newZkStringSerializer());String path ="/test";//判断节点是否存在boolean exist = zkClient.exists(path);System.out.println(exist);if(!exist){//创建节点
zkClient.create(path,"123",CreateMode.PERSISTENT);}//读取节点数据System.out.println((String) zkClient.readData(path));
zkClient.writeData(path,"456");//设置节点数据System.out.println((String) zkClient.readData(path));
zkClient.delete(path);//删除节点
zkClient.close();
节点变化事件
String path ="/test";/**
* 节点变化事件
* 只监听节点增减,不监听数据变化事件
*/
zkClient.subscribeChildChanges(path,newIZkChildListener(){@OverridepublicvoidhandleChildChange(String parentPath,List<String> children)throwsException{System.out.println("节点"+parentPath+"发生变化");System.out.println(children);}});//节点操作,观察handleChildChange接收到对应事件Thread.sleep(2000);
zkClient.createPersistent(path);Thread.sleep(2000);
zkClient.createPersistent(path+"/child1");Thread.sleep(2000);
zkClient.writeData(path+"/child1","123");Thread.sleep(2000);
zkClient.delete(path+"/child1");Thread.sleep(2000);
zkClient.delete(path);Thread.sleep(100000);
节点数据变化事件
String path ="/test";/**
* 节点变化事件,只检测当前节点,感知不到其子节点
* 节点被删除或节点数据变化
*/
zkClient.subscribeDataChanges(path,newIZkDataListener(){@OverridepublicvoidhandleDataChange(String s,Object o)throwsException{System.out.println("节点:"+s+"数据变为:"+o);}@OverridepublicvoidhandleDataDeleted(String s)throwsException{System.out.println("节点:"+s+"删除");}});Thread.sleep(2000);
zkClient.createPersistent(path);Thread.sleep(2000);
zkClient.createPersistent(path+"/child1");Thread.sleep(2000);
zkClient.delete(path+"/child1");Thread.sleep(2000);
zkClient.writeData(path,"123");Thread.sleep(2000);
zkClient.delete(path);Thread.sleep(100000);}
Curator
curator是另一个java连接zookeeper类库。功能更加强大。提供了连接重试、分布式锁、选举、队列等多种实际场景的用例。这里先简单搞个使用例子。
添加依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.1.0</version>
</dependency>
curator-framework是基础的依赖,一些特定的使用方式需要添加不同的依赖,有curator-recipes、curator-x-discovery、curator-x-async等。
基本操作
//创建连接CuratorFramework client =CuratorFrameworkFactory.newClient("localhost:2181",newExponentialBackoffRetry(1000,3));
client.start();String path ="/test";
client.checkExists().forPath(path);//判断是否存在
client.create().forPath(path,"123".getBytes());//创建节点byte[] data = client.getData().forPath(path);//获取数据System.out.println(newString(data));
client.setData().forPath(path,"456".getBytes());//设置数据
client.delete().forPath(path);//删除节点
client.close();
节点监听
CuratorFramework client =CuratorFrameworkFactory.newClient("localhost:2181",newExponentialBackoffRetry(1000,3));
client.start();String path ="/test";NodeCache nodeCache =newNodeCache(client,path);//添加监听
nodeCache.getListenable().addListener(newNodeCacheListener(){@OverridepublicvoidnodeChanged()throwsException{ChildData data = nodeCache.getCurrentData();if(data !=null){System.out.println("Node changed: "+ data.getPath()+", value: "+newString(data.getData()));}else{System.out.println("Node deleted: "+ nodeCache.getPath());}}});
nodeCache.start();
client.create().forPath(path);
client.setData().forPath(path,"123".getBytes());
client.delete().forPath(path);
client.close();
这里NodeCache被标识@Deprecated,也不知道被什么方式代替了,后面再研究。先简单使用。
版权归原作者 曹朋羽 所有, 如有侵权,请联系我们删除。