0


大数据Zookeeper--案例

文章目录

服务器动态上下线监听案例

需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知
到主节点服务器的上下线。

需求分析

服务器动态上下线

具体实现

1)先在集群上创建/servers节点

[zk: localhost:2181(CONNECTED) 10] create /servers "servers" 
Created /servers

2)在Idea中创建包名:com.yudan.case1

3)服务器端向Zookeeper注册代码

importorg.apache.zookeeper.*;importjava.io.IOException;publicclassDistributeServer{privateString connectString ="hadoop102:2181,hadoop103:2181,hadoop104:2181";privateint sessionTime =100000;privateZooKeeper zk;publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,KeeperException{DistributeServer server =newDistributeServer();// 1、获取zk连接
        server.getConnect();// 2、注册服务器到zk集群
        server.regist(args[0]);// 3、启动 业务逻辑(睡觉)
        server.business();}// 创建到 zk 的客户端连接privatevoidgetConnect()throwsIOException{

        zk =newZooKeeper(connectString, sessionTime,newWatcher(){@Overridepublicvoidprocess(WatchedEvent watchedEvent){}});}// 注册到服务器privatevoidregist(String hostname)throwsInterruptedException,KeeperException{String create = zk.create("/servers/"+hostname, hostname.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(hostname +" "+"is online");}// 业务功能privatevoidbusiness()throwsInterruptedException{Thread.sleep(Long.MAX_VALUE);}}

4)客户端代码

importorg.apache.zookeeper.KeeperException;importorg.apache.zookeeper.WatchedEvent;importorg.apache.zookeeper.Watcher;importorg.apache.zookeeper.ZooKeeper;importjava.io.IOException;importjava.util.ArrayList;importjava.util.List;publicclassDistributeClient{privateString connectString ="hadoop102:2181,hadoop103:2181,hadoop104:2181";privateint sessionTime =100000;privateZooKeeper zk;publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,KeeperException{DistributeClient client =newDistributeClient();// 1、获取zk连接
        client.getConnect();// 2、监听/servers下面子节点的增加和删除
        client.getServersList();// 3、业务逻辑(睡觉)
        client.business();}// 创建到 zk 的客户端连接privatevoidgetConnect()throwsIOException{
        zk =newZooKeeper(connectString, sessionTime,newWatcher(){@Overridepublicvoidprocess(WatchedEvent watchedEvent){// 再次启动监听try{getServersList();}catch(InterruptedException e){thrownewRuntimeException(e);}catch(KeeperException e){thrownewRuntimeException(e);}}});}// 获取服务器列表信息privatevoidgetServersList()throwsInterruptedException,KeeperException{// 获取服务器子节点信息,并且对父节点进行监听List<String> children = zk.getChildren("/servers",true);// 存储服务器信息列表 ArrayList<String> servers =newArrayList<>();// 遍历所有节点,获取节点中的主机名称信息 for(String child : children){byte[] data = zk.getData("/servers/"+ child,false,null);

            servers.add(newString(data));}// 打印System.out.println(servers);}privatevoidbusiness()throwsInterruptedException{Thread.sleep(Long.MAX_VALUE);}}

测试

1)在Linux命令行上操作增加减少服务器

(1)启动DistributeClient 客户端

(2)在hadoop102上zk的客户端/servers目录上创建临时带序号节点

[zk: localhost:2181(CONNECTED) 1]  create -e -s /servers/hadoop102 "hadoop102" 
[zk: localhost:2181(CONNECTED) 2]  create -e -s /servers/hadoop103 "hadoop103"

(3)观察Idea控制台变化

[hadoop102, hadoop103]

(4)执行删除操作

[zk: localhost:2181(CONNECTED) 8]  delete /servers/hadoop1020000000000 

(5)观察Idea控制台变化

[hadoop103] 

2)在Idea上操作增加减少服务器

(1)启动DistributeClient 客户端(如果已经启动过,不需要重启)

(2)启动DistributeServer 服务

  • 点击Edit Configurations…在这里插入图片描述
  • 在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102在这里插入图片描述
  • 回到DistributeServer的main方法,右键,在弹出的窗口中点击Run “DistributeServer.main()”
  • 观察DistributeServer控制台,提示hadoop102 is online
  • 观察DistributeClient控制台,提示hadoop102已经上线

Zookeeper分布式锁案例

什么叫做分布式锁呢?

比如说"进程1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
分布式锁

原生Zookeeper实现分布式锁

1)分布式锁实现

importorg.apache.zookeeper.*;importorg.apache.zookeeper.data.Stat;importjava.io.IOException;importjava.util.Collections;importjava.util.List;importjava.util.concurrent.CountDownLatch;publicclassDistributeLock{privatefinalString connectString ="hadoop102:2181,hadoop103:2181,hadoop104:2181";privatefinalint sessionTime =100000;privatefinalZooKeeper zk;// 当前client等待的子节点privateString waitPath;// zookeeper节点等待privateCountDownLatch waitLatch =newCountDownLatch(1);// zookeeper连接privateCountDownLatch connectLatch =newCountDownLatch(1);// 当前client创建的子节点privateString currentMode;// 和 zk 服务建立连接,并创建根节点publicDistributeLock()throwsIOException,InterruptedException,KeeperException{// 1、获取连接
        zk =newZooKeeper(connectString, sessionTime,newWatcher(){@Overridepublicvoidprocess(WatchedEvent watchedEvent){// connectLatch 如果连接上zk 可以释放// 连接建立时, 打开latch, 唤醒wait在该latch上的线程if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
                    connectLatch.countDown();}// waitLatch 需要释放// 发生了waitPath的删除事件if(watchedEvent.getType()==Event.EventType.NodeDeleted&& watchedEvent.getPath().equals(waitPath)){
                    waitLatch.countDown();}}});// 等待 zookeeper正常连接后,往下走程序
        connectLatch.await();// 2、判断根节点/locks是否存在Stat stat = zk.exists("/locks",false);if(stat ==null){// 创建一下根节点
            zk.create("/locks","locks".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}}// 对zk加锁publicvoidzkLock(){// 创建对应的临时带序号节点try{
            currentMode = zk.create("/locks/"+"seq-",null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);// 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是, 监听他序号的前一个节点List<String> children = zk.getChildren("/locks",false);// 如果children 只有一个值,那就直接获取锁;如果有多个节点,需要判断,哪个节点最小if(children.size()==1){return;}else{// 对children集合内的节点进行排序Collections.sort(children);// 获取节点名称 seq-String thisNode = currentMode.substring("/locks/".length());// 通过seq- 获取到该节点在children集合中的位置int index = children.indexOf(thisNode);// 判断if(index ==-1){System.out.println("数据异常");}elseif(index ==0){// 就一个节点,可以获取锁了return;}else{// 需要监听前一个节点
                    waitPath ="/locks/"+ children.get(index-1);
                    zk.getData(waitPath,true,null);// 等待监听
                    waitLatch.await();return;}}}catch(KeeperException e){thrownewRuntimeException(e);}catch(InterruptedException e){thrownewRuntimeException(e);}}// 对zk解锁publicvoidunzkLock(){// 删除节点try{
            zk.delete(currentMode,-1);}catch(InterruptedException e){thrownewRuntimeException(e);}catch(KeeperException e){thrownewRuntimeException(e);}}}

2)分布式锁测试

(1)创建两个线程

importorg.apache.zookeeper.KeeperException;importjava.io.IOException;publicclassDistributeLockTest{publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,KeeperException{// 创建分布式锁1finalDistributeLock lock1 =newDistributeLock();// 创建分布式锁2finalDistributeLock lock2 =newDistributeLock();newThread(newRunnable(){@Overridepublicvoidrun(){// 获取锁对象try{
                    lock1.zkLock();System.out.println("线程1 启动,获取到锁");Thread.sleep(5*1000);

                    lock1.unzkLock();System.out.println("线程1 释放锁");}catch(InterruptedException e){thrownewRuntimeException(e);}}}).start();newThread(newRunnable(){@Overridepublicvoidrun(){// 获取锁对象try{
                    lock2.zkLock();System.out.println("线程2 启动,获取到锁");Thread.sleep(5*1000);

                    lock2.unzkLock();System.out.println("线程2 释放锁");}catch(InterruptedException e){thrownewRuntimeException(e);}}}).start();}}

(2)观察控制台变化

线程1获取锁 
线程1释放锁 
线程2获取锁 
线程2释放锁

Curator框架实现分布式锁

1)原生的Java API开发存在的问题

(1)会话连接是异步的,需要自己去处理。比如使用CountDownLatch

(2)Watch需要重复注册,不然就不能生效

(3)开发的复杂性还是比较高的

(4)不支持多节点删除和创建。需要自己去递归

2)Curator是一个专门解决分布式锁的框架,解决了原生Java API开发分布式遇到的问题。

详情请查看官方文档:https://curator.apache.org/index.html

3)Curator 案例实操

(1)添加依赖

<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.3.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.3.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>4.3.0</version></dependency>

(2)代码实现

importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.CuratorFrameworkFactory;importorg.apache.curator.framework.recipes.locks.InterProcessMutex;importorg.apache.curator.retry.ExponentialBackoffRetry;publicclassCuratorLockTest{publicstaticvoidmain(String[] args){// 创建分布式锁1InterProcessMutex lock1 =newInterProcessMutex(getCuratorFramework(),"/locks");// 创建分布式锁2InterProcessMutex lock2 =newInterProcessMutex(getCuratorFramework(),"/locks");newThread(newRunnable(){@Overridepublicvoidrun(){try{
                    lock1.acquire();System.out.println("线程1 获取到锁");

                    lock1.acquire();System.out.println("线程1 再次获取到锁");Thread.sleep(5*1000);

                    lock1.release();System.out.println("线程1 释放锁");

                    lock1.release();System.out.println("线程1 再次释放锁");}catch(Exception e){thrownewRuntimeException(e);}}}).start();newThread(newRunnable(){@Overridepublicvoidrun(){try{
                    lock2.acquire();System.out.println("线程2 获取到锁");

                    lock2.acquire();System.out.println("线程2 再次获取到锁");Thread.sleep(5*1000);

                    lock2.release();System.out.println("线程2 释放锁");

                    lock2.release();System.out.println("线程2 再次释放锁");}catch(Exception e){thrownewRuntimeException(e);}}}).start();}// 分布式锁初始化privatestaticCuratorFrameworkgetCuratorFramework(){// 重试策略,初始时间3秒,重试3次ExponentialBackoffRetry policy =newExponentialBackoffRetry(3000,3);CuratorFramework client =CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181,hadoop104:2181").connectionTimeoutMs(100000).sessionTimeoutMs(100000).retryPolicy(policy).build();// 启动客户端
        client.start();System.out.println("zookeeper 启动成功!");return client;}}

(2)观察控制台变化:

线程1获取锁 
线程1再次获取锁 
线程1释放锁 
线程1再次释放锁 
线程2获取锁 
线程2再次获取锁 
线程2释放锁 
线程2再次释放锁

Zookeeper面试重点

选举机制

半数机制,超过半数的投票通过,即通过。

(1)第一次启动选举规则:

投票过半数时,服务器id大的胜出

(2)第二次启动选举规则:

①EPOCH大的直接胜出

②EPOCH相同,事务id大的胜出

③事务id相同,服务器id大的胜出

生产集群安装多少zk合适

安装奇数台。

生产经验:

  • 10台服务器:3台zk;
  • 20台服务器:5台zk;
  • 100台服务器:11台zk;
  • 200台服务器:11台zk

zk常用命令

ls、get、create、delete


本文转载自: https://blog.csdn.net/xd__xy/article/details/136018251
版权归原作者 泛黄的咖啡店 所有, 如有侵权,请联系我们删除。

“大数据Zookeeper--案例”的评论:

还没有评论