文章目录
一、Zookeeper的命令行使用
ZooKeeper解压后,在其bin目录下包含着常用的程序,例如 zkServer.sh zkCli.sh
我们使用zkCli.sh 就可以通过命令行使用Zookeeper客户端
连接zookeeper服务器
连接后输入help就可以查看所有命令和使用方式的说明了
#对于本地默认端口 则可以直接 ./zkCli.sh # -server 指定服务地址和端口[root@localhost bin]# ./zkCli.sh -server localhost:15881
创建节点命令
create [-s][-e] path data acl
-s或-e分别指定节点特性,顺序或临时节点,若不指定,则创建持久节点;acl⽤来进⾏权限控制。
# 创建顺序节点[zk: localhost:15881(CONNECTED)0] create -s /zk-test dataContent1111
Created /zk-test0000000007
# 创建临时节点,临时节点在会话结束后由就会被自动删除[zk: localhost:15881(CONNECTED)0] create -e /zk-temp data222
Created /zk-temp
# 创建永久节点[zk: localhost:15881(CONNECTED)2] create /zk-test-permanent data333
Created /zk-test-permanent
读取节点
可以使用
ls
查看子节点列表,使用 get 命令查看节点的内容
# 使用 ls 命令查看子节点[zk: localhost:15881(CONNECTED)4]ls /
[lg-PERSISTENT, zk-premament, zk-temp, zk-test-permanent, zk-test0000000000, zk-test0000000007, zookeeper]# 使用 get 命令查看节点内容 get -s 则可以附加打印节点状态信息[zk: localhost:15881(CONNECTED)6] get /zk-temp
data222
# stat 命令查看节点状态[zk: localhost:15881(CONNECTED)0]stat /zk-temp
cZxid = 0x30000000a
ctime = Wed Jul 05 10:48:44 CST 2023
mZxid = 0x30000000a
mtime = Wed Jul 05 10:48:44 CST 2023
pZxid = 0x30000000a
cversion =0
dataVersion =0
aclVersion =0
ephemeralOwner = 0x100008d52290003
dataLength =7
numChildren =0
更新节点内容
命令:
set path data [version]
version表示数据版本,在zookeeper中,节点的数据是有版本概念的,这个参数⽤于指定本次更新操作是基于Znode的哪⼀个数据版本进⾏的,如果版本和最新版本对不上则会更新失败,这样可以防止覆盖最新写入的数据。
set /zk-premament 666
删除节点
删除命令
**delete path [version]**
** **如果删除的节点包含子节点,那么必须先删除子节点才能删除对应节点。
二、Zookeeper自带API的使用
2.1 引入API
通过Maven引入Zookeeper提供了java客户端API依赖,截至当前时间最新稳定版是 3.7.1
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.7.1</version></dependency>
2.1 API简单使用
/**
* zookeeper API 简单使用
*
* @author liuyp
*/publicclassZookeeperApiSimpleTest{//是否完成连接的建立staticboolean connected =false;staticObject lock =newObject();//zookeeper实例对象staticZooKeeper zooKeeper;//定义Watcher的回调 它会收到客户端状态变化的通知,也可以收到节点事件的通知staticWatcher watcherProcess =(watchedEvent)->{//客户端连接成功状态通知if(watchedEvent.getState()==Watcher.Event.KeeperState.SyncConnected&&!connected){System.out.println("watcher回调:客户端连接上线");synchronized(lock){//连接成功就通知方法返回
connected =true;
lock.notifyAll();}}//子节点列表变化通知if(watchedEvent.getType()==Watcher.Event.EventType.NodeChildrenChanged){try{//获取最新的子节点,并重新开启watchList<String> children = zooKeeper.getChildren(watchedEvent.getPath(),true);System.out.println("watcher回调:子节点变化通知 节点:"+ watchedEvent.getPath()+" 的最新子节点:"+ children);}catch(KeeperException e){
e.printStackTrace();}catch(InterruptedException e){
e.printStackTrace();}}//节点内容变更事件if(watchedEvent.getType()==Watcher.Event.EventType.NodeDataChanged){try{byte[] data = zooKeeper.getData(watchedEvent.getPath(),false,null);System.out.println("watcher回调:节点数据变化通知 节点:"+ watchedEvent.getPath()+" 内容为:"+newString(data));}catch(KeeperException e){
e.printStackTrace();}catch(InterruptedException e){
e.printStackTrace();}}//节点删除通知if(watchedEvent.getType()==Watcher.Event.EventType.NodeDeleted){System.out.println("watcher回调:节点被删除通知:"+ watchedEvent.getPath());}};/**
* demo测试入口
*
* @param args
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
*/publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,KeeperException{//同步的方式建立会话createSession();//测试创建节点,先删除上一次创建的createZNode();//获取节点数据getZNodeData();//更新节点数据updateZNodeData();//删除节点deleteZNode();}/**
* 一、创建会话
* 创建Zookeeper会话初始化Zookeeper对象
* 这里改成同步执行,连接上了方法才返回
*/publicsynchronizedstaticvoidcreateSession()throwsIOException,InterruptedException{//可以配置多个地址客户端会随机连接例如 192.168.188.130:15881,192.168.188.130:15882String connectString ="192.168.188.130:15881";//会话超时时间 单位是毫秒int sessionTimeout =5000;//执行结果立即返回,后台异步建立连接。watcherProcess
zooKeeper =newZooKeeper(connectString, sessionTimeout, watcherProcess);if(connected){return;}//如果没执行完,就让出锁进入等待状态,等待出结果后被唤醒synchronized(lock){
lock.wait();}}/**
* 二、创建znode
*/publicstaticvoidcreateZNode()throwsKeeperException,InterruptedException{//创建一个测试的公共节点,后续都在这个节点下面测试,并且给他加一个watchString testParentNodePath ="/zookeeperApi";if(zooKeeper.exists(testParentNodePath,false)==null){
zooKeeper.create(testParentNodePath,"父节点".getBytes(StandardCharsets.UTF_8),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}//添加监听 exist&getData
zooKeeper.addWatch(testParentNodePath,AddWatchMode.PERSISTENT_RECURSIVE);
zooKeeper.getChildren(testParentNodePath,true);/**
* path:节点创建路径
* data[] :字节数组格式保存到节点的数据
* acl:节点ACL权限设置
* createMode:创建的节点类型。PERSISTENT:持久节点 EPHEMERAL临时节点 ,还有临时顺序节点,持久顺序节点
*/String zNodePersistent = zooKeeper.create(
testParentNodePath +"/persistent","持久节点内容".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);String zNodeEphemeralSequential = zooKeeper.create(
testParentNodePath +"/ephemeralSequential","临时顺序节点内容".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);String zNodeEphemeral = zooKeeper.create(
testParentNodePath +"/persistentEphemeral","临时节点内容".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);}/**
* 三、获取节点数据
*/publicstaticvoidgetZNodeData()throwsKeeperException,InterruptedException{String testParentNodePath ="/zookeeperApi";byte[] data = zooKeeper.getData(testParentNodePath,false,null);System.out.println("节点:"+ testParentNodePath +" 内容为:"+newString(data));}/**
* 三、更新节点数据
*/publicstaticvoidupdateZNodeData()throwsKeeperException,InterruptedException{String testParentNodePath ="/zookeeperApi";
zooKeeper.setData(testParentNodePath,("新数据"+Math.random()).getBytes(),-1);}/**
* 四、删除znode
*/publicstaticvoiddeleteZNode()throwsKeeperException,InterruptedException{String testParentNodePath ="/zookeeperApi";
zooKeeper.delete(testParentNodePath +"/persistent",-1);}}
三、Zookeeper三方客户端zkClient的使用
项目地址:https://github.com/sgroschupf/zkclient/issues
zkClient是git上的一个开源的zookeeper的java客户端项目,是对zookeeper原生API的封装,使得其更易用了。
优势:1. session重连 2.watch重主策 3.递归删除/添加节点
注意:项目最新更新日期是2018年,上生产使用前需要考虑漏洞问题。
3.1 引入依赖
<!-- https://mvnrepository.com/artifact/com.101tec/zkclient --><dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.11</version></dependency>
3.2 简单的使用案例
publicclassZkClientTest{staticCountDownLatch countDownLatch =newCountDownLatch(1);publicstaticvoidmain(String[] args)throwsInterruptedException{String testzkClientPath ="/zkClientAPI";//建立连接,这里是同步的方式String connectString ="192.168.188.130:15881";ZkClient zkClient =newZkClient(connectString);//创建节点,zkClient支持递归创建,没有父节点会自动创建对应的父节点
zkClient.createPersistent(testzkClientPath +"/persistent",true);
zkClient.createPersistent(testzkClientPath +"/persistent_readyDelete",true);//删除节点 zkClient支持自动删除节点下的子节点
zkClient.delete(testzkClientPath +"/persistent_readyDelete",-1);//获取子节点List<String> children = zkClient.getChildren(testzkClientPath);System.out.println("读取节点:"+ testzkClientPath +" 子节点:"+ children);//监听事件注册//注册子节点变更事件
zkClient.subscribeChildChanges(testzkClientPath,(path, childNodeList)->{System.out.println("节点子节点监听事件通知:节点:"+ path +" 最新子节点:"+ childNodeList);});//注册节点数据变更事件
zkClient.subscribeDataChanges(testzkClientPath,newIZkDataListener(){@OverridepublicvoidhandleDataChange(String s,Object o)throwsException{System.out.println("节点数据监听事件通知:节点:"+ s +" 最新数据:"+ o);}@OverridepublicvoidhandleDataDeleted(String s)throwsException{System.out.println("节点数据监听事件通知:节点:"+ s +" 已删除");}});//写入节点数据
zkClient.writeData(testzkClientPath,System.currentTimeMillis()+"写入数据");//获取节点数据Object readDataResult = zkClient.readData(testzkClientPath);System.out.println("读取节点数据:"+ testzkClientPath +" : "+ readDataResult);//删除节点
zkClient.deleteRecursive(testzkClientPath);//阻塞最后的结束程序
countDownLatch.await();}}
四、Curator 客户端框架
项目地址:https://github.com/apache/curator
最开始由 netflix 在github上开源,2013年成为apache顶级项目,至今仍在更新
和ZkClient一样,Curator解决了很多细节的底层工作,包括连接重连、watch自动重新注册
节点不存在异常等,并且提供了基于fluent编程风格的支持
4.1 引入依赖
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.5.0</version></dependency>
4.2 简单使用案例
/**
* Curator 是Netflix公司开源的一套ZooKeeper客户端框架
* 和ZkClient一样,Curator解决了很多细节的底层工作,包括连接重连、watch自动重新注册
* 节点不存在异常等,并且提供了基于fluent编程风格的支持
* @author liuyp
*/publicclassCuratorTest{publicstaticvoidmain(String[] args)throwsException{//连接信息,多个连接使用逗号分隔String connectString ="192.168.188.130:15881";/**
* 一、发起连接
*
* RetryPolicy重连策略 默认提供三种重连策略
* 1、ExponentialBackoffRetry(基于backoff的重连策略)重新尝试一定次数,并增加重试之间的睡眠时间
* 2、RetryNTimes(重连N次策略)
* 3、RetryForever(永远重试策略)
*
* 创建连接 CuratorFramework
* 1、通过CuratorFrameworkFactory.newClient 底层是CuratorFrameworkFactory.build
* 2、直接通过 CuratorFrameworkFactory.build
*
* 启动连接 CuratorFramework.start()
*/int baseSleepTimeMs=1000;//重试之间等待的初始时间int maxRetries=5;//最大重试次数int maxSleepMs=5000;//每次重试的最大睡眠时间 如果算出来的sleepMs超过这个时间,则采用maxSleepMs//重试间隔时间: baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));RetryPolicy retryPolicy=newExponentialBackoffRetry(baseSleepTimeMs,maxRetries,maxSleepMs);CuratorFramework client =CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(10000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("curatorAPI")//加上这个以后,所有路径都是以这个路径为根路径.build();
client.start();System.out.println("**********客户端已启动**********");/**
* 二、创建节点
* 1、默认创建内容为空的永久节点
* 2、设置节点内容和原生一样,使用字节数组
* 3、可以使用 creatingParentsIfNeeded 方法自动创建父节点,避免需要递归判断父节点是否存在
*/
client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/tempNode/create","临时节点".getBytes(StandardCharsets.UTF_8));/**
* 三、测试增加监听
* 1、监听类型 PERSISTENT_RECURSIVE 会循环监听注册节点和其子节点的数据变化和是否存在
*/CuratorWatcher curatorWatcher=(watchevent)->{System.out.println("[监听通知:]"+"节点:"+watchevent.getPath()+" "+watchevent.getType());};
client.watchers().add().withMode(AddWatchMode.PERSISTENT_RECURSIVE).usingWatcher(curatorWatcher).forPath("/tempNode");
client.create().forPath("/tempNode/watcher");/**
* 三、读取&修改节点数据 并获取状态数据
*/Stat stat=newStat();byte[] bytes = client.getData().storingStatIn(stat).forPath("/tempNode/create");System.out.println("读取节点数据:"+newString(bytes,StandardCharsets.UTF_8));System.out.println("读取节点状态:"+stat.toString());
client.setData().forPath("/tempNode/create","节点/tempNode/create的新数据".getBytes(StandardCharsets.UTF_8));/**
* 四、删除节点
*/
client.delete().forPath("/tempNode/watcher");
client.delete().forPath("/tempNode/create");
client.delete().forPath("/tempNode");}}
版权归原作者 栗子叶 所有, 如有侵权,请联系我们删除。