文章目录
问题背景
研究分布式锁,基于ZK实现,需要整合到SpringBoot使用
前言
- 参考自SpringBoot集成Curator实现Zookeeper基本操作,Zookeeper入门
- 本篇的代码笔者有自己运行过,需要注意组件的版本号是否兼容,否则会有比较多的坑
实现
搭建Zookeeper容器
采用Docker compose快速搭建ZK容器,很快,几分钟就好了,而且是集群方式搭建。详情见笔者的Docker搭建zookeeper
引入依赖
需要注意的点:
Curator 2.x.x-
兼容两个
zk 3.4.x
和
zk 3.5.x
,
Curator 3.x.x
-兼容兼容
zk 3.5
,根据搭建的zk的版本使用对应的curator依赖。引入的zk依赖,如果项目中有使用
logback
日志 ,需要排除zk中的
log4j12
依赖,详情见下面笔者给出的依赖:
<dependencies><dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>2.12.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.12.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.12.0</version></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.7</version><exclusions><exclusion><artifactId>slf4j-log4j12</artifactId><groupId>org.slf4j</groupId></exclusion><exclusion><artifactId>slf4j-api</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions></dependency>
ZK客户端的配置类
配置ZK的参数,使用
@ConfigurationProperties
可以令配置热更新,比如搭配Apollo、Nacos,如果使用
@Valid
则无法热更新,必须重启项目才能生效
@Component@ConfigurationProperties(prefix ="curator")@DatapublicclassZKClientProps{privateString connectString;privateint retryCount;privateint elapsedTimeMs;privateint sessionTimeoutMs;privateint connectionTimeoutMs;}
对应yml如下:
#curator配置curator:connectString: 192.168.163.128:2181,192.168.163.128:2182,192.168.163.128:2183# zookeeper 地址retryCount:1# 重试次数elapsedTimeMs:2000# 重试间隔时间sessionTimeoutMs:60000# session超时时间connectionTimeoutMs:10000# 连接超时时间
ZK客户端的工厂类
定制ZK客户端:
@ComponentpublicclassZKClientFactory{@ResourceprivateZKClientProps zkClientProps;publicCuratorFrameworkcreateSimple(){//重试策略:第一次重试等待1S,第二次重试等待2S,第三次重试等待4s//第一个参数:等待时间的基础单位,单位为毫秒//第二个参数:最大重试次数ExponentialBackoffRetry retry =newExponentialBackoffRetry(zkClientProps.getElapsedTimeMs(), zkClientProps.getRetryCount());//获取CuratorFramework示例的最简单方式//第一个参数:zk的连接地址//第二个参数:重试策略returnCuratorFrameworkFactory.newClient(zkClientProps.getConnectString(), retry);}publicstaticCuratorFrameworkcreateWithOptions(String connectionString,RetryPolicy retryPolicy,int connectionTimeoutMs,int sessionTimeoutMs){returnCuratorFrameworkFactory.builder().connectString(connectionString).retryPolicy(retryPolicy).connectionTimeoutMs(connectionTimeoutMs).sessionTimeoutMs(sessionTimeoutMs).build();}}
注入bean
创建ZK的客户端,详情如下:
@Component@Slf4jpublicclassZKClient{@ResourceprivateZKClientFactory zkClientFactory;publicstaticfinalZKClientINSTANCE=newZKClient();privateZKClient(){}publicCuratorFrameworkgetClient(){return zkClientFactory.createSimple();}publicbooleanisNodeExist(String path){CuratorFramework client =getClient();try{
client.start();Stat stat = client.checkExists().forPath(path);return stat !=null;}catch(Exception e){
e.printStackTrace();}finally{CloseableUtils.closeQuietly(client);}returnfalse;}publicvoidcreateNode(String path,byte[] bytes){CuratorFramework client =getClient();try{// 必须start,否则报错
client.start();
client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, bytes);}catch(Exception e){
e.printStackTrace();}finally{CloseableUtils.closeQuietly(client);}}publicvoiddeleteNode(String path){CuratorFramework client =getClient();try{
client.start();
client.delete().forPath(path);}catch(Exception e){
e.printStackTrace();}finally{CloseableUtils.closeQuietly(client);}}publicList<String>getChildren(String path){List<String> result =newLinkedList<>();CuratorFramework client =getClient();try{
client.start();
result = client.getChildren().forPath(path);}catch(Exception e){
log.error("ZKClient getChildren error.");}return result;}}
构建测试类
测试基类,设置激活环境
@Slf4j@ActiveProfiles("test")@RunWith(SpringRunner.class)@SpringBootTest(classes =GmallZookeeperApplication.class)@ContextConfigurationpublicclassBaseTest{}
创建节点、删除节点、获取节点信息、分布式锁的方法如下:
@ActiveProfiles("company")
是激活笔者一个
application-company.yml
文件
application.yml如下:
server:port:8022spring:profiles:active: home
application-compay.yml如下:
#curator配置
curator:
connectString:192.168.163.128:2181,192.168.163.128:2182,192.168.163.128:2183 # zookeeper 地址
retryCount:1 # 重试次数
elapsedTimeMs:2000 # 重试间隔时间
sessionTimeoutMs:60000 # session超时时间
connectionTimeoutMs:10000 # 连接超时时间
创建节点、删除节点、获取节点信息、分布式锁的方法如下:
@Slf4j@ActiveProfiles("company")publicclassZKClientTestextendsBaseTest{@ResourceprivateZKClient zkClient;publicstaticfinalintTHREAD_NUM=10;@TestpublicvoiddistributedLock()throwsInterruptedException,BrokenBarrierException{String lockPath ="/test/distributed2/lock";CuratorFramework client = zkClient.getClient();
client.start();InterProcessMutex lock =newInterProcessMutex(client, lockPath);// 阻塞主线程,等待全部子线程执行完CyclicBarrier cyclicBarrier =newCyclicBarrier(THREAD_NUM);for(int i =0; i <THREAD_NUM; i++){newThread(()->{
log.info("{}->尝试竞争锁",Thread.currentThread().getName());try{
lock.acquire();// 阻塞竞争锁
log.info("{}->成功获得锁",Thread.currentThread().getName());Thread.sleep(2000);
cyclicBarrier.await();}catch(Exception e){
e.printStackTrace();}finally{try{
lock.release();//释放锁}catch(Exception e){
e.printStackTrace();}}},"Thread-"+ i).start();}// 目的是为了等子线程抢完锁再结束子线程,否则无法看到日志效果
cyclicBarrier.await();
log.info("全部子线程已执行完毕");}@TestpublicvoidcreateNode(){// 创建一个ZNode节点String data ="hello";byte[] payload = data.getBytes(StandardCharsets.UTF_8);String zkPath ="/test/CRUD/node-1";
zkClient.createNode(zkPath, payload);
log.info("createNode succeeded!");}@TestpublicvoidgetChildren(){String zkPath ="/test/CRUD";List<String> children = zkClient.getChildren(zkPath);printList(children);}@TestpublicvoiddeleteNode(){String parentPath ="/test";
log.info("======================Before delete===================");List<String> before = zkClient.getChildren(parentPath);printList(before);String zkPath ="/test/CRUD/node-1";
zkClient.deleteNode(zkPath);
log.info("delete node secceeded!");
log.info("======================After delete===================");List<String> after = zkClient.getChildren(parentPath);printList(after);}privatevoidprintList(List<String> data){if(!CollectionUtils.isEmpty(data)){for(String datum : data){
log.info("datum:{}", data);}}}}
版权归原作者 Android_la 所有, 如有侵权,请联系我们删除。