0


Zookeeper(五)Zokeeper 环境搭建与Curator使用

目录

  • 官网:Apache ZooKeeper

一 环境搭建

1.1 单机环境搭建

image.png
image.png
image.png

  • 我这里是本地环境,说名一下,无脑解压一下,放在本地环境目录

image.png

  • 复制配置文件一份zoo_sample未zoo

image.png

  • 修改配置文件

image.png

  • 启动

image.png

  • 查看

image.png

1.2 可视化工具ZooKeeper Assistant

image.png

  • 查看状况

image.png

1.3 集群环境搭建

  • 这里我是伪集群环境搭建,注意集群环境只能是奇数

image.png

  • 这里我模拟三套服务器环境,一个主节点,两个从节点,主要是配置文件的变化
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=D:\\Tools\\Zookeeper\\ServerA\\data
dataLogDir=D:\\Tools\\Zookeeper\\ServerA\\log
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.1=localhost:2886:3886
server.2=localhost:2887:3887
server.3=localhost:2888:3888

image.png
image.png
server.A=B:C:D;其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。

  • myid 建立,依次写入1,2,3,id被称为Server ID,用来标识该机器在集群中的机器序号。同时,在每台ZooKeeper机器上,我们都需要在数据目录(即dataDir参数指定的那个目录)下创建一个 myid文件,该文件只有一行内容,并且是一个数字,即对应于每台机器的Server ID数字。

image.png

  • 后面的B,C一样的,一键启动脚本
@echo off
start cmd /k "cd /d D:\Tools\Zookeeper\ServerA\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerA...
start cmd /k "cd /d D:\Tools\Zookeeper\ServerB\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerB...
start cmd /k "cd /d D:\Tools\Zookeeper\ServerC\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerC...
echo All ZooKeeper servers have been started.
  • 查看启动日志可以看到主从节点情况

image.png

二 常用命令

1.1 命令行语法

命令行语法****功能描述help显示所有操作命令ls path使用ls命令来查看当前znode的子节点[可监听], -w 监听子节点变化, -s 附加次级信息create普通创建, -s 含有序列, -e 临时(重启或超时消失)get path获得节点的值[可监听] -w 监听节点内容变化, -s 附加次级信息set设置节点的具体值stat查看节点的状态delete删除节点deleteall递归删除节点

1.2 数据节点信息

[zk: bigdata01:2181(CONNECTED) 5] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
  1. cZxid: 创建的事务zxid 每次修改Zookeeper状态都会产生一个zookeeper事务ID, 事务ID是Zookeeper中所有修改总的次序. 每次修改都有唯一的zxid, 如果zxid1小于zxid2, 那么zxid1在zxid2之前发生.
  2. ctime: znode被创建的毫秒数(从1970开始)
  3. mzxid: znode最后更新的事务zxid
  4. mtime: znode最后修改的毫秒数(从1970开始)
  5. pZxid: znode最后更新的子节点zxid
  6. cversion: znode子节点变化号, znode子节点修改次数
  7. dataversion:znode 数据变化号
  8. aclVersion:znode 访问控制列表的变化号
  9. ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。
  10. dataLength:znode 的数据长度
  11. numChildren:znode 子节点数量

1.3 节点类型


这个命令就需要自己去练了

三 CuratorAPI使用

3.1 依赖

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-x-discovery</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>4.0.0</version>
            <scope>test</scope>
        </dependency>

3.1 创建会话

使用CuratorFrameworkFactory这个工厂类的两个静态方法来创建一个客户端
image.png
image.png
image.png

packagecom.shu;importorg.apache.curator.RetryPolicy;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.CuratorFrameworkFactory;importorg.apache.curator.retry.ExponentialBackoffRetry;importorg.apache.zookeeper.ZooDefs;importorg.apache.zookeeper.data.ACL;importjava.util.ArrayList;importjava.util.List;/**
 * @author 31380
 * @description
 * @create 2024/3/16 18:39
 */publicclassCuratorUtils{/**
     * 创建连接
     *
     * @param connectionString  连接地址
     * @param sessionTimeout    会话超时时间
     * @param connectionTimeout 连接超时时间
     * @return
     */publicstaticCuratorFrameworkcreateCuratorFramework(String connectionString,int sessionTimeout,int connectionTimeout){returnCuratorFrameworkFactory.builder().connectString(connectionString).sessionTimeoutMs(sessionTimeout).retryPolicy(newExponentialBackoffRetry(1000,3)).connectionTimeoutMs(connectionTimeout).build();}/**
     * 创建连接
     *
     * @param connectionString  连接地址
     * @param sessionTimeout    会话超时时间
     * @param connectionTimeout 连接超时时间
     * @param retryPolicy       重试策略
     * @return
     */publicstaticCuratorFrameworkcreateCuratorFrameworkWithRetry(String connectionString,int sessionTimeout,int connectionTimeout,RetryPolicy retryPolicy
                                                                  ){returnCuratorFrameworkFactory.builder().connectString(connectionString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout).retryPolicy(retryPolicy).build();}/**
     * 创建一个隔离的命名空间
     */publicstaticCuratorFrameworkcreateNamespaceCuratorFramework(String connectionString,int sessionTimeout,int connectionTimeout,String namespace){returnCuratorFrameworkFactory.builder().connectString(connectionString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout).namespace(namespace).retryPolicy(newExponentialBackoffRetry(1000,3)).build();}/**
     * ZooDefs.Perms.READ:读权限
     * ZooDefs.Perms.WRITE:写权限
     * ZooDefs.Perms.CREATE:创建子节点权限
     * ZooDefs.Perms.DELETE:删除权限
     * ZooDefs.Perms.ADMIN:管理权限
     * ZooDefs.Perms.ALL:所有权限
     * 以下是一些常用的身份验证方案:
     * Ids.ANYONE_ID_UNSAFE:表示任何人都可以访问
     * Ids.AUTH_IDS:表示使用已验证的用户身份
     * Ids.OPEN_ACL_UNSAFE:表示开放的ACL,任何人都可以访问
     *  ACL acl = new ACL(ZooDefs.Perms.READ, new Id("myUser", "myPassword"));
     * @return
     */publicstaticList<ACL>getAclList(){ArrayList<ACL> acls =newArrayList<>();// 权限设置ACL acl =newACL(ZooDefs.Perms.ALL,ZooDefs.Ids.ANYONE_ID_UNSAFE);// 添加权限
        acls.add(acl);return acls;}}

3.2 基本使用增删改查

  • 新增
packagecom.shu.base;importcom.shu.CuratorUtils;importorg.apache.curator.framework.CuratorFramework;importorg.apache.zookeeper.CreateMode;importjava.util.List;/**
 * @author 31380
 * @description
 * @create 2024/3/16 18:43
 */publicclassCuratorCreatTest{/**
     * 总结:curator创建节点方法
     * 1.创建节点,如果节点已经存在则抛出异常 create().forPath()
     * 2.withMode():节点类型: CreateMode.EPHEMERAL 临时节点,CreateMode.PERSISTENT 永久节点
     * 3.递归创建节点 creatingParentsIfNeeded()
     * 4.查询所有子节点 getChildren().forPath()
     * 5.删除节点 delete().forPath()
     * 6.判断节点是否存在 checkExists().forPath()
     * 7.关闭连接 close()
     * @param args
     * @throws Exception
     */publicstaticvoidmain(String[] args)throwsException{CuratorFramework curatorClint =CuratorUtils.createCuratorFramework("127.0.0.1:2181",1000,1000);System.out.println("连接成功!");
        curatorClint.start();// 创建节点,如果节点已经存在则抛出异常try{
            curatorClint.create().forPath("/test");}catch(Exception e){System.out.println("创建节点失败!"+e.getMessage());}// 删除节点try{
            curatorClint.delete().forPath("/test");System.out.println("删除节点成功!");}catch(Exception e){System.out.println("删除节点失败!"+e.getMessage());}/**
         * 临时节点(EPHEMERAL):临时创建的,会话结束节点自动被删除,也可以手动删除,临时节点不能拥有子节点.
         * 持久节点(PERSISTENT):创建后永久存在,除非主动删除。
         */// 临时节点,当会话结束后,节点自动删除
        curatorClint.create().withMode(CreateMode.EPHEMERAL).forPath("/secondPath","hello,word".getBytes());System.out.println("临时节点:"+newString(curatorClint.getData().forPath("/secondPath")));// 永久节点
        curatorClint.create().withMode(CreateMode.PERSISTENT).forPath("/thirdPath","hello,word".getBytes());System.out.println("永久节点:"+newString(curatorClint.getData().forPath("/thirdPath")));// 递归创建节点
        curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/parent/child","hello,word".getBytes());System.out.println("递归创建节点:"+newString(curatorClint.getData().forPath("/parent/child")));// 查询所有子节点List<String> list= curatorClint.getChildren().forPath("/");System.out.println(list);// 关闭连接
        curatorClint.close();}}
  • 读取
packagecom.shu.base;importcom.shu.CuratorUtils;importorg.apache.curator.framework.CuratorFramework;importorg.apache.zookeeper.data.Stat;importjava.util.Date;/**
 * @author 31380
 * @description 读取节点数据
 * @create 2024/3/17 11:28
 */publicclassCuratorReadTest{/**
     * 总结:
     * 1. 读取单个节点数据:curatorClint.getData().forPath("/base/test")
     * 2. 读取多个节点数据:curatorClint.getChildren().forPath("/test").forEach(System.out::println)
     * 3. 读取节点数据并获取 stat:curatorClint.getData().storingStatIn(stat).forPath("/base/test")
     * 4:Stat:节点状态,包含节点的版本、数据长度、子节点数量、创建时间、修改时间、最近一次修改的事务 ID、数据版本、ACL 版本、临时节点
     * @param args
     */publicstaticvoidmain(String[] args){// 地址String connectString ="127.0.0.1:2181";// 确保连接字符串正确CuratorFramework curatorClint =CuratorUtils.createCuratorFramework(connectString,1000,1000);System.out.println("连接成功!");
        curatorClint.start();// 读取单个节点数据try{byte[] bytes = curatorClint.getData().forPath("/base/test");System.out.println(newString(bytes));System.out.println("读取节点数据成功!");}catch(Exception e){System.out.println("读取节点数据失败!"+e.getMessage());}// 读取多个节点数据try{
            curatorClint.getChildren().forPath("/test").forEach(System.out::println);System.out.println("读取多个节点数据成功!");}catch(Exception e){System.out.println("读取多个节点数据失败!"+e.getMessage());}try{Stat stat =newStat();byte[] data = curatorClint.getData().storingStatIn(stat).forPath("/base/test");String dataString =newString(data);System.out.println("节点数据:"+ dataString);System.out.println("节点状态:");System.out.println("  节点创建版本:"+ stat.getCversion());System.out.println("  数据长度:"+ stat.getDataLength());System.out.println("  子节点数量:"+ stat.getNumChildren());System.out.println("  创建时间:"+newDate(stat.getCtime()));System.out.println("  修改时间:"+newDate(stat.getMtime()));System.out.println("  最近一次修改的事务 ID:"+ stat.getMzxid());System.out.println("  数据版本:"+ stat.getVersion());System.out.println("  ACL 版本:"+ stat.getAversion());System.out.println("  临时节点:"+ stat.getEphemeralOwner());System.out.println("读取节点数据并获取 stat 成功!");}catch(Exception e){System.out.println("读取节点数据并获取 stat 失败:"+ e.getMessage());}// 关闭连接
        curatorClint.close();}}
  • 删除
packagecom.shu.base;importcom.shu.CuratorUtils;importorg.apache.curator.framework.CuratorFramework;/**
 * @author 31380
 * @description
 * @create 2024/3/16 19:25
 */publicclassCuratorDeleteTest{/**
     * 总结:
     * 1. 删除节点:delete().forPath("/test")
     * 2. 如果存在子节点,删除子节点:delete().deletingChildrenIfNeeded().forPath("/parent")
     * 3. 递归删除节点:delete().deletingChildrenIfNeeded().forPath("/secondPath")
     * 4. 判断节点是否存在:checkExists().forPath("/secondPath")
     * 5. 关闭连接:close()
     * @param args
     * @throws Exception
     */publicstaticvoidmain(String[] args)throwsException{CuratorFramework curatorClint =CuratorUtils.createCuratorFramework("127.0.0.1:2181",1000,1000);System.out.println("连接成功!");
        curatorClint.start();// 删除节点try{
            curatorClint.delete().forPath("/test");System.out.println("删除节点成功!");}catch(Exception e){System.out.println("删除节点失败!"+e.getMessage());}// 如果存在子节点,删除子节点try{
            curatorClint.delete().deletingChildrenIfNeeded().forPath("/parent");System.out.println("删除节点成功!");}catch(Exception e){System.out.println("删除节点失败!"+e.getMessage());}// 递归删除节点
        curatorClint.delete().deletingChildrenIfNeeded().forPath("/secondPath");// 判断节点是否存在if(curatorClint.checkExists().forPath("/secondPath")==null){System.out.println("节点不存在!");}else{System.out.println("节点存在!");}// 关闭连接
        curatorClint.close();}}
  • 修改
packagecom.shu.base;importcom.shu.CuratorUtils;importorg.apache.curator.framework.CuratorFramework;importorg.apache.zookeeper.data.Stat;/**
 * @author 31380
 * @description
 * @create 2024/3/17 11:35
 */publicclassCuratorUpdateTest{/**
     * 总计
     * 1. 更新节点:setData().forPath("/test", "hello,word".getBytes())
     * 2. 指定版本更新节点:setData().withVersion(1).forPath("/test", "hello,word".getBytes())
     * @param args
     */publicstaticvoidmain(String[] args)throwsException{// 地址String connectString ="127.0.0.1:2181";//创建节点CuratorFramework curatorClint =CuratorUtils.createCuratorFramework(connectString,1000,1000);System.out.println("连接成功!");
        curatorClint.start();// 更新节点try{
            curatorClint.setData().forPath("/base/test","hello,word1111".getBytes());// 获取节点数据byte[] bytes = curatorClint.getData().forPath("/base/test");System.out.println(newString(bytes));System.out.println("更新节点成功!");}catch(Exception e){System.out.println("更新节点失败!"+e.getMessage());}// 先获取节点的版本号Stat stat =newStat();byte[] data = curatorClint.getData().storingStatIn(stat).forPath("/base/test");String dataString =newString(data);System.out.println("节点数据:"+ dataString);System.out.println("节点状态:");System.out.println("  数据版本:"+ stat.getVersion());// 指定版本更新节点:CAS 机制try{
            curatorClint.setData().withVersion(stat.getVersion()).forPath("/base/test","hello,word2222".getBytes());// 获取节点数据byte[] bytes = curatorClint.getData().forPath("/base/test");System.out.println(newString(bytes));System.out.println("更新节点成功!");}catch(Exception e){System.out.println("更新节点失败!"+e.getMessage());}}}
  • 异步创建
packagecom.shu.base;importcom.shu.CuratorUtils;importorg.apache.curator.framework.CuratorFramework;importorg.apache.zookeeper.CreateMode;importjava.util.concurrent.CountDownLatch;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;/**
 * @author 31380
 * @description Curator异步操作
 * @create 2024/3/17 11:42
 */publicclassCuratorAyncTest{/**
     * 总结:
     * 1 异步操作:inBackground()
     * 2.创建节点,如果节点已经存在则抛出异常 create().forPath()
     * 3.递归创建节点 creatingParentsIfNeeded()
     * @param args
     * @throws Exception
     */publicstaticvoidmain(String[] args)throwsException{// 地址String connectString ="127.0.0.1:2181";//创建节点CuratorFramework curatorClint =CuratorUtils.createCuratorFramework(connectString,1000,1000);System.out.println("连接成功!");
        curatorClint.start();CountDownLatch cdl =newCountDownLatch(2);ExecutorService executorService =Executors.newFixedThreadPool(2);
        curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((client, event)->{System.out.println("Code:"+ event.getResultCode());System.out.println("Type:"+ event.getType());System.out.println("Path:"+ event.getPath());
            cdl.countDown();}, executorService).forPath("/test1","hello,word".getBytes());

        curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((client, event)->{System.out.println("Code:"+ event.getResultCode());System.out.println("Type:"+ event.getType());System.out.println("Path:"+ event.getPath());
            cdl.countDown();}).forPath("/test2","hello,word".getBytes());
        cdl.await();
        executorService.shutdown();
        curatorClint.close();}/**
     * 事件类型
     * CREATE, // 创建
     * DELETE, // 删除
     * EXISTS, // 存在
     * GET_DATA, // 获取数据
     * SET_DATA, // 设置数据
     * CHILDREN, // 子节点
     * SYNC, // 同步
     * GET_ACL, // 获取ACL
     * SET_ACL, // 设置ACL
     * TRANSACTION, // 事务
     * GET_CONFIG, // 获取配置
     * RECONFIG, // 重新配置
     * WATCHED, // 监听
     * REMOVE_WATCHES, // 移除监听
     * CLOSING; // 关闭
     * @param args
     *//**
     * 响应码
     * OK(0), // OK
     * CONNECTIONLOSS(-4), // 连接丢失
     * MARSHALLINGERROR(-7), // 编组错误
     * UNIMPLEMENTED(-9), // 未实现
     * OPERATIONTIMEOUT(-10), // 操作超时
     * BADARGUMENTS(-8), // 错误参数
     * APIERROR(-100), // API错误
     * NONODE(-101), // 无节点·
     */}
  • 不同的顺序节点
packagecom.shu.base;importcom.shu.CuratorUtils;importorg.apache.curator.framework.CuratorFramework;importorg.apache.zookeeper.CreateMode;/**
 * @author 31380
 * @description
 * @create 2024/3/17 11:03
 */publicclassCuratorSEQCreat{/**
     * 临时顺序节点(EPHEMERAL_SEQUENTIAL):具有临时节点特征,但是它会有序列号。
     * 持久顺序节点(PERSISTENT_SEQUENTIAL):具有持久节点特征,但是它会有序列号。
     * @param args
     */publicstaticvoidmain(String[] args){CuratorFramework curatorClint =CuratorUtils.createCuratorFramework("127.0.0.1:2181",1000,1000);System.out.println("连接成功!");
        curatorClint.start();// 创建一个持久顺序节点A-1,A-2,A-3try{
            curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A","hello,word".getBytes());
            curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A","hello,word".getBytes());
            curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A","hello,word".getBytes());System.out.println("创建节点成功!");}catch(Exception e){System.out.println("创建节点失败!"+e.getMessage());}// 创建一个临时顺序节点B-1,B-2,B-3try{
            curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B","hello,word".getBytes());
            curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B","hello,word".getBytes());
            curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B","hello,word".getBytes());System.out.println("创建节点成功!");}catch(Exception e){System.out.println("创建节点失败!"+e.getMessage());}// 关闭连接
        curatorClint.close();}}
  • 事务
packagecom.shu.base;importcom.shu.CuratorUtils;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.api.transaction.CuratorTransactionResult;importjava.util.Collection;/**
 * @author 31380
 * @description TODO
 * @create 2024/3/16 19:28
 */publicclassCuratorTransactionTest{publicstaticvoidmain(String[] args)throwsException{CuratorFramework curatorClint =CuratorUtils.createCuratorFramework("127.0.0.1:2181",1000,1000);System.out.println("连接成功!");
        curatorClint.start();Collection<CuratorTransactionResult> commit = curatorClint.inTransaction().create().forPath("/xiao","456".getBytes()).and().setData().forPath("/xiao","123".getBytes()).and().commit();for(CuratorTransactionResult result : commit){System.out.println(result.getForPath()+"--->"+ result.getType());}

        curatorClint.close();}}

3.3 ACL权限控制

packagecom.shu.acl;importcom.shu.CuratorUtils;importorg.apache.curator.framework.CuratorFramework;/**
 * @author 31380
 * @description
 * @create 2024/3/16 19:56
 */publicclassCuratorAclTest{publicstaticvoidmain(String[] args){CuratorFramework curatorClint =CuratorUtils.createCuratorFramework("127.0.0.1:2181",1000,1000);System.out.println("连接成功!");
        curatorClint.start();// 创建节点,ACL为ip:try{
            curatorClint.create().withACL(CuratorUtils.getAclList()).forPath("/test");System.out.println("创建节点成功!");}catch(Exception e){System.out.println("创建节点失败!"+e.getMessage());}}}/**
 * @description
 * @author 31380
 * @create 2024/3/17 11:12
 * Schema 代表权限控制模式,分别为:
 * ● World 任何人
 * ● Auth 不需要ID
 * ● Digest 用户名和密码方式的认证
 * ● IP Address IP地址方式的认证
 * perms(权限),ZooKeeper支持如下权限
 * ● CREATE: 创建子节点
 * ● READ: 获取子节点与自身节点的数据信息
 * ● WRITE:在Znode节点上写数据
 * ● DELETE:删除子节点
 * ● ADMIN:设置ACL权限
 * ————————————————
 */packagecom.shu.acl;

3.4 分布式锁

packagecom.shu.lock;importcom.shu.CuratorUtils;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.recipes.locks.InterProcessMutex;importjava.text.SimpleDateFormat;importjava.util.concurrent.CountDownLatch;/**
 * @author 31380
 * @description 分布式锁
 * @create 2024/3/17 13:12
 */publicclassLockTest{/**
     * 分布式锁:InterProcessMutex
     * 1: 获取锁,acquire()
     * 2: 释放锁,release()
     * 3: 创建 InterProcessMutex 对象
     * 4: 调用 acquire() 方法获取锁
     * 5: 业务操作
     * 6: 调用 release() 方法释放锁
     * @param args
     */publicstaticvoidmain(String[] args){CountDownLatch latch =newCountDownLatch(10);String connectString ="127.0.0.1:2181";String lockPath ="/lock";CuratorFramework curatorFramework =CuratorUtils.createCuratorFramework(connectString,1000,1000);
        curatorFramework.start();InterProcessMutex lock =newInterProcessMutex(curatorFramework, lockPath);for(int i =0; i <10; i++){newThread(()->{try{
                    latch.await();
                    lock.acquire();System.out.println(Thread.currentThread().getName()+"获取到锁");// 模拟业务操作,生成订单号Thread.sleep(1000);SimpleDateFormat sdf =newSimpleDateFormat("HH:mm:ss|SSS");String orderNo = sdf.format(System.currentTimeMillis());System.out.println("生成的订单号:"+ orderNo);}catch(Exception e){
                    e.printStackTrace();}finally{try{
                        lock.release();}catch(Exception e){
                        e.printStackTrace();}}},"Thread-"+ i).start();
            latch.countDown();}}}

3.5 分布式计数器

packagecom.shu.lock;importcom.shu.CuratorUtils;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.recipes.atomic.AtomicValue;importorg.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;/**
 * @author 31380
 * @description 分布式计数器
 * @create 2024/3/17 13:20
 */publicclassRecipeDisAtomicIntTest{/**
     * 分布式计数器:DistributedAtomicInteger
     * 1、创建DistributedAtomicInteger对象
     * 2、调用add方法
     * 3、获取当前值
     *
     * @param args
     */publicstaticvoidmain(String[] args){String connectString ="127.0.0.1:2181";String connectString2 ="127.0.0.1:2182";String connectString3 ="127.0.0.1:2183";CuratorFramework curatorFramework =CuratorUtils.createCuratorFramework(connectString,1000,1000);
        curatorFramework.start();DistributedAtomicInteger atomicInteger =newDistributedAtomicInteger(curatorFramework,"/atomic",null);try{AtomicValue<Integer> added = atomicInteger.add(1);System.out.println("1Result: "+ added.succeeded());// 获取当前值System.out.println("2Result: "+ added.postValue());}catch(Exception e){thrownewRuntimeException(e);}// 客户端2CuratorFramework curatorFramework2 =CuratorUtils.createCuratorFramework(connectString2,1000,1000);
        curatorFramework2.start();DistributedAtomicInteger atomicInteger2 =newDistributedAtomicInteger(curatorFramework2,"/atomic",null);try{AtomicValue<Integer> added = atomicInteger2.add(1);System.out.println("2Result: "+ added.succeeded());// 获取当前值System.out.println("2Result: "+ added.postValue());}catch(Exception e){thrownewRuntimeException(e);}// 客户端3CuratorFramework curatorFramework3 =CuratorUtils.createCuratorFramework(connectString3,1000,1000);
        curatorFramework3.start();DistributedAtomicInteger atomicInteger3 =newDistributedAtomicInteger(curatorFramework3,"/atomic",null);try{AtomicValue<Integer> added = atomicInteger3.add(1);System.out.println("3Result: "+ added.succeeded());// 获取当前值System.out.println("3Result: "+ added.postValue());}catch(Exception e){thrownewRuntimeException(e);}}}

3.6 分布式Barrier

packagecom.shu.lock;importcom.shu.CuratorUtils;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.recipes.barriers.DistributedBarrier;/**
 * @author 31380
 * @description
 * 分布式Barrier:分布式 Barrier 是一种常见的同步原语,用于在分布式系统中协调多个进程或线程的执行顺序。
 * 它可以用来实现诸如等待直到所有参与者都准备好,然后一起执行某项任务,或者等待直到某些条件达成后再继续执行的场景。
 * @create 2024/3/17 13:27
 */publicclassCycliBarrierTest{staticDistributedBarrier barrier;publicstaticvoidmain(String[] args){String connectString ="127.0.0.1:2181";String path="/barrier";CuratorFramework curatorFramework =CuratorUtils.createCuratorFramework(connectString,1000,1000);
        curatorFramework.start();// 等待所有的线程到达barrier 10个线程for(int i =0; i <10; i++){newThread(()->{try{
                    barrier =newDistributedBarrier(curatorFramework, path);System.out.println(Thread.currentThread().getName()+"号barrier设置");
                    barrier.setBarrier();
                    barrier.waitOnBarrier();System.out.println("启动...");}catch(Exception e){
                    e.printStackTrace();}},"Thread-"+ i).start();}try{Thread.sleep(2000);
            barrier.removeBarrier();}catch(Exception e){
            e.printStackTrace();}}}

3.7 主从节点选举

packagecom.shu.master;importcom.shu.CuratorUtils;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.recipes.leader.LeaderSelector;importorg.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;/**
 * @author 31380
 * @description 主节点选举
 * @create 2024/3/17 13:03
 */publicclassMasterSelectTest{/**
     * 主节点选举:LeaderSelector
     * 1、创建LeaderSelector对象
     * 2、调用start方法
     * 3、添加监听器
     * 4、关闭连接
     * @param args
     */publicstaticvoidmain(String[] args){// 地址String connectString ="127.0.0.1:2181";String connectString2 ="127.0.0.1:2182";String connectString3 ="127.0.0.1:2183";// 创建并连接 CuratorFramework 实例CuratorFramework curatorFramework1 =CuratorUtils.createCuratorFramework(connectString,1000,1000);CuratorFramework curatorFramework2 =CuratorUtils.createCuratorFramework(connectString2,1000,1000);CuratorFramework curatorFramework3 =CuratorUtils.createCuratorFramework(connectString3,1000,1000);
            curatorFramework1.start();
            curatorFramework2.start();
            curatorFramework3.start();// 第一个节点LeaderSelector leaderSelector1 =newLeaderSelector(curatorFramework1,"/master1",newLeaderSelectorListenerAdapter(){@OverridepublicvoidtakeLeadership(CuratorFramework curatorFramework)throwsException{System.out.println("节点1成为master节点");Thread.sleep(10000);System.out.println("节点1完成master操作,释放master权利");}});
            leaderSelector1.autoRequeue();
            leaderSelector1.start();// 第二个节点LeaderSelector leaderSelector2 =newLeaderSelector(curatorFramework2,"/master1",newLeaderSelectorListenerAdapter(){@OverridepublicvoidtakeLeadership(CuratorFramework curatorFramework)throwsException{System.out.println("节点2成为master节点");Thread.sleep(10000);System.out.println("节点2完成master操作,释放master权利");}});
            leaderSelector2.autoRequeue();
            leaderSelector2.start();// 第三个节点LeaderSelector leaderSelector3 =newLeaderSelector(curatorFramework3,"/master1",newLeaderSelectorListenerAdapter(){@OverridepublicvoidtakeLeadership(CuratorFramework curatorFramework)throwsException{System.out.println("节点3成为master节点");Thread.sleep(10000);System.out.println("节点3完成master操作,释放master权利");}});
            leaderSelector3.autoRequeue();
            leaderSelector3.start();try{Thread.sleep(Integer.MAX_VALUE);}catch(InterruptedException e){
                e.printStackTrace();}}}

3.8 NodeCache监听

packagecom.shu.watch;importcom.shu.CuratorUtils;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.recipes.cache.NodeCache;importorg.apache.curator.framework.recipes.cache.NodeCacheListener;/**
 * @author 31380
 * @description
 * @create 2024/3/16 19:38
 */publicclassCuratorNodeCacheTest{/**
     * NodeCache:监听节点的新增、修改操作
     * 1、创建NodeCache对象
     * 2、调用start方法
     * 3、添加监听器
     * 4、关闭连接
     * @param args
     * @throws Exception
     */publicstaticvoidmain(String[] args)throwsException{String path ="/test";CuratorFramework curatorClint =CuratorUtils.createCuratorFramework("127.0.0.1:2181",1000,1000);System.out.println("连接成功!");
        curatorClint.start();finalNodeCache nodeCache =newNodeCache(curatorClint, path);
        nodeCache.start();
        nodeCache.getListenable().addListener(newNodeCacheListener(){@OverridepublicvoidnodeChanged()throwsException{System.out.println("监听事件触发");System.out.println("重新获得节点内容为:"+newString(nodeCache.getCurrentData().getData()));}});
        curatorClint.setData().forPath(path,"456".getBytes());
        curatorClint.setData().forPath(path,"789".getBytes());
        curatorClint.setData().forPath(path,"123".getBytes());
        curatorClint.setData().forPath(path,"222".getBytes());
        curatorClint.setData().forPath(path,"333".getBytes());
        curatorClint.setData().forPath(path,"444".getBytes());Thread.sleep(15000);}}

3.9 PathChildrenCache监听

packagecom.shu.watch;importorg.apache.curator.RetryPolicy;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.CuratorFrameworkFactory;importorg.apache.curator.framework.recipes.cache.PathChildrenCache;importorg.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;importorg.apache.curator.framework.recipes.cache.PathChildrenCacheListener;importorg.apache.curator.retry.ExponentialBackoffRetry;importorg.apache.zookeeper.CreateMode;/**
 * @author 31380
 * @description
 * @create 2024/3/16 19:43
 */publicclassCuratorPathChildrenCacheTest{/**
     * PathChildrenCache:监听子节点的新增、修改、删除操作
     * @param args
     * @throws Exception
     */publicstaticvoidmain(String[] args)throwsException{CuratorFramework client =getClient();String parentPath ="/p1";PathChildrenCache pathChildrenCache =newPathChildrenCache(client,parentPath,false);/* * StartMode:初始化方式
         * POST_INITIALIZED_EVENT:异步初始化。初始化后会触发事件
         * NORMAL:异步初始化
         * BUILD_INITIAL_CACHE:同步初始化
         * */
        pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        pathChildrenCache.getListenable().addListener(newPathChildrenCacheListener(){@OverridepublicvoidchildEvent(CuratorFramework client,PathChildrenCacheEvent event)throwsException{System.out.println("事件类型:"+ event.getType()+";操作节点:"+ event.getData().getPath());switch(event.getType()){caseCHILD_ADDED:System.out.println("新增子节点:"+ event.getData().getPath());break;caseCHILD_UPDATED:System.out.println("更新子节点:"+ event.getData().getPath());break;caseCHILD_REMOVED:System.out.println("删除子节点:"+ event.getData().getPath());break;default:break;}}});String path ="/p1/c1";
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);Thread.sleep(1000);// 此处需留意,如果没有现成睡眠则无法触发监听事件
        client.delete().forPath(path);Thread.sleep(15000);}privatestaticCuratorFrameworkgetClient(){RetryPolicy retryPolicy =newExponentialBackoffRetry(1000,3);CuratorFramework client =CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").retryPolicy(retryPolicy).sessionTimeoutMs(6000).connectionTimeoutMs(3000).namespace("demo").build();
        client.start();return client;}}

3.10 TreeCache监听

packagecom.shu.watch;importorg.apache.curator.RetryPolicy;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.CuratorFrameworkFactory;importorg.apache.curator.framework.recipes.cache.TreeCache;importorg.apache.curator.retry.ExponentialBackoffRetry;importorg.apache.zookeeper.CreateMode;/**
 * @author 31380
 * @description
 * @create 2024/3/16 19:44
 */publicclassCuratorWatcher3{privatestaticfinalStringCONNECT_ADDR="127.0.0.1:2181";privatestaticfinalintSESSION_TIMEOUT=5000;publicstaticvoidmain(String[] args)throwsException{RetryPolicy policy =newExponentialBackoffRetry(1000,10);CuratorFramework curator =CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(policy).build();
        curator.start();TreeCache treeCache =newTreeCache(curator,"/treeCache");
        treeCache.start();
        treeCache.getListenable().addListener((curatorFramework, treeCacheEvent)->{switch(treeCacheEvent.getType()){caseNODE_ADDED:System.out.println("NODE_ADDED:路径:"+ treeCacheEvent.getData().getPath()+",数据:"+newString(treeCacheEvent.getData().getData())+",状态:"+ treeCacheEvent.getData().getStat());break;caseNODE_UPDATED:System.out.println("NODE_UPDATED:路径:"+ treeCacheEvent.getData().getPath()+",数据:"+newString(treeCacheEvent.getData().getData())+",状态:"+ treeCacheEvent.getData().getStat());break;caseNODE_REMOVED:System.out.println("NODE_REMOVED:路径:"+ treeCacheEvent.getData().getPath()+",数据:"+newString(treeCacheEvent.getData().getData())+",状态:"+ treeCacheEvent.getData().getStat());break;default:break;}});
        curator.create().forPath("/treeCache","123".getBytes());
        curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/treeCache/c1","456".getBytes());
        curator.setData().forPath("/treeCache","789".getBytes());
        curator.setData().forPath("/treeCache/c1","910".getBytes());
        curator.delete().forPath("/treeCache/c1");
        curator.delete().forPath("/treeCache");Thread.sleep(5000);
        curator.close();}}

详细介绍参考书籍《从Paxos到Zookeeper:分布式一致性原理与实践》


本文转载自: https://blog.csdn.net/weixin_44451022/article/details/136870451
版权归原作者 长安不及十里 所有, 如有侵权,请联系我们删除。

“Zookeeper(五)Zokeeper 环境搭建与Curator使用”的评论:

还没有评论