1.Springboot项目中添加zookeeper 以及对应的客户端依赖,pom.xml文件如下
<!-- ZooKeeper component -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.9.1</version>
</dependency>
<!-- Includes the Curator components -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zookeeper</artifactId>
    <version>6.2.2</version>
</dependency>
2.application.yml 文件中配置zookeeper连接的相关配置信息
zookeeper:
  # Server address (host:port)
  connectString: 127.0.0.1:2181
  # Session timeout, in milliseconds
  sessionTimeoutMs: 3000
  # Connection timeout, in milliseconds
  # NOTE(review): this is larger than sessionTimeoutMs - confirm the two values are intended
  connectionTimeoutMs: 60000
  # Maximum number of retries
  maxRetries: 3
  # Initial sleep time between retries, in milliseconds
  baseSleepTimeMs: 1000
3.java配置的方式添加zookeeper相关的配置
packagecom.jinyi.up.zk.config;importlombok.extern.slf4j.Slf4j;importorg.apache.curator.RetryPolicy;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.CuratorFrameworkFactory;importorg.apache.curator.framework.recipes.locks.InterProcessMutex;importorg.apache.curator.retry.ExponentialBackoffRetry;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjavax.annotation.PostConstruct;/**
* @author huangchong
* @date 2024/3/5 20:48
* @desc
*/@Slf4j@ConfigurationpublicclassZookeeperConfig{@Value("${zookeeper.connectString}")privateString connectString;@Value("${zookeeper.baseSleepTimeMs}")privateint baseSleepTimeMs;@Value("${zookeeper.maxRetries}")privateint maxRetries ;@Value("${zookeeper.connectionTimeoutMs}")int connectionTimeoutMs ;@Value("${zookeeper.sessionTimeoutMs}")int sessionTimeoutMs ;privatestaticCuratorFramework client =null;/**
* 初始化
*/@PostConstructpublicvoid init (){// 重试策略RetryPolicy policy =newExponentialBackoffRetry(baseSleepTimeMs, maxRetries);//通过工厂创建Curator
client =CuratorFrameworkFactory.builder().connectString(connectString).connectionTimeoutMs(connectionTimeoutMs).sessionTimeoutMs(sessionTimeoutMs).retryPolicy(policy).build();//开启连接
client.start();
log.info("zookeeper 初始化完成...");}@BeanpublicCuratorFramework getClient (){return client ;}/**
* 分布式锁bean 注入spring管理中
*/@BeanpublicInterProcessMutexdistributedLock()throwsException{//使用了Curator提供的InterProcessMutex来创建一个分布式锁。我们使用ZooKeeper的路径/lock来表示锁的资源。InterProcessMutex distributedLock =newInterProcessMutex(client,"/lock");return distributedLock;}}
4.Zookeeper基础操作服务和分布式锁服务编码
packagecom.jinyi.up.client.service;importcom.jinyi.up.zk.process.AbstractListenerProcess;importlombok.extern.slf4j.Slf4j;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.recipes.cache.*;importorg.apache.zookeeper.CreateMode;importorg.apache.zookeeper.data.Stat;importorg.springframework.stereotype.Service;importjavax.annotation.Resource;importjava.nio.charset.StandardCharsets;/**
* @author huangchong
* @date 2024/3/5 21:39
* @desc
*/@Slf4j@ServicepublicclassZookeeperService{@ResourceprivateCuratorFramework client;/**
* 查询节点数据
*
* @param nodePath 节点
* @return {@link String}
*/publicStringqueryData(String nodePath){try{Stat stat = client.checkExists().forPath(nodePath);if(stat !=null){byte[] bytes = client.getData().forPath(nodePath);returnnewString(bytes,StandardCharsets.UTF_8);}returnnull;}catch(Exception e){
log.error("查询节点数据失败:", e);returnnull;}}/**
* 创建节点
*
* @param mode 节点类型
* @param nodePath 节点路径
* @param nodeData 节点数据
* @return {@link String}
*/publicStringcreate(CreateMode mode,String nodePath,String nodeData){try{Stat stat = client.checkExists().forPath(nodePath);if(stat ==null){return client.create().withMode(mode).forPath(nodePath, nodeData.getBytes());}else{returnnull;}}catch(Exception e){
log.error("创建节点失败:", e);returnnull;}}/**
* 更新节点数据
*
* @param nodePath 节点路径
* @param nodeData 节点数据
* @return {@link Stat}
*/publicbooleanupdate(String nodePath,String nodeData){try{Stat stat = client.checkExists().forPath(nodePath);if(stat !=null){
stat = client.setData().forPath(nodePath, nodeData.getBytes());}return stat !=null;}catch(Exception e){
log.error("更新节点失败:", e);returnfalse;}}/**
* 删除节点
*
* @param nodePath v
* @return {@link boolean}
*/publicbooleandelete(String nodePath){try{Stat stat = client.checkExists().forPath(nodePath);if(stat !=null){
client.delete().forPath(nodePath);}returntrue;}catch(Exception e){
log.error("删除节点失败:", e);returnfalse;}}/**
* 监听一个节点
*
* @param nodePath 被监听节点路径
* @return {@link }
*/publicbooleanaddWatchNodeListener(String nodePath){CuratorCache curatorCache =CuratorCache.builder(client, nodePath).build();CuratorCacheListener listener =CuratorCacheListener.builder().forNodeCache(newNodeCacheListener(){@OverridepublicvoidnodeChanged()throwsException{
log.info("监听到节点变动");//TODO}}).build();
curatorCache.listenable().addListener(listener);
curatorCache.start();returntrue;}/**
* 监听子孙节点 支持子节点的子节点监听
* TreeCache监听节点自己和所有子节点们
*
* @param nodePath 被监听节点路径
* @param processer 监听后处理
* @return {@link }
*/publicbooleanaddWatchTreeListener(String nodePath,AbstractListenerProcess processer){CuratorCache curatorCache =CuratorCache.builder(client, nodePath).build();CuratorCacheListener listener =CuratorCacheListener.builder().forTreeCache(client,newTreeCacheListener(){@OverridepublicvoidchildEvent(CuratorFramework curatorFramework,TreeCacheEvent treeCacheEvent){
log.info("监听到子节点变动,变动类型:{}", treeCacheEvent.getType().toString());
processer.process(curatorFramework, treeCacheEvent);}}).build();
curatorCache.listenable().addListener(listener);
curatorCache.start();returntrue;}}
packagecom.jinyi.up.zk.service;importcom.jinyi.up.zk.process.AbstractListenerProcess;importlombok.extern.slf4j.Slf4j;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.recipes.cache.*;importorg.apache.curator.framework.recipes.locks.InterProcessMutex;importorg.apache.zookeeper.CreateMode;importorg.apache.zookeeper.data.Stat;importorg.springframework.stereotype.Service;importjavax.annotation.Resource;importjava.nio.charset.StandardCharsets;/**
* @author huangchong
* @date 2024/3/5 21:39
* @desc
*/@Slf4j@ServicepublicclassZookeeperLockService{@ResourceprivateInterProcessMutex distributedLock;publicvoiddoProtectedOperation()throwsException{//acquire()方法获取锁
distributedLock.acquire();try{// 执行需要保护的代码块}finally{
distributedLock.release();}}}
5.watcher机制事件处理抽象封装
packagecom.jinyi.up.zk.process;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;importorg.apache.curator.framework.recipes.cache.TreeCacheEvent;/**
* @author huangchong
* @date 2024/3/5 21:58
* @desc
*/publicabstractclassAbstractListenerProcess{/**
* 处理监听节点自己和所有子节点们变更事件
*
* @param client zk客户端
* @param event 子节点事件
* @return {@link }
*/publicabstractvoidprocess(CuratorFramework client,TreeCacheEvent event);}
packagecom.jinyi.up.zk.process;importlombok.extern.slf4j.Slf4j;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.recipes.cache.TreeCacheEvent;/**
* @author huangchong
* @date 2024/3/5 21:58
* @desc
*/@Slf4jpublicclassWatcherTreeListenerProcessextendsAbstractListenerProcess{/**
* 实际处理监听节点自己和所有子节点们变更事件
*
* @param client zk客户端
* @param event 子节点事件
* @return {@link }
*/@Overridepublicvoidprocess(CuratorFramework client,TreeCacheEvent event){//事件pathString path = event.getData().getPath();switch(event.getType()){caseNODE_ADDED:
log.info("新增子节点:"+ path);break;caseNODE_UPDATED:
log.info("更新子节点:"+ path);break;caseNODE_REMOVED:
log.info("删除子节点:"+ path);break;default:break;}}}
6.基本操作的单元测试代码
packagecom.jinyi.zookeeper;importcom.jinyi.up.zk.ZookeeperApplication;importorg.junit.runner.RunWith;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)///此处classes内的内容是@SpringBootApplication入口@SpringBootTest(classes ={ZookeeperApplication.class})publicabstractclassBaseZkBootTest{}
packagecom.jinyi.zookeeper;importlombok.extern.slf4j.Slf4j;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;importorg.apache.zookeeper.CreateMode;importorg.apache.zookeeper.data.Stat;importorg.junit.Test;importjavax.annotation.Resource;importjava.nio.charset.StandardCharsets;/**
* @author huangchong
* @date 2024/3/5 21:00
* @desc
*/@Slf4jpublicclassZookeeperBaseTestextendsBaseZkBootTest{@ResourceprivateCuratorFramework client;@TestpublicvoidtestAddPersistentNode()throwsException{// 创建一个持久化节点/persistent_node,断开连接时不会自动删除
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent_node");}@TestpublicvoidtestZnodeExists()throwsException{// 判断节点是否存在,persistent_node2不存在所以stat2是nullStat stat = client.checkExists().forPath("/persistent_node");
log.info(String.valueOf(stat));Stat stat2 = client.checkExists().forPath("/persistent_node2");
log.info(String.valueOf(stat2));}@TestpublicvoidtestSetData()throwsException{// 设置节点数据
client.setData().forPath("/persistent_node","persistent_node_data".getBytes(StandardCharsets.UTF_8));}@TestpublicvoidtestCreateAndSet()throwsException{// 创建一个持久化节点并设置节点数据
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("//persistent_node1","persistent_node_data1".getBytes(StandardCharsets.UTF_8));}@TestpublicvoidtestGetData()throwsException{// 查询节点数据byte[] data = client.getData().forPath("/persistent_node1");
log.info(newString(data,StandardCharsets.UTF_8));}@TestpublicvoidtestDelete()throwsException{// 删除节点
client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/persistent_node1");}@TestpublicvoidtestReadLock()throwsException{// 读写锁-读InterProcessReadWriteLock lock =newInterProcessReadWriteLock(client,"/lock-read");
lock.readLock().acquire();
log.info("获取-ReadLock");
lock.readLock().release();}@TestpublicvoidtestWriteLock()throwsException{// 读写锁-写InterProcessReadWriteLock lock =newInterProcessReadWriteLock(client,"/lock-write");
lock.writeLock().acquire();
log.info("获取-WriteLock");
lock.writeLock().release();}}
版权归原作者 哈喽,树先生 所有, 如有侵权,请联系我们删除。