0


2.Zookeeper集成springboot操作节点,事件监听,分布式锁实现

1.在 Spring Boot 项目中添加 Zookeeper 以及对应的客户端依赖,pom.xml 文件如下

<!-- ZooKeeper component -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.9.1</version>
</dependency>
<!-- Includes the Curator components -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zookeeper</artifactId>
    <version>6.2.2</version>
</dependency>

2.application.yml 文件中配置zookeeper连接的相关配置信息

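zookeeper:
  # Server address
  connectString: 127.0.0.1:2181
  # Session timeout (ms)
  sessionTimeoutMs: 3000
  # Connection timeout (ms)
  connectionTimeoutMs: 60000
  # Maximum number of retries
  maxRetries: 3
  # Initial sleep time between retries (ms)
  baseSleepTimeMs: 1000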

3.java配置的方式添加zookeeper相关的配置

packagecom.jinyi.up.zk.config;importlombok.extern.slf4j.Slf4j;importorg.apache.curator.RetryPolicy;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.CuratorFrameworkFactory;importorg.apache.curator.framework.recipes.locks.InterProcessMutex;importorg.apache.curator.retry.ExponentialBackoffRetry;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjavax.annotation.PostConstruct;/**
 * @author huangchong
 * @date 2024/3/5 20:48
 * @desc
 */@Slf4j@ConfigurationpublicclassZookeeperConfig{@Value("${zookeeper.connectString}")privateString connectString;@Value("${zookeeper.baseSleepTimeMs}")privateint baseSleepTimeMs;@Value("${zookeeper.maxRetries}")privateint maxRetries ;@Value("${zookeeper.connectionTimeoutMs}")int connectionTimeoutMs ;@Value("${zookeeper.sessionTimeoutMs}")int sessionTimeoutMs ;privatestaticCuratorFramework client =null;/**
     * 初始化
     */@PostConstructpublicvoid init (){// 重试策略RetryPolicy policy =newExponentialBackoffRetry(baseSleepTimeMs, maxRetries);//通过工厂创建Curator
        client =CuratorFrameworkFactory.builder().connectString(connectString).connectionTimeoutMs(connectionTimeoutMs).sessionTimeoutMs(sessionTimeoutMs).retryPolicy(policy).build();//开启连接
        client.start();
        log.info("zookeeper 初始化完成...");}@BeanpublicCuratorFramework getClient (){return client ;}/**
     * 分布式锁bean 注入spring管理中
     */@BeanpublicInterProcessMutexdistributedLock()throwsException{//使用了Curator提供的InterProcessMutex来创建一个分布式锁。我们使用ZooKeeper的路径/lock来表示锁的资源。InterProcessMutex distributedLock =newInterProcessMutex(client,"/lock");return distributedLock;}}

4.Zookeeper基础操作服务和分布式锁服务编码

packagecom.jinyi.up.client.service;importcom.jinyi.up.zk.process.AbstractListenerProcess;importlombok.extern.slf4j.Slf4j;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.recipes.cache.*;importorg.apache.zookeeper.CreateMode;importorg.apache.zookeeper.data.Stat;importorg.springframework.stereotype.Service;importjavax.annotation.Resource;importjava.nio.charset.StandardCharsets;/**
 * @author huangchong
 * @date 2024/3/5 21:39
 * @desc
 */@Slf4j@ServicepublicclassZookeeperService{@ResourceprivateCuratorFramework client;/**
     * 查询节点数据
     *
     * @param nodePath 节点
     * @return {@link String}
     */publicStringqueryData(String nodePath){try{Stat stat = client.checkExists().forPath(nodePath);if(stat !=null){byte[] bytes = client.getData().forPath(nodePath);returnnewString(bytes,StandardCharsets.UTF_8);}returnnull;}catch(Exception e){
            log.error("查询节点数据失败:", e);returnnull;}}/**
     * 创建节点
     *
     * @param mode     节点类型
     * @param nodePath 节点路径
     * @param nodeData 节点数据
     * @return {@link String}
     */publicStringcreate(CreateMode mode,String nodePath,String nodeData){try{Stat stat = client.checkExists().forPath(nodePath);if(stat ==null){return client.create().withMode(mode).forPath(nodePath, nodeData.getBytes());}else{returnnull;}}catch(Exception e){
            log.error("创建节点失败:", e);returnnull;}}/**
     * 更新节点数据
     *
     * @param nodePath 节点路径
     * @param nodeData 节点数据
     * @return {@link Stat}
     */publicbooleanupdate(String nodePath,String nodeData){try{Stat stat = client.checkExists().forPath(nodePath);if(stat !=null){
                stat = client.setData().forPath(nodePath, nodeData.getBytes());}return stat !=null;}catch(Exception e){
            log.error("更新节点失败:", e);returnfalse;}}/**
     * 删除节点
     *
     * @param nodePath v
     * @return {@link boolean}
     */publicbooleandelete(String nodePath){try{Stat stat = client.checkExists().forPath(nodePath);if(stat !=null){
                client.delete().forPath(nodePath);}returntrue;}catch(Exception e){
            log.error("删除节点失败:", e);returnfalse;}}/**
     * 监听一个节点
     *
     * @param nodePath 被监听节点路径
     * @return {@link }
     */publicbooleanaddWatchNodeListener(String nodePath){CuratorCache curatorCache =CuratorCache.builder(client, nodePath).build();CuratorCacheListener listener =CuratorCacheListener.builder().forNodeCache(newNodeCacheListener(){@OverridepublicvoidnodeChanged()throwsException{
                        log.info("监听到节点变动");//TODO}}).build();
        curatorCache.listenable().addListener(listener);
        curatorCache.start();returntrue;}/**
     * 监听子孙节点 支持子节点的子节点监听
     * TreeCache监听节点自己和所有子节点们
     *
     * @param nodePath  被监听节点路径
     * @param processer 监听后处理
     * @return {@link }
     */publicbooleanaddWatchTreeListener(String nodePath,AbstractListenerProcess processer){CuratorCache curatorCache =CuratorCache.builder(client, nodePath).build();CuratorCacheListener listener =CuratorCacheListener.builder().forTreeCache(client,newTreeCacheListener(){@OverridepublicvoidchildEvent(CuratorFramework curatorFramework,TreeCacheEvent treeCacheEvent){
                        log.info("监听到子节点变动,变动类型:{}", treeCacheEvent.getType().toString());
                        processer.process(curatorFramework, treeCacheEvent);}}).build();
        curatorCache.listenable().addListener(listener);
        curatorCache.start();returntrue;}}
packagecom.jinyi.up.zk.service;importcom.jinyi.up.zk.process.AbstractListenerProcess;importlombok.extern.slf4j.Slf4j;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.recipes.cache.*;importorg.apache.curator.framework.recipes.locks.InterProcessMutex;importorg.apache.zookeeper.CreateMode;importorg.apache.zookeeper.data.Stat;importorg.springframework.stereotype.Service;importjavax.annotation.Resource;importjava.nio.charset.StandardCharsets;/**
 * @author huangchong
 * @date 2024/3/5 21:39
 * @desc
 */@Slf4j@ServicepublicclassZookeeperLockService{@ResourceprivateInterProcessMutex distributedLock;publicvoiddoProtectedOperation()throwsException{//acquire()方法获取锁
        distributedLock.acquire();try{// 执行需要保护的代码块}finally{
            distributedLock.release();}}}

5.watcher机制事件处理抽象封装

packagecom.jinyi.up.zk.process;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;importorg.apache.curator.framework.recipes.cache.TreeCacheEvent;/**
 * @author huangchong
 * @date 2024/3/5 21:58
 * @desc
 */publicabstractclassAbstractListenerProcess{/**
     * 处理监听节点自己和所有子节点们变更事件
     *
     * @param client       zk客户端
     * @param event 子节点事件
     * @return {@link }
     */publicabstractvoidprocess(CuratorFramework client,TreeCacheEvent event);}
packagecom.jinyi.up.zk.process;importlombok.extern.slf4j.Slf4j;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.recipes.cache.TreeCacheEvent;/**
 * @author huangchong
 * @date 2024/3/5 21:58
 * @desc
 */@Slf4jpublicclassWatcherTreeListenerProcessextendsAbstractListenerProcess{/**
     * 实际处理监听节点自己和所有子节点们变更事件
     *
     * @param client zk客户端
     * @param event 子节点事件
     * @return {@link }
     */@Overridepublicvoidprocess(CuratorFramework client,TreeCacheEvent event){//事件pathString path = event.getData().getPath();switch(event.getType()){caseNODE_ADDED:
                log.info("新增子节点:"+ path);break;caseNODE_UPDATED:
                log.info("更新子节点:"+ path);break;caseNODE_REMOVED:
                log.info("删除子节点:"+ path);break;default:break;}}}

6.基本操作的单元测试代码

packagecom.jinyi.zookeeper;importcom.jinyi.up.zk.ZookeeperApplication;importorg.junit.runner.RunWith;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)///此处classes内的内容是@SpringBootApplication入口@SpringBootTest(classes ={ZookeeperApplication.class})publicabstractclassBaseZkBootTest{}
packagecom.jinyi.zookeeper;importlombok.extern.slf4j.Slf4j;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;importorg.apache.zookeeper.CreateMode;importorg.apache.zookeeper.data.Stat;importorg.junit.Test;importjavax.annotation.Resource;importjava.nio.charset.StandardCharsets;/**
 * @author huangchong
 * @date 2024/3/5 21:00
 * @desc
 */@Slf4jpublicclassZookeeperBaseTestextendsBaseZkBootTest{@ResourceprivateCuratorFramework client;@TestpublicvoidtestAddPersistentNode()throwsException{// 创建一个持久化节点/persistent_node,断开连接时不会自动删除
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent_node");}@TestpublicvoidtestZnodeExists()throwsException{// 判断节点是否存在,persistent_node2不存在所以stat2是nullStat stat = client.checkExists().forPath("/persistent_node");
        log.info(String.valueOf(stat));Stat stat2 = client.checkExists().forPath("/persistent_node2");
        log.info(String.valueOf(stat2));}@TestpublicvoidtestSetData()throwsException{// 设置节点数据
        client.setData().forPath("/persistent_node","persistent_node_data".getBytes(StandardCharsets.UTF_8));}@TestpublicvoidtestCreateAndSet()throwsException{// 创建一个持久化节点并设置节点数据
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("//persistent_node1","persistent_node_data1".getBytes(StandardCharsets.UTF_8));}@TestpublicvoidtestGetData()throwsException{// 查询节点数据byte[] data = client.getData().forPath("/persistent_node1");
        log.info(newString(data,StandardCharsets.UTF_8));}@TestpublicvoidtestDelete()throwsException{// 删除节点
        client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/persistent_node1");}@TestpublicvoidtestReadLock()throwsException{// 读写锁-读InterProcessReadWriteLock lock =newInterProcessReadWriteLock(client,"/lock-read");
        lock.readLock().acquire();
        log.info("获取-ReadLock");
        lock.readLock().release();}@TestpublicvoidtestWriteLock()throwsException{// 读写锁-写InterProcessReadWriteLock lock =newInterProcessReadWriteLock(client,"/lock-write");
        lock.writeLock().acquire();
        log.info("获取-WriteLock");
        lock.writeLock().release();}}

本文转载自: https://blog.csdn.net/huangchong0107/article/details/136492414
版权归原作者 哈喽,树先生 所有, 如有侵权,请联系我们删除。

“2.Zookeeper集成springboot操作节点,事件监听,分布式锁实现”的评论:

还没有评论