0


zookeeper 常见客户端介绍和使用 zkCli、自带API、 zkClient、Curator

文章目录

一、Zookeeper的命令行使用

ZooKeeper解压后,在其bin目录下包含着常用的程序,例如 zkServer.sh zkCli.sh
我们使用zkCli.sh 就可以通过命令行使用Zookeeper客户端
连接zookeeper服务器
连接后输入help就可以查看所有命令和使用方式的说明了

#对于本地默认端口 则可以直接 ./zkCli.sh # -server 指定服务地址和端口[root@localhost bin]# ./zkCli.sh -server localhost:15881

创建节点命令

create [-s][-e] path data acl

-s或-e分别指定节点特性,顺序或临时节点,若不指定,则创建持久节点;acl⽤来进⾏权限控制。

# 创建顺序节点[zk: localhost:15881(CONNECTED)0] create -s /zk-test dataContent1111
Created /zk-test0000000007 

# 创建临时节点,临时节点在会话结束后由就会被自动删除[zk: localhost:15881(CONNECTED)0] create -e /zk-temp data222
Created /zk-temp

# 创建永久节点[zk: localhost:15881(CONNECTED)2] create /zk-test-permanent data333
Created /zk-test-permanent

读取节点
可以使用

ls

查看子节点列表,使用 get 命令查看节点的内容

# 使用 ls 命令查看子节点[zk: localhost:15881(CONNECTED)4]ls /
[lg-PERSISTENT, zk-premament, zk-temp, zk-test-permanent, zk-test0000000000, zk-test0000000007, zookeeper]# 使用 get 命令查看节点内容 get -s 则可以附加打印节点状态信息[zk: localhost:15881(CONNECTED)6] get /zk-temp
data222

# stat 命令查看节点状态[zk: localhost:15881(CONNECTED)0]stat /zk-temp
cZxid = 0x30000000a
ctime = Wed Jul 05 10:48:44 CST 2023
mZxid = 0x30000000a
mtime = Wed Jul 05 10:48:44 CST 2023
pZxid = 0x30000000a
cversion =0
dataVersion =0
aclVersion =0
ephemeralOwner = 0x100008d52290003
dataLength =7
numChildren =0

更新节点内容
命令:

set path data [version]

version表示数据版本,在zookeeper中,节点的数据是有版本概念的,这个参数⽤于指定本次更新操作是基于Znode的哪⼀个数据版本进⾏的,如果版本和最新版本对不上则会更新失败,这样可以防止覆盖最新写入的数据。

set /zk-premament 666

删除节点
删除命令

**delete path [version]**

** **如果删除的节点包含子节点,那么必须先删除子节点才能删除对应节点。

二、Zookeeper自带API的使用

2.1 引入API

通过Maven引入Zookeeper提供了java客户端API依赖,截至当前时间最新稳定版是 3.7.1

<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.7.1</version></dependency>

2.1 API简单使用

/**
 * zookeeper API 简单使用
 *
 * @author liuyp
 */publicclassZookeeperApiSimpleTest{//是否完成连接的建立staticboolean connected =false;staticObject lock =newObject();//zookeeper实例对象staticZooKeeper zooKeeper;//定义Watcher的回调 它会收到客户端状态变化的通知,也可以收到节点事件的通知staticWatcher watcherProcess =(watchedEvent)->{//客户端连接成功状态通知if(watchedEvent.getState()==Watcher.Event.KeeperState.SyncConnected&&!connected){System.out.println("watcher回调:客户端连接上线");synchronized(lock){//连接成功就通知方法返回
                connected =true;
                lock.notifyAll();}}//子节点列表变化通知if(watchedEvent.getType()==Watcher.Event.EventType.NodeChildrenChanged){try{//获取最新的子节点,并重新开启watchList<String> children = zooKeeper.getChildren(watchedEvent.getPath(),true);System.out.println("watcher回调:子节点变化通知 节点:"+ watchedEvent.getPath()+" 的最新子节点:"+ children);}catch(KeeperException e){
                e.printStackTrace();}catch(InterruptedException e){
                e.printStackTrace();}}//节点内容变更事件if(watchedEvent.getType()==Watcher.Event.EventType.NodeDataChanged){try{byte[] data = zooKeeper.getData(watchedEvent.getPath(),false,null);System.out.println("watcher回调:节点数据变化通知 节点:"+ watchedEvent.getPath()+" 内容为:"+newString(data));}catch(KeeperException e){
                e.printStackTrace();}catch(InterruptedException e){
                e.printStackTrace();}}//节点删除通知if(watchedEvent.getType()==Watcher.Event.EventType.NodeDeleted){System.out.println("watcher回调:节点被删除通知:"+ watchedEvent.getPath());}};/**
     * demo测试入口
     *
     * @param args
     * @throws IOException
     * @throws InterruptedException
     * @throws KeeperException
     */publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,KeeperException{//同步的方式建立会话createSession();//测试创建节点,先删除上一次创建的createZNode();//获取节点数据getZNodeData();//更新节点数据updateZNodeData();//删除节点deleteZNode();}/**
     * 一、创建会话
     * 创建Zookeeper会话初始化Zookeeper对象
     * 这里改成同步执行,连接上了方法才返回
     */publicsynchronizedstaticvoidcreateSession()throwsIOException,InterruptedException{//可以配置多个地址客户端会随机连接例如 192.168.188.130:15881,192.168.188.130:15882String connectString ="192.168.188.130:15881";//会话超时时间 单位是毫秒int sessionTimeout =5000;//执行结果立即返回,后台异步建立连接。watcherProcess
        zooKeeper =newZooKeeper(connectString, sessionTimeout, watcherProcess);if(connected){return;}//如果没执行完,就让出锁进入等待状态,等待出结果后被唤醒synchronized(lock){
            lock.wait();}}/**
     * 二、创建znode
     */publicstaticvoidcreateZNode()throwsKeeperException,InterruptedException{//创建一个测试的公共节点,后续都在这个节点下面测试,并且给他加一个watchString testParentNodePath ="/zookeeperApi";if(zooKeeper.exists(testParentNodePath,false)==null){
            zooKeeper.create(testParentNodePath,"父节点".getBytes(StandardCharsets.UTF_8),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}//添加监听 exist&getData
        zooKeeper.addWatch(testParentNodePath,AddWatchMode.PERSISTENT_RECURSIVE);
        zooKeeper.getChildren(testParentNodePath,true);/**
         * path:节点创建路径
         * data[] :字节数组格式保存到节点的数据
         * acl:节点ACL权限设置
         * createMode:创建的节点类型。PERSISTENT:持久节点 EPHEMERAL临时节点 ,还有临时顺序节点,持久顺序节点
         */String zNodePersistent = zooKeeper.create(
                testParentNodePath +"/persistent","持久节点内容".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);String zNodeEphemeralSequential = zooKeeper.create(
                testParentNodePath +"/ephemeralSequential","临时顺序节点内容".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);String zNodeEphemeral = zooKeeper.create(
                testParentNodePath +"/persistentEphemeral","临时节点内容".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);}/**
     * 三、获取节点数据
     */publicstaticvoidgetZNodeData()throwsKeeperException,InterruptedException{String testParentNodePath ="/zookeeperApi";byte[] data = zooKeeper.getData(testParentNodePath,false,null);System.out.println("节点:"+ testParentNodePath +" 内容为:"+newString(data));}/**
     * 三、更新节点数据
     */publicstaticvoidupdateZNodeData()throwsKeeperException,InterruptedException{String testParentNodePath ="/zookeeperApi";
        zooKeeper.setData(testParentNodePath,("新数据"+Math.random()).getBytes(),-1);}/**
     * 四、删除znode
     */publicstaticvoiddeleteZNode()throwsKeeperException,InterruptedException{String testParentNodePath ="/zookeeperApi";
        zooKeeper.delete(testParentNodePath +"/persistent",-1);}}

三、Zookeeper三方客户端zkClient的使用

项目地址:https://github.com/sgroschupf/zkclient/issues
zkClient是git上的一个开源的zookeeper的java客户端项目,是对zookeeper原生API的封装,使得其更易用了。
优势:1. session重连 2.watch重主策 3.递归删除/添加节点
注意:项目最新更新日期是2018年,上生产使用前需要考虑漏洞问题。

3.1 引入依赖

<!-- https://mvnrepository.com/artifact/com.101tec/zkclient --><dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.11</version></dependency>

3.2 简单的使用案例

publicclassZkClientTest{staticCountDownLatch countDownLatch =newCountDownLatch(1);publicstaticvoidmain(String[] args)throwsInterruptedException{String testzkClientPath ="/zkClientAPI";//建立连接,这里是同步的方式String connectString ="192.168.188.130:15881";ZkClient zkClient =newZkClient(connectString);//创建节点,zkClient支持递归创建,没有父节点会自动创建对应的父节点
        zkClient.createPersistent(testzkClientPath +"/persistent",true);
        zkClient.createPersistent(testzkClientPath +"/persistent_readyDelete",true);//删除节点 zkClient支持自动删除节点下的子节点
        zkClient.delete(testzkClientPath +"/persistent_readyDelete",-1);//获取子节点List<String> children = zkClient.getChildren(testzkClientPath);System.out.println("读取节点:"+ testzkClientPath +" 子节点:"+ children);//监听事件注册//注册子节点变更事件
        zkClient.subscribeChildChanges(testzkClientPath,(path, childNodeList)->{System.out.println("节点子节点监听事件通知:节点:"+ path +" 最新子节点:"+ childNodeList);});//注册节点数据变更事件
        zkClient.subscribeDataChanges(testzkClientPath,newIZkDataListener(){@OverridepublicvoidhandleDataChange(String s,Object o)throwsException{System.out.println("节点数据监听事件通知:节点:"+ s +" 最新数据:"+ o);}@OverridepublicvoidhandleDataDeleted(String s)throwsException{System.out.println("节点数据监听事件通知:节点:"+ s +" 已删除");}});//写入节点数据
        zkClient.writeData(testzkClientPath,System.currentTimeMillis()+"写入数据");//获取节点数据Object readDataResult = zkClient.readData(testzkClientPath);System.out.println("读取节点数据:"+ testzkClientPath +" : "+ readDataResult);//删除节点
        zkClient.deleteRecursive(testzkClientPath);//阻塞最后的结束程序
        countDownLatch.await();}}

四、Curator 客户端框架

项目地址:https://github.com/apache/curator
最开始由 netflix 在github上开源,2013年成为apache顶级项目,至今仍在更新
和ZkClient一样,Curator解决了很多细节的底层工作,包括连接重连、watch自动重新注册
节点不存在异常等,并且提供了基于fluent编程风格的支持

4.1 引入依赖

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.5.0</version></dependency>

4.2 简单使用案例

/**
 * Curator 是Netflix公司开源的一套ZooKeeper客户端框架
 * 和ZkClient一样,Curator解决了很多细节的底层工作,包括连接重连、watch自动重新注册
 * 节点不存在异常等,并且提供了基于fluent编程风格的支持
 * @author liuyp
 */publicclassCuratorTest{publicstaticvoidmain(String[] args)throwsException{//连接信息,多个连接使用逗号分隔String connectString ="192.168.188.130:15881";/**
         * 一、发起连接
         *
         * RetryPolicy重连策略 默认提供三种重连策略
         * 1、ExponentialBackoffRetry(基于backoff的重连策略)重新尝试一定次数,并增加重试之间的睡眠时间
         * 2、RetryNTimes(重连N次策略)
         * 3、RetryForever(永远重试策略)
         *
         * 创建连接 CuratorFramework
         * 1、通过CuratorFrameworkFactory.newClient 底层是CuratorFrameworkFactory.build
         * 2、直接通过 CuratorFrameworkFactory.build
         *
         * 启动连接 CuratorFramework.start()
         */int baseSleepTimeMs=1000;//重试之间等待的初始时间int maxRetries=5;//最大重试次数int maxSleepMs=5000;//每次重试的最大睡眠时间 如果算出来的sleepMs超过这个时间,则采用maxSleepMs//重试间隔时间: baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));RetryPolicy retryPolicy=newExponentialBackoffRetry(baseSleepTimeMs,maxRetries,maxSleepMs);CuratorFramework client =CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(10000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("curatorAPI")//加上这个以后,所有路径都是以这个路径为根路径.build();

        client.start();System.out.println("**********客户端已启动**********");/**
         * 二、创建节点
         * 1、默认创建内容为空的永久节点
         * 2、设置节点内容和原生一样,使用字节数组
         * 3、可以使用 creatingParentsIfNeeded 方法自动创建父节点,避免需要递归判断父节点是否存在
         */
        client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/tempNode/create","临时节点".getBytes(StandardCharsets.UTF_8));/**
         * 三、测试增加监听
         * 1、监听类型 PERSISTENT_RECURSIVE 会循环监听注册节点和其子节点的数据变化和是否存在
         */CuratorWatcher curatorWatcher=(watchevent)->{System.out.println("[监听通知:]"+"节点:"+watchevent.getPath()+" "+watchevent.getType());};
        client.watchers().add().withMode(AddWatchMode.PERSISTENT_RECURSIVE).usingWatcher(curatorWatcher).forPath("/tempNode");
        client.create().forPath("/tempNode/watcher");/**
         * 三、读取&修改节点数据 并获取状态数据
         */Stat stat=newStat();byte[] bytes = client.getData().storingStatIn(stat).forPath("/tempNode/create");System.out.println("读取节点数据:"+newString(bytes,StandardCharsets.UTF_8));System.out.println("读取节点状态:"+stat.toString());
        client.setData().forPath("/tempNode/create","节点/tempNode/create的新数据".getBytes(StandardCharsets.UTF_8));/**
         * 四、删除节点
         */
        client.delete().forPath("/tempNode/watcher");
        client.delete().forPath("/tempNode/create");
        client.delete().forPath("/tempNode");}}

本文转载自: https://blog.csdn.net/weixin_40979518/article/details/135375169
版权归原作者 栗子叶 所有, 如有侵权,请联系我们删除。

“zookeeper 常见客户端介绍和使用 zkCli、自带API、 zkClient、Curator”的评论:

还没有评论