文章目录
前言
Curator是netflix公司开源的一套
zookeeper
客户端,目前是Apache的顶级项目。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。Curator解决了很多zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册wathcer和NodeExistsException 异常等。
Curator主要解决了三类问题:
- 封装ZooKeeper client与ZooKeeper server之间的连接处理
- 提供了一套Fluent风格的操作API
- 提供ZooKeeper各种应用场景(recipe, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装,这些实现都遵循了zk的最佳实践,并考虑了各种极端情况
Curator由一系列的模块构成,对于一般开发者而言,常用的是
curator-framework
和
curator-recipes
:
curator-framework
:提供了常见的zk相关的底层操作curator-recipes
:提供了一些zk的典型使用场景的参考。本节重点关注的分布式锁就是该包提供的
代码实践
curator 4.3.0
支持
zookeeper 3.4.x
和
3.5
,但是需要注意
curator
传递进来的依赖,需要和实际服务器端使用的版本相符,以使用
zookeeper 3.4.14
为例。
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.3.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.3.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.14</version></dependency>
1. 配置
添加
curator
客户端配置:
@ConfigurationpublicclassCuratorConfig{@BeanpublicCuratorFrameworkcuratorFramework(){// 重试策略,这里使用的是指数补偿重试策略,重试3次,初始重试间隔1000ms,每次重试之后重试间隔递增。RetryPolicy retry =newExponentialBackoffRetry(1000,3);// 初始化Curator客户端:指定链接信息 及 重试策略CuratorFramework client =CuratorFrameworkFactory.newClient("192.168.1.111:2181", retry);
client.start();// 开始链接,如果不调用该方法,很多方法无法工作return client;}}
2. 可重入锁InterProcessMutex
Reentrant
和
JDK
的
ReentrantLock
类似, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。它是由类
InterProcessMutex
来实现。
// 常用构造方法publicInterProcessMutex(CuratorFramework client,String path)// 获取锁publicvoidacquire();// 带超时时间的可重入锁publicbooleanacquire(long time,TimeUnit unit);// 释放锁publicvoidrelease();
测试方法:
@AutowiredprivateCuratorFramework curatorFramework;publicvoidcheckAndLock(){InterProcessMutex mutex =newInterProcessMutex(curatorFramework,"/curator/lock");try{// 加锁
mutex.acquire();// 处理业务// 例如查询库存 扣减库存// this.testSub(mutex); 如想重入,则需要使用同一个InterProcessMutex对象// 释放锁
mutex.release();}catch(Exception e){
e.printStackTrace();}}publicvoidtestSub(InterProcessMutex mutex){try{
mutex.acquire();System.out.println("测试可重入锁。。。。");
mutex.release();}catch(Exception e){
e.printStackTrace();}}
注意:如想重入,则需要使用同一个InterProcessMutex对象。
3. 不可重入锁InterProcessSemaphoreMutex
具体实现:
InterProcessSemaphoreMutex
与
InterProcessMutex
调用方法类似,区别在于该锁是不可重入的,在同一个线程中不可重入。
publicInterProcessSemaphoreMutex(CuratorFramework client,String path);publicvoidacquire();publicbooleanacquire(long time,TimeUnit unit);publicvoidrelease();
案例:
@AutowiredprivateCuratorFramework curatorFramework;publicvoiddeduct(){InterProcessSemaphoreMutex mutex =newInterProcessSemaphoreMutex(curatorFramework,"/curator/lock");try{
mutex.acquire();// 处理业务// 例如查询库存 扣减库存}catch(Exception e){
e.printStackTrace();}finally{try{
mutex.release();}catch(Exception e){
e.printStackTrace();}}}
4. 可重入读写锁InterProcessReadWriteLock
类似
JDK
的
ReentrantReadWriteLock
。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。这也意味着写锁可以降级成读锁。从读锁升级成写锁是不成的。主要实现类
InterProcessReadWriteLock
:
// 构造方法publicInterProcessReadWriteLock(CuratorFramework client,String basePath);// 获取读锁对象InterProcessMutexreadLock();// 获取写锁对象InterProcessMutexwriteLock();
注意:写锁在释放之前会一直阻塞请求线程,而读锁不会
publicvoidtestZkReadLock(){try{InterProcessReadWriteLock rwlock =newInterProcessReadWriteLock(curatorFramework,"/curator/rwlock");
rwlock.readLock().acquire(10,TimeUnit.SECONDS);// TODO:一顿读的操作。。。。//rwlock.readLock().unlock();}catch(Exception e){
e.printStackTrace();}}publicvoidtestZkWriteLock(){try{InterProcessReadWriteLock rwlock =newInterProcessReadWriteLock(curatorFramework,"/curator/rwlock");
rwlock.writeLock().acquire(10,TimeUnit.SECONDS);// TODO:一顿写的操作。。。。//rwlock.writeLock().unlock();}catch(Exception e){
e.printStackTrace();}}
5. 联锁InterProcessMultiLock
Multi Shared Lock
是一个锁的容器。当调用
acquire
, 所有的锁都会被
acquire
,如果请求失败,所有的锁都会被
release
。同样调用
release
时所有的锁都被
release
(失败被忽略)。基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。实现类
InterProcessMultiLock
:
// 构造函数需要包含的锁的集合,或者一组ZooKeeper的pathpublicInterProcessMultiLock(List<InterProcessLock> locks);publicInterProcessMultiLock(CuratorFramework client,List<String> paths);// 获取锁publicvoidacquire();publicbooleanacquire(long time,TimeUnit unit);// 释放锁publicsynchronizedvoidrelease();
6. 信号量InterProcessSemaphoreV2
一个计数的信号量类似
JDK
的
Semaphore
。
JDK
中
Semaphore
维护的一组许可(
permits
),而
Cubator
中称之为租约(
Lease
)。注意,所有的实例必须使用相同的
numberOfLeases
值。调用
acquire
会返回一个租约对象。客户端必须在
finally
中
close
这些租约对象,否则这些租约会丢失掉。但是,如果客户端
session
由于某种原因比如
crash
丢掉, 那么这些客户端持有的租约会自动
close
, 这样其它客户端可以继续使用这些租约。主要实现类
InterProcessSemaphoreV2
:
// 构造方法publicInterProcessSemaphoreV2(CuratorFramework client,String path,int maxLeases);// 注意一次你可以请求多个租约,如果Semaphore当前的租约不够,则请求线程会被阻塞。// 同时还提供了超时的重载方法publicLeaseacquire();publicCollection<Lease>acquire(int qty);publicLeaseacquire(long time,TimeUnit unit);publicCollection<Lease>acquire(int qty,long time,TimeUnit unit)// 租约还可以通过下面的方式返还publicvoidreturnAll(Collection<Lease> leases);publicvoidreturnLease(Lease lease);
案例代码:
StockController中添加方法:
@GetMapping("test/semaphore")publicStringtestSemaphore(){this.stockService.testSemaphore();return"hello Semaphore";}
StockService中添加方法:
publicvoidtestSemaphore(){// 设置资源量 限流的线程数InterProcessSemaphoreV2 semaphoreV2 =newInterProcessSemaphoreV2(curatorFramework,"/locks/semaphore",5);try{Lease acquire = semaphoreV2.acquire();// 获取资源,获取资源成功的线程可以继续处理业务操作。否则会被阻塞住this.redisTemplate.opsForList().rightPush("log","10010获取了资源,开始处理业务逻辑。"+Thread.currentThread().getName());TimeUnit.SECONDS.sleep(10+newRandom().nextInt(10));this.redisTemplate.opsForList().rightPush("log","10010处理完业务逻辑,释放资源====================="+Thread.currentThread().getName());
semaphoreV2.returnLease(acquire);// 手动释放资源,后续请求线程就可以获取该资源}catch(Exception e){
e.printStackTrace();}}
7. 栅栏barrier
DistributedBarrier
构造函数中barrierPath
参数用来确定一个栅栏,只要barrierPath
参数相同(路径相同)就是同一个栅栏。通常情况下栅栏的使用如下:1. 主client
设置一个栅栏2. 其他客户端就会调用waitOnBarrier()
等待栅栏移除,程序处理线程阻塞3. 主client
移除栅栏,其他客户端的处理程序就会同时继续运行。
DistributedBarrier
类的主要方法如下:
setBarrier()- 设置栅栏
waitOnBarrier()- 等待栅栏移除
removeBarrier()- 移除栅栏
DistributedDoubleBarrier
双栅栏,允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算,当计算完成时,离开栅栏。DistributedDoubleBarrier
实现了双栅栏的功能。构造函数如下:// client - the client// barrierPath - path to use// memberQty - the number of members in the barrierpublicDistributedDoubleBarrier(CuratorFramework client,String barrierPath,int memberQty);enter()、enter(long maxWait,TimeUnit unit)- 等待同时进入栅栏leave()、leave(long maxWait,TimeUnit unit)- 等待同时离开栅栏
memberQty
是成员数量,当
enter
方法被调用时,成员被阻塞,直到所有的成员都调用了
enter
。当
leave
方法被调用时,它也阻塞调用线程,直到所有的成员都调用了
leave
。
注意:参数
memberQty
的值只是一个阈值,而不是一个限制值。当等待栅栏的数量大于或等于这个值栅栏就会打开!
与栅栏(
DistributedBarrier
)一样,双栅栏的
barrierPath
参数也是用来确定是否是同一个栅栏的,双栅栏的使用情况如下:
- 从多个客户端在同一个路径上创建双栅栏(
DistributedDoubleBarrier
),然后调用enter()
方法,等待栅栏数量达到memberQty
时就可以进入栅栏。 - 栅栏数量达到
memberQty
,多个客户端同时停止阻塞继续运行,直到执行leave()
方法,等待memberQty
个数量的栅栏同时阻塞到leave()
方法中。 memberQty
个数量的栅栏同时阻塞到leave()
方法中,多个客户端的leave()
方法停止阻塞,继续运行。
8. 共享计数器
利用
ZooKeeper
可以实现一个集群共享的计数器。只要使用相同的
path
就可以得到最新的计数器值, 这是由
ZooKeeper
的一致性保证的。
Curator
有两个计数器, 一个是用
int
来计数,一个用
long
来计数。
8.1. SharedCount
共享计数器
SharedCount
相关方法如下:
// 构造方法publicSharedCount(CuratorFramework client,String path,int seedValue);// 获取共享计数的值publicintgetCount();// 设置共享计数的值publicvoidsetCount(int newCount)throwsException;// 当版本号没有变化时,才会更新共享变量的值publicbooleantrySetCount(VersionedValue<Integer> previous,int newCount);// 通过监听器监听共享计数的变化publicvoidaddListener(SharedCountListener listener);publicvoidaddListener(finalSharedCountListener listener,Executor executor);// 共享计数在使用之前必须开启publicvoidstart()throwsException;// 关闭共享计数publicvoidclose()throwsIOException;
使用案例:
StockController:
@GetMapping("test/zk/share/count")publicStringtestZkShareCount(){this.stockService.testZkShareCount();return"hello shareData";}
StockService:
publicvoidtestZkShareCount(){try{// 第三个参数是共享计数的初始值SharedCount sharedCount =newSharedCount(curatorFramework,"/curator/count",0);// 启动共享计数器
sharedCount.start();// 获取共享计数的值int count = sharedCount.getCount();// 修改共享计数的值int random =newRandom().nextInt(1000);
sharedCount.setCount(random);System.out.println("我获取了共享计数的初始值:"+ count +",并把计数器的值改为:"+ random);
sharedCount.close();}catch(Exception e){
e.printStackTrace();}}
8.2. DistributedAtomicNumber
DistributedAtomicNumber
接口是分布式原子数值类型的抽象,定义了分布式原子数值类型需要提供的方法。
DistributedAtomicNumber
接口有两个实现:
DistributedAtomicLong
和
DistributedAtomicInteger
这两个实现将各种原子操作的执行委托给了
DistributedAtomicValue
,所以这两种实现是类似的,只不过表示的数值类型不同而已。这里以
DistributedAtomicLong
为例进行演示
DistributedAtomicLong
除了计数的范围比
SharedCount
大了之外,比
SharedCount
更简单易用。它首先尝试使用乐观锁的方式设置计数器, 如果不成功(比如期间计数器已经被其它client更新了), 它使用
InterProcessMutex
方式来更新计数值。此计数器有一系列的操作:
get()
: 获取当前值increment()
:加一decrement()
: 减一add()
:增加特定的值subtract()
: 减去特定的值trySet()
: 尝试设置计数值forceSet()
: 强制设置计数值
最后必须检查返回结果的
succeeded()
, 代表此操作是否成功。如果操作成功,
preValue()
代表操作前的值,
postValue()
代表操作后的值。
版权归原作者 鱼找水需要时间 所有, 如有侵权,请联系我们删除。