1、前言
又到了金三银四的时候,大家都按耐不住内心的躁动,我在这里给大家分享下之前面试中遇到的一个知识点(zookeeper应用场景),希望对大家有些帮助。如有不足,欢迎大佬们指点指点。
2、zookeeper简介
ZooKeeper 是分布式应用程序的分布式开源协调服务。它公开了一组简单的api,分布式应用程序可以基于这些api实现更高级别的同步、配置维护、分组和命名服务。它被设计为易于编程,并使用一种数据模型,该模型以熟悉的文件系统目录树结构为风格。它在 Java 中运行,并具有 Java 和 C 的绑定。
众所周知,协调服务很难做好。它们特别容易出现竞争条件和死锁等错误。ZooKeeper背后的动机是减轻分布式应用程序从头开始实现协调服务的负担。
3、zookeeper应用场景
下面的代码都需要一个序列化类,所以放在最前面声明
/**
* @author admin
*/publicclassMyZkSerializerimplementsZkSerializer{
String charset ="UTF-8";@Overridepublic Object deserialize(byte[] bytes)throws ZkMarshallingError {try{returnnewString(bytes, charset);}catch(UnsupportedEncodingException e){thrownewZkMarshallingError(e);}}@Overridepublicbyte[]serialize(Object obj)throws ZkMarshallingError {try{return String.valueOf(obj).getBytes(charset);}catch(UnsupportedEncodingException e){thrownewZkMarshallingError(e);}}}
3.1 配置中心
3.1.1 什么是配置中心呢?
假设咱们的项目部署在5台机子上形成一个集群,那么这5个实例在启动时读取的配置信息应该是一样的,同时一旦咱们的配置信息更改了,需要马上通知到这5个实例上并生效,这就是配置中心的功能。
3.1.2 zookeeper怎么实现配置中心呢?
必要条件
1、znode能存储数据
2、Watch能监听数据改变
实现方式
- 一个配置项对应一个zNode
// 1 将单个配置放到zookeeper上publicvoidputZk(){
ZkClient client =newZkClient("192.168.10.11:2181");
client.setZkSerializer(newMyZkSerializer());
String configPath ="/config1";
String value ="1111111";if(client.exists(configPath)){
client.writeData(configPath, value);}else{
client.createPersistent(configPath, value);}
client.close();}
// 需要配置的服务都从zk上取,并注册watch来实时获得配置更新publicvoidgetConfigFromZk(){
ZkClient client =newZkClient("192.168.10.11:2181");
client.setZkSerializer(newMyZkSerializer());
String configPath ="/config1";
String value = client.readData(configPath);
System.out.println("从zk读到配置config1的值为:"+ value);// 监控配置的更新,基于watch实现发布订阅功能
client.subscribeDataChanges(configPath,newIZkDataListener(){@OverridepublicvoidhandleDataDeleted(String dataPath)throws Exception {// TODO 配置删除业务处理}@OverridepublicvoidhandleDataChange(String dataPath, Object data)throws Exception {
System.out.println("获得更新的配置值:"+ data);}});// 这里只是为演示实时获取到配置值更新而加的等待。实际项目应用中根据具体场景写(可用阻塞方式)try{
Thread.sleep(5*60*1000);}catch(InterruptedException e){
e.printStackTrace();}}
- 一个配置文件对应一个zNode
// 将配置文件的内容存放到zk节点上publicvoidputConfigFile2ZK()throws IOException {
File f =newFile(this.getClass().getResource("/config.xml").getFile());
FileInputStream fin =newFileInputStream(f);byte[] datas =newbyte[(int) f.length()];
fin.read(datas);
fin.close();
ZkClient client =newZkClient("192.168.10.11:2181");
client.setZkSerializer(newBytesPushThroughSerializer());
String configPath ="/config2";if(client.exists(configPath)){
client.writeData(configPath, datas);}else{
client.createPersistent(configPath, datas);}
client.close();}
获取整个配置文件的方式跟步骤1类似,只不过需要解析对应的配置文件而已。
3.2 命名服务(注册中心)
3.2.1 什么是注册中心?
注册中心主要存储注册实例应用的名称和ip地址,供其他服务通过RPC来调用,其他服务只关心你的服务名是啥,而不必关心你的服务器地址对不对,有没有上线。
3.2.2 zookeeper怎么实现注册中心呢?
首先是服务发现问题,当一个实例启动后会向zookeeper创建一个临时节点,并存入自己的服务信息(包括应用名和ip等),其他服务通过zookeeper拿到该实例的注册信息即可调用。
一旦该服务宕机了或者主动下线,那么该临时节点则会被删除,其他服务通过watch监听到下线通知,也就不会在去调用该服务。
3.3 Master选举
3.3.1 什么是Master选举?
在一个主从部署的集群里,一般master实例负责所有请求的读写功能,其他slave实例同步master的数据,一旦master节点不可用了,那么就需要从他的slave实例中重新选举一个节点作为master实例。
3.3.2 zookeeper怎么实现Master选举呢?
首先是实例去竞争创建临时决定(Master节点),谁创建成功谁就是master,否则就是slave。
同时所有的实例都需要去servers节点(临时节点)注册自己的服务信息,方便通过该节点获取到所有在线的实例,有点类似注册中心的意思。
下面咱们通过代码来模拟一下master选举
/**
* @author yinfeng
*/publicclassServer{privatefinal String cluster;privatefinal String name;privatefinal String address;privatefinal String path, value;private String master;publicServer(String cluster, String name, String address){super();this.cluster = cluster;this.name = name;this.address = address;
path ="/"+this.cluster +"Master";
value ="name:"+ name +" address:"+ address;final ZkClient client =newZkClient("192.168.10.11:2181");
client.setZkSerializer(newMyZkSerializer());final Thread thread =newThread(()->{electionMaster(client);});
thread.setDaemon(true);
thread.start();}/**
* 选举方法
**/publicvoidelectionMaster(ZkClient client){try{
client.createEphemeral(path, value);
master = client.readData(path);
System.out.println(value +"创建节点成功,成为Master");}catch(ZkNodeExistsException e){
master = client.readData(path);
System.out.println("Master为:"+ master);}// 为阻塞自己等待而用final CountDownLatch cdl =newCountDownLatch(1);// 注册watcher
IZkDataListener listener =newIZkDataListener(){@OverridepublicvoidhandleDataDeleted(String dataPath)throws Exception {
System.out.println("-----监听到节点被删除");
cdl.countDown();}@OverridepublicvoidhandleDataChange(String dataPath, Object data)throws Exception {}};
client.subscribeDataChanges(path, listener);// 让自己阻塞if(client.exists(path)){try{
cdl.await();}catch(InterruptedException e1){
e1.printStackTrace();}}// 醒来后,取消watcher
client.unsubscribeDataChanges(path, listener);// 递归调自己(下一次选举)electionMaster(client);}}
咱们通过启动多个服务来看看是否测试成功
publicstaticvoidmain(String[] args){// 测试时,依次开启多个Server实例java进程,然后停止获取的master的节点,看谁抢到Master
Server s =newServer("cluster1","server1","192.168.1.11:8991");
Server s1 =newServer("cluster1","server2","192.168.1.11:8992");
Server s2 =newServer("cluster1","server3","192.168.1.11:8993");
Server s3 =newServer("cluster1","server4","192.168.1.11:8994");try{
Thread.sleep(100000);}catch(InterruptedException e){
e.printStackTrace();}}
可以看到功能一切正常
3.4 分布式队列
3.4.1 什么是分布式队列?
队列的定义是先进先出,而在分布式环境下保证先进先出的队列就是分布式队列,有点类似于消息队列。
3.4.2 zookeeper怎么实现分布式队列?
由上图可知,zookeeper主要通过顺序节点来保证队列的先进先出。
3.5 分布式锁
3.5.1 什么是分布式锁?
分布式锁指的是控制分布式系统不同进程共同访问共享资源的一种锁的实现。 如果在不同的系统或同一个系统的不同主机之间共享和竞争某个临界资源,往往需要互斥来防止彼此干扰,避免出现脏数据或非业务数据,保证数据一致性。
3.5.2 zookeeper通过临时节点实现布式锁?
实现原理是zookeeper节点不可重名和watch的监听通知机制,使用临时节点主要是为了避免获取锁的节点由于异常原因无法释放锁而导致出现死锁情况。
竞争锁流程如下图:
代码实现如下
/**
* @author yinfeng
*/publicclassZKDistributeLockimplementsLock{private String lockPath;private ZkClient client;// 锁重入计数private ThreadLocal<Integer> reentrantCount =newThreadLocal<>();publicZKDistributeLock(String lockPath){super();this.lockPath = lockPath;
client =newZkClient("192.168.10.11:2181");
client.setZkSerializer(newMyZkSerializer());}@OverridepublicbooleantryLock(){// 锁重入不会阻塞if(this.reentrantCount.get()!= null){int count =this.reentrantCount.get();if(count >0){this.reentrantCount.set(++count);returntrue;}}// 创建节点try{
client.createEphemeral(lockPath);this.reentrantCount.set(1);}catch(ZkNodeExistsException e){returnfalse;}returntrue;}@Overridepublicvoidunlock(){// 重入释进行放锁处理if(this.reentrantCount.get()!= null){int count =this.reentrantCount.get();if(count >1){this.reentrantCount.set(--count);return;}else{this.reentrantCount.set(null);}}
client.delete(lockPath);}@Overridepublicvoidlock(){// 如果获取不到锁,阻塞等待if(!tryLock()){// 没获得锁,阻塞自己waitForLock();// 再次尝试lock();}}privatevoidwaitForLock(){final CountDownLatch cdl =newCountDownLatch(1);
IZkDataListener listener =newIZkDataListener(){@OverridepublicvoidhandleDataDeleted(String dataPath)throws Exception {
System.out.println("----收到节点被删除了-------------");
cdl.countDown();}@OverridepublicvoidhandleDataChange(String dataPath, Object data)throws Exception {}};
client.subscribeDataChanges(lockPath, listener);// 阻塞自己if(this.client.exists(lockPath)){try{
cdl.await();}catch(InterruptedException e){
e.printStackTrace();}}// 取消注册
client.unsubscribeDataChanges(lockPath, listener);}@OverridepublicvoidlockInterruptibly(){}@OverridepublicbooleantryLock(long time, TimeUnit unit){returnfalse;}@Overridepublic Condition newCondition(){return null;}}
咱们在写个测试类试一下效果,通过多线程来模拟多实例竞争锁
publicstaticvoidmain(String[] args){// 并发数int currency =50;// 循环屏障final CyclicBarrier cb =newCyclicBarrier(currency);// 多线程模拟高并发for(int i =0; i < currency; i++){newThread(()->{
System.out.println(Thread.currentThread().getName()+"---------我准备好---------------");// 等待一起出发try{
cb.await();}catch(InterruptedException| BrokenBarrierException e){
e.printStackTrace();}
ZKDistributeLock lock =newZKDistributeLock("/distLock11");try{
lock.lock();
System.out.println(Thread.currentThread().getName()+" 获得锁!");try{
Thread.sleep(1000*2);}catch(InterruptedException e){
e.printStackTrace();}}finally{
lock.unlock();
System.out.println(Thread.currentThread().getName()+" 释放锁!");}}).start();}}
可以看到功能是正常的,但也有个很明显的问题,就是一旦释放锁之后所有的实例(线程)都会收到通知然后去重新竞争锁,当实例的数量达到一定程度之后,那么势必会对zookeeper造成很大的带宽和性能消耗,严重的话可能会把zookeeper集群搞挂了,这种情况也叫惊群效应,所以只通过顺序节点实现分布式锁还是有一定的问题的,下面咱们再来优化一下。
3.5.3 zookeeper通过临时顺序节点实现布式锁?
既然通过临时节点会造成惊群效应,那么咱们是否能将临时和顺序节点结合起来,通过最小的那个zNode节点来视为获得锁的标志呢?
答案是肯定能的,当释放锁时只通知他的下一个节点即可,完美的避免了惊群效应的发生。
原理图如下
流程图如下
接着咱们通过代码来实现吧
/**
* @author yinfeng
*/publicclassZKDistributeImproveLockimplementsLock{/**
* 利用临时顺序节点来实现分布式锁
* 获取锁:取排队号(创建自己的临时顺序节点),然后判断自己是否是最小号,如是,则获得锁;不是,则注册前一节点的watcher,阻塞等待
* 释放锁:删除自己创建的临时顺序节点
*/privatefinal String lockPath;privatefinal ZkClient client;private ThreadLocal<String> currentPath =newThreadLocal<>();private ThreadLocal<String> beforePath =newThreadLocal<>();/**
* 锁重入计数
*/private ThreadLocal<Integer> reentrantCount =newThreadLocal<>();publicZKDistributeImproveLock(String lockPath){super();this.lockPath = lockPath;
client =newZkClient("192.168.10.11:2181");
client.setZkSerializer(newMyZkSerializer());if(!this.client.exists(lockPath)){try{this.client.createPersistent(lockPath);}catch(ZkNodeExistsException ignored){}}}@OverridepublicbooleantryLock(){// 重入则直接返回获得锁成功if(this.reentrantCount.get()!= null){int count =this.reentrantCount.get();if(count >0){this.reentrantCount.set(++count);returntrue;}}if(this.currentPath.get()== null){
currentPath.set(this.client.createEphemeralSequential(lockPath +"/","aaa"));}// 获得所有的子节点
List<String> children =this.client.getChildren(lockPath);// 排序list
Collections.sort(children);// 判断当前节点是否是最小的if(currentPath.get().equals(lockPath +"/"+ children.get(0))){this.reentrantCount.set(1);returntrue;}else{// 取到前一个// 得到字节的索引号int curIndex = children.indexOf(currentPath.get().substring(lockPath.length()+1));
beforePath.set(lockPath +"/"+ children.get(curIndex -1));}returnfalse;}@Overridepublicvoidlock(){if(!tryLock()){// 阻塞等待waitForLock();// 再次尝试加锁lock();}}privatevoidwaitForLock(){final CountDownLatch cdl =newCountDownLatch(1);// 注册watcher
IZkDataListener listener =newIZkDataListener(){@OverridepublicvoidhandleDataDeleted(String dataPath)throws Exception {
System.out.println("-----监听到节点被删除");
cdl.countDown();}@OverridepublicvoidhandleDataChange(String dataPath, Object data)throws Exception {}};
client.subscribeDataChanges(this.beforePath.get(), listener);// 让自己阻塞if(this.client.exists(this.beforePath.get())){try{
cdl.await();}catch(InterruptedException e){
e.printStackTrace();}}// 醒来后,取消watcher
client.unsubscribeDataChanges(this.beforePath.get(), listener);}@Overridepublicvoidunlock(){// 重入的释放锁处理if(this.reentrantCount.get()!= null){int count =this.reentrantCount.get();if(count >1){this.reentrantCount.set(--count);return;}else{this.reentrantCount.set(null);}}// 删除节点this.client.delete(this.currentPath.get());}@OverridepublicvoidlockInterruptibly()throws InterruptedException {}@OverridepublicbooleantryLock(long time, TimeUnit unit)throws InterruptedException {returnfalse;}@Overridepublic Condition newCondition(){return null;}}
最后咱们再来测试一下
publicstaticvoidmain(String[] args){// 并发数int currency =50;// 循环屏障final CyclicBarrier cb =newCyclicBarrier(currency);// 多线程模拟高并发for(int i =0; i < currency; i++){newThread(()->{
System.out.println(Thread.currentThread().getName()+"---------我准备好---------------");// 等待一起出发try{
cb.await();}catch(InterruptedException| BrokenBarrierException e){
e.printStackTrace();}
ZKDistributeImproveLock lock =newZKDistributeImproveLock("/distLock");try{
lock.lock();
System.out.println(Thread.currentThread().getName()+" 获得锁!");try{
Thread.sleep(1000*2);}catch(InterruptedException e){
e.printStackTrace();}}finally{
lock.unlock();
System.out.println(Thread.currentThread().getName()+" 释放锁!");}}).start();}}
可以看到功能是正常的,同时在释放锁的时候只通知了下一节点,没有出现惊群效应,非常完美。
4、总结
在微服务和分布式的时代,zookeeper作为协调服务的代表,在面试中很容易被问到,希望大家能掌握这方面的知识,提高自己的核心竞争力,在谈薪的时候拿到最高的那个区间。
最后,外出打工不易,希望各位兄弟找到自己心仪的工作,虎年发发发! 也希望兄弟们能关注、点赞、收藏、评论支持一波,非常感谢大家!
版权归原作者 隐 风 所有, 如有侵权,请联系我们删除。