0


Zookeeper应用场景实战一

1. Zookeeper Java客户端实战

ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。可供选择的Java客户端API有:

  • ZooKeeper官方的Java客户端API。
  • 第三方的Java客户端API,比如Curator。

ZooKeeper官方的客户端API提供了基本的操作。例如,创建会话、创建节点、读取节点、更新数据、删除节点和检查节点是否存在等。不过,对于实际开发来说,ZooKeeper官方API有一些不足之处,具体如下:

  • ZooKeeper的Watcher监测是一次性的,每次触发之后都需要重新进行注册。
  • 会话超时之后没有实现重连机制。
  • 异常处理烦琐,ZooKeeper提供了很多异常,对于开发人员来说可能根本不知道应该如何处理这些抛出的异常。
  • 仅提供了简单的byte[]数组类型的接口,没有提供Java POJO级别的序列化数据处理接口。
  • 创建节点时如果抛出异常,需要自行检查节点是否存在。
  • 无法实现级联删除。

总之,ZooKeeper官方API功能比较简单,在实际开发过程中比较笨重,一般不推荐使用。

1.1 Zookeeper 原生Java客户端使用

引入zookeeper client依赖

  1. <!-- zookeeper client -->
  2. <dependency>
  3. <groupId>org.apache.zookeeper</groupId>
  4. <artifactId>zookeeper</artifactId>
  5. <version>3.8.0</version>
  6. </dependency>

注意:保持与服务端版本一致,不然会有很多兼容性的问题

ZooKeeper原生客户端主要使用org.apache.zookeeper.ZooKeeper这个类来使用ZooKeeper服务。

ZooKeeper常用构造器

  1. ZooKeeper (connectString, sessionTimeout, watcher)
  • connectString:使用逗号分隔的列表,每个ZooKeeper节点是一个host.port对,host 是机器名或者IP地址,port是ZooKeeper节点对客户端提供服务的端口号。客户端会任意选取connectString 中的一个节点建立连接。
  • sessionTimeout : session timeout时间。
  • watcher:用于接收到来自ZooKeeper集群的事件。

使用 zookeeper 原生 API,连接zookeeper集群

示例代码:

  1. public class ZkClientDemo {
  2. private static final String CONNECT_STR="192.168.189.131";
  3. private final static String CLUSTER_CONNECT_STR="192.168.65.163:2181,192.168.65.184:2181,192.168.65.186:2181";
  4. public static void main(String[] args) throws Exception {
  5. //获取zookeeper对象
  6. ZooKeeper zooKeeper = ZooKeeperFacotry.create(CLUSTER_CONNECT_STR);
  7. //CONNECTED
  8. System.out.println(zooKeeper.getState());
  9. Stat stat = zooKeeper.exists("/user",false);
  10. if(null ==stat){
  11. //创建持久节点
  12. zooKeeper.create("/user","bubble".getBytes(),
  13. ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  14. }
  15. //永久监听 addWatch -m mode /user
  16. zooKeeper.addWatch("/user",new Watcher() {
  17. @Override
  18. public void process(WatchedEvent event) {
  19. System.out.println("event:=> " + event);
  20. //TODO
  21. }
  22. },AddWatchMode.PERSISTENT);
  23. stat = new Stat();
  24. byte[] data = zooKeeper.getData("/user", false, stat);
  25. System.out.println(" data:=> "+new String(data));
  26. // -1: 无条件更新
  27. //zooKeeper.setData("/user", "third".getBytes(), -1);
  28. // 带版本条件更新
  29. int version = stat.getVersion();
  30. zooKeeper.setData("/user", "bubble".getBytes(), version);
  31. Thread.sleep(Integer.MAX_VALUE);
  32. }
  33. }
  1. public class ZooKeeperFacotry {
  2. private static final int SESSION_TIMEOUT = 5000;
  3. public static ZooKeeper create(String connectionString) throws Exception {
  4. final CountDownLatch connectionLatch = new CountDownLatch(1);
  5. ZooKeeper zooKeeper = new ZooKeeper(connectionString, SESSION_TIMEOUT, new Watcher() {
  6. @Override
  7. public void process(WatchedEvent event) {
  8. if (event.getType()== Event.EventType.None
  9. && event.getState() == Watcher.Event.KeeperState.SyncConnected) {
  10. connectionLatch.countDown();
  11. System.out.println("连接建立");
  12. }
  13. }
  14. });
  15. System.out.println("等待连接建立...");
  16. connectionLatch.await();
  17. return zooKeeper;
  18. }
  19. }

运行结果:

  1. 等待连接建立...
  2. // ...省略
  3. 连接建立
  4. CONNECTED
  5. data:=> bubble
  6. event:=> WatchedEvent state:SyncConnected type:NodeDataChanged path:/user//第一次运行触发的监听
  7. event:=> WatchedEvent state:SyncConnected type:NodeDataChanged path:/user//操作/user触发的监听,set /user bubble
  8. event:=> WatchedEvent state:SyncConnected type:NodeDataChanged path:/user//操作一次触发一次

Zookeeper主要方法

  • create(path, data, acl,createMode): 创建一个给定路径的 znode,并在 znode 保存 data[]的 数据,createMode指定 znode 的类型。
  • delete(path, version):如果给定 path 上的 znode 的版本和给定的 version 匹配, 删除 znode。
  • exists(path, watch):判断给定 path 上的 znode 是否存在,并在 znode 设置一个 watch。
  • getData(path, watch):返回给定 path 上的 znode 数据,并在 znode 设置一个 watch。
  • setData(path, data, version):如果给定 path 上的 znode 的版本和给定的 version 匹配,设置 znode 数据。
  • getChildren(path, watch):返回给定 path 上的 znode 的孩子 znode 名字,并在 znode 设置一个 watch。
  • sync(path):把客户端 session 连接节点和 leader 节点进行同步。

方法特点:

  • 所有获取 znode 数据的 API 都可以设置一个 watch 用来监控 znode 的变化。
  • 所有更新 znode 数据的 API 都有两个版本: 无条件更新版本和条件更新版本。如果 version 为 -1,更新为无条件更新。否则只有给定的 version 和 znode 当前的 version 一样,才会进行更新,这样的更新是条件更新。
  • 所有的方法都有同步和异步两个版本。同步版本的方法发送请求给 ZooKeeper 并等待服务器的响 应。异步版本把请求放入客户端的请求队列,然后马上返回。异步版本通过 callback 来接受来自服务端的响应。

同步创建节点:

  1. @Test
  2. public void createTest() throws KeeperException, InterruptedException {
  3. String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  4. log.info("created path: {}",path);
  5. }

异步创建节点:

  1. @Test
  2. public void createAsycTest() throws InterruptedException {
  3. zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
  4. CreateMode.PERSISTENT,
  5. (rc, path, ctx, name) -> log.info("rc {},path {},ctx {},name {}",rc,path,ctx,name),"context");
  6. TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
  7. }

修改节点数据:

  1. @Test
  2. public void setTest() throws KeeperException, InterruptedException {
  3. Stat stat = new Stat();
  4. byte[] data = zooKeeper.getData(ZK_NODE, false, stat);
  5. log.info("修改前: {}",new String(data));
  6. zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());
  7. byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);
  8. log.info("修改后: {}",new String(dataAfter));
  9. }

1.2 Curator开源客户端使用

Curator是Netflix公司开源的一套ZooKeeper客户端框架,和ZkClient一样它解决了非常底层的细节开发工作,包括连接、重连、反复注册Watcher的问题以及NodeExistsException异常等。

Curator是Apache基金会的顶级项目之一,Curator具有更加完善的文档,另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。

Curator还为ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案,例如Recipe、共享锁服务、Master选举机制和分布式计算器等,帮助开发者避免了“重复造轮子”的无效开发工作。

Guava is to Java that Curator to ZooKeeper

在实际的开发场景中,使用Curator客户端就足以应付日常的ZooKeeper集群操作的需求。

官网:Welcome to Apache Curator | Apache Curator

引入依赖

Curator 包含了几个包:

  • curator-framework是对ZooKeeper的底层API的一些封装。
  • curator-client提供了一些客户端的操作,例如重试策略等。
  • curator-recipes封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
  1. <!-- zookeeper client -->
  2. <dependency>
  3. <groupId>org.apache.zookeeper</groupId>
  4. <artifactId>zookeeper</artifactId>
  5. <version>3.8.0</version>
  6. </dependency>
  7. <!--curator-->
  8. <dependency>
  9. <groupId>org.apache.curator</groupId>
  10. <artifactId>curator-recipes</artifactId>
  11. <version>5.1.0</version>
  12. <exclusions>
  13. <exclusion>
  14. <groupId>org.apache.zookeeper</groupId>
  15. <artifactId>zookeeper</artifactId>
  16. </exclusion>
  17. </exclusions>
  18. </dependency>

示例代码:

  1. public class CuratorDemo {
  2. // ZooKeeper集群连接字符串,包括多个ZooKeeper服务器的地址和端口
  3. //private final static String CLUSTER_CONNECT_STR="192.168.65.163:2181,192.168.65.184:2181,192.168.65.186:2181";
  4. private final static String CLUSTER_CONNECT_STR="192.168.189.131";
  5. public static void main(String[] args) throws Exception {
  6. //构建客户端实例
  7. CuratorFramework curatorFramework= CuratorFrameworkFactory.builder()
  8. .connectString(CLUSTER_CONNECT_STR)
  9. .retryPolicy(new ExponentialBackoffRetry(1000,3)) // 设置重试策略
  10. .build();
  11. //启动客户端
  12. curatorFramework.start();
  13. String path = "/user";
  14. // 检查节点是否存在
  15. Stat stat = curatorFramework.checkExists().forPath(path);
  16. if (stat != null) {
  17. // 删除节点
  18. curatorFramework.delete()
  19. .deletingChildrenIfNeeded() // 如果存在子节点,则删除所有子节点
  20. .forPath(path); // 删除指定节点
  21. }
  22. // 创建节点
  23. curatorFramework.create()
  24. .creatingParentsIfNeeded() // 如果父节点不存在,则创建父节点
  25. .withMode(CreateMode.PERSISTENT)
  26. .forPath(path, "Init Data".getBytes());
  27. // 注册节点监听
  28. curatorFramework.getData()
  29. .usingWatcher(new CuratorWatcher() {
  30. @Override
  31. public void process(WatchedEvent event) throws Exception {
  32. byte[] bytes = curatorFramework.getData().forPath(path);
  33. System.out.println("Node data changed: " + new String(bytes));
  34. }
  35. })
  36. .forPath(path);
  37. // 更新节点数据 set /user Update Data
  38. curatorFramework.setData()
  39. .forPath(path, "Update Data".getBytes());
  40. stat=new Stat();
  41. //查询节点数据
  42. byte[] bytes = curatorFramework.getData().storingStatIn(stat)
  43. .forPath("/user");
  44. System.out.println(new String(bytes));
  45. ExecutorService executorService = Executors.newSingleThreadExecutor();
  46. //异步处理,可以指定线程池
  47. curatorFramework.getData().inBackground((item1, item2) -> {
  48. System.out.println("background:"+item1+" <---> "+item2);
  49. System.out.println("item2.getStat()=> " + item2.getStat());
  50. },executorService).forPath(path);
  51. // 创建节点缓存,用于监听指定节点的变化
  52. final NodeCache nodeCache = new NodeCache(curatorFramework, path);
  53. // 启动NodeCache并立即从服务端获取最新数据
  54. nodeCache.start(true);
  55. // 注册节点变化监听器
  56. nodeCache.getListenable().addListener(new NodeCacheListener() {
  57. @Override
  58. public void nodeChanged() throws Exception {
  59. byte[] newData = nodeCache.getCurrentData().getData();
  60. System.out.println("Node data changed: " + new String(newData));
  61. }
  62. });
  63. // 创建PathChildrenCache
  64. PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true);
  65. pathChildrenCache.start();
  66. // 注册子节点变化监听器
  67. pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
  68. @Override
  69. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
  70. if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
  71. ChildData childData = event.getData();
  72. System.out.println("Child added: " + childData.getPath());
  73. } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
  74. ChildData childData = event.getData();
  75. System.out.println("Child removed: " + childData.getPath());
  76. } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
  77. ChildData childData = event.getData();
  78. System.out.println("Child updated: " + childData.getPath());
  79. }
  80. }
  81. });
  82. Thread.sleep(Integer.MAX_VALUE);
  83. }
  84. }

运行结果

  1. .....
  2. Node data changed: Update Data
  3. Update Data
  4. background:org.apache.curator.framework.imps.CuratorFrameworkImpl@3c5f290a <---> CuratorEventImpl{type=GET_DATA, resultCode=0, path='/user', name='null', children=null, context=null, stat=40,41,1696611808183,1696611808193,1,0,0,0,11,0,40
  5. , data=[85, 112, 100, 97, 116, 101, 32, 68, 97, 116, 97], watchedEvent=null, aclList=null, opResults=null}
  6. item2.getStat()=> 40,41,1696611808183,1696611808193,1,0,0,0,11,0,40
  7. Child added: /user/bubble//修改子节点触发监听器的打印
  8. Child removed: /user/bubble

创建一个客户端实例

在使用curator-framework包操作ZooKeeper前,首先要创建一个客户端实例。这是一个CuratorFramework类型的对象,有两种方法:

  • 使用工厂类CuratorFrameworkFactory的静态newClient()方法。
  1. // 重试策略
  2. RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
  3. //创建客户端实例
  4. CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
  5. //启动客户端
  6. client.start();
  • 使用工厂类CuratorFrameworkFactory的静态builder构造者方法。
  1. //随着重试次数增加重试时间间隔变大,指数倍增长baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
  2. RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
  3. CuratorFramework client = CuratorFrameworkFactory.builder()
  4. .connectString("192.168.128.129:2181")
  5. .sessionTimeoutMs(5000) // 会话超时时间
  6. .connectionTimeoutMs(5000) // 连接超时时间
  7. .retryPolicy(retryPolicy)
  8. .namespace("base") // 包含隔离名称
  9. .build();
  10. client.start();
  • connectionString:服务器地址列表,在指定服务器地址列表的时候可以是一个地址,也可以是多个地址。如果是多个地址,那么每个服务器地址列表用逗号分隔, 如 host1:port1,host2:port2,host3;port3 。
  • retryPolicy:重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。

策略名称

描述

ExponentialBackoffRetry

重试一组次数,重试之间的睡眠时间增加

RetryNTimes

重试最大次数

RetryOneTime

只重试一次

RetryUntilElapsed

在给定的时间结束之前重试

  • 超时时间:Curator 客户端创建过程中,有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。另一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。sessionTimeoutMs 作用在服务端,而 connectionTimeoutMs 作用在客户端。

创建节点

创建节点的方式如下面的代码所示,回顾我们之前课程中讲到的内容,描述一个节点要包括节点的类型,即临时节点还是持久节点、节点的数据信息、节点是否是有序节点等属性和性质。

  1. @Test
  2. public void testCreate() throws Exception {
  3. String path = curatorFramework.create().forPath("/curator-node");
  4. curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/curator-node","some-data".getBytes())
  5. log.info("curator create node :{} successfully.",path);
  6. }

在 Curator 中,可以使用 create 函数创建数据节点,并通过 withMode 函数指定节点类型(持久化节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用 forPath 函数来指定节点的路径和数据信息。

一次性创建带层级结构的节点

  1. @Test
  2. public void testCreateWithParent() throws Exception {
  3. String pathWithParent="/node-parent/sub-node-1";
  4. String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);
  5. log.info("curator create node :{} successfully.",path);
  6. }

获取数据

  1. @Test
  2. public void testGetData() throws Exception {
  3. byte[] bytes = curatorFramework.getData().forPath("/curator-node");
  4. log.info("get data from node :{} successfully.",new String(bytes));
  5. }

更新节点

我们通过客户端实例的 setData() 方法更新 ZooKeeper 服务上的数据节点,在setData 方法的后边,通过 forPath 函数来指定更新的数据节点路径以及要更新的数据。

  1. @Test
  2. public void testSetData() throws Exception {
  3. curatorFramework.setData().forPath("/curator-node","changed!".getBytes());
  4. byte[] bytes = curatorFramework.setData().forPath("/curator-node");
  5. log.info("get data from node /curator-node :{} successfully.",new String(bytes));
  6. }

删除节点

  1. @Test
  2. public void testDelete() throws Exception {
  3. String pathWithParent="/node-parent";
  4. curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
  5. }

guaranteed:该函数的功能如字面意思一样,主要起到一个保障删除成功的作用,其底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。

deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子节点,以及子节点的子节点。

异步接口

Curator 引入了BackgroundCallback 接口,用来处理服务器端返回来的信息,这个处理过程是在异步线程中调用,默认在 **EventThread **中调用,也可以自定义线程池。

  1. public interface BackgroundCallback
  2. {
  3. /**
  4. * Called when the async background operation completes
  5. *
  6. * @param client the client
  7. * @param event operation result details
  8. * @throws Exception errors
  9. */
  10. public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
  11. }

如上接口,主要参数为 client 客户端, 和 服务端事件 event。

inBackground 异步处理默认在EventThread中执行

  1. @Test
  2. public void test() throws Exception {
  3. curatorFramework.getData().inBackground((item1, item2) -> {
  4. log.info(" background: {}", item2);
  5. }).forPath(ZK_NODE);
  6. TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
  7. }

指定线程池

  1. @Test
  2. public void test() throws Exception {
  3. ExecutorService executorService = Executors.newSingleThreadExecutor();
  4. curatorFramework.getData().inBackground((item1, item2) -> {
  5. log.info(" background: {}", item2);
  6. },executorService).forPath(ZK_NODE);
  7. TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
  8. }

Curator 监听器

  1. /**
  2. * Receives notifications about errors and background events
  3. */
  4. public interface CuratorListener
  5. {
  6. /**
  7. * Called when a background task has completed or a watch has triggered
  8. *
  9. * @param client client
  10. * @param event the event
  11. * @throws Exception any errors
  12. */
  13. public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
  14. }

针对 background 通知和错误通知。使用此监听器之后,调用inBackground 方法会异步获得监听

Curator Caches

Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听,Cache 事件监听可以理解为一个本地缓存视图与远程 Zookeeper 视图的对比过程。Cache 提供了反复注册的功能。Cache 分为两类注册类型:节点监听和子节点监听。

node cache:

NodeCache 对某一个节点进行监听

  1. public NodeCache(CuratorFramework client,
  2. String path)
  3. Parameters:
  4. client - the client
  5. path - path to cache

可以通过注册监听器来实现,对当前节点数据变化的处理

  1. public void addListener(NodeCacheListener listener)
  2. Add a change listener
  3. Parameters:
  4. listener - the listener
  1. @Slf4j
  2. public class NodeCacheTest extends AbstractCuratorTest{
  3. public static final String NODE_CACHE="/node-cache";
  4. @Test
  5. public void testNodeCacheTest() throws Exception {
  6. createIfNeed(NODE_CACHE);
  7. NodeCache nodeCache = new NodeCache(curatorFramework, NODE_CACHE);
  8. nodeCache.getListenable().addListener(new NodeCacheListener() {
  9. @Override
  10. public void nodeChanged() throws Exception {
  11. log.info("{} path nodeChanged: ",NODE_CACHE);
  12. printNodeData();
  13. }
  14. });
  15. nodeCache.start();
  16. }
  17. public void printNodeData() throws Exception {
  18. byte[] bytes = curatorFramework.getData().forPath(NODE_CACHE);
  19. log.info("data: {}",new String(bytes));
  20. }
  21. }

path cache:

PathChildrenCache 会对子节点进行监听,但是不会对二级子节点进行监听

  1. public PathChildrenCache(CuratorFramework client,
  2. String path,
  3. boolean cacheData)
  4. Parameters:
  5. client - the client
  6. path - path to watch
  7. cacheData - if true, node contents are cached in addition to the stat

可以通过注册监听器来实现,对当前节点的子节点数据变化的处理

  1. public void addListener(PathChildrenCacheListener listener)
  2. Add a change listener
  3. Parameters:
  4. listener - the listener
  1. @Slf4j
  2. public class PathCacheTest extends AbstractCuratorTest{
  3. public static final String PATH="/path-cache";
  4. @Test
  5. public void testPathCache() throws Exception {
  6. createIfNeed(PATH);
  7. PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, PATH, true);
  8. pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
  9. @Override
  10. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
  11. log.info("event: {}",event);
  12. }
  13. });
  14. // 如果设置为true则在首次启动时就会缓存节点内容到Cache中
  15. pathChildrenCache.start(true);
  16. }
  17. }

tree cache:

TreeCache 使用一个内部类TreeNode来维护这个一个树结构。并将这个树结构与ZK节点进行了映射。所以TreeCache 可以监听当前节点下所有节点的事件。

  1. public TreeCache(CuratorFramework client,
  2. String path,
  3. boolean cacheData)
  4. Parameters:
  5. client - the client
  6. path - path to watch
  7. cacheData - if true, node contents are cached in addition to the stat

可以通过注册监听器来实现,对当前节点的子节点,及递归子节点数据变化的处理

  1. public void addListener(TreeCacheListener listener)
  2. Add a change listener
  3. Parameters:
  4. listener - the listener
  1. @Slf4j
  2. public class TreeCacheTest extends CuratorBaseOperations {
  3. public static final String TREE_CACHE="/tree-path";
  4. @Test
  5. public void testTreeCache() throws Exception {
  6. CuratorFramework curatorFramework = getCuratorFramework();
  7. createIfNeed(TREE_CACHE);
  8. TreeCache treeCache = new TreeCache(curatorFramework, TREE_CACHE);
  9. treeCache.getListenable().addListener(new TreeCacheListener() {
  10. @Override
  11. public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
  12. log.info(" tree cache: {}",event);
  13. Map<String, ChildData> currentChildren = treeCache.getCurrentChildren(TREE_CACHE);
  14. log.info("currentChildren: {}",currentChildren);
  15. }
  16. });
  17. treeCache.start();
  18. }
  19. }

运行结果

  1. 2023-10-07 01:03:44.105 [main-EventThread] INFO o.a.curator.framework.state.ConnectionStateManager --- State change: CONNECTED
  2. 2023-10-07 01:03:44.106 [Curator-ConnectionStateManager-0] INFO com.bubble.zk_demo.curator.CuratorStandaloneBase --- 连接成功!
  3. 2023-10-07 01:03:44.113 [main-EventThread] INFO org.apache.curator.framework.imps.EnsembleTracker --- New config event received: {}
  4. 2023-10-07 01:03:44.113 [main-EventThread] INFO org.apache.curator.framework.imps.EnsembleTracker --- New config event received: {}
  5. 2023-10-07 01:03:44.119 [main] INFO com.bubble.zk_demo.curator.CuratorStandaloneBase --- path /tree-path created!
  6. 2023-10-07 01:03:44.133 [Curator-TreeCache-0] INFO com.bubble.zk_demo.curator.cache.TreeCacheTest --- tree cache: TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/tree-path', stat=117,117,1696615424405,1696615424405,0,0,0,0,13,0,117
  7. , data=[49, 57, 50, 46, 49, 54, 56, 46, 49, 56, 57, 46, 49]}}
  8. 2023-10-07 01:03:44.134 [Curator-TreeCache-0] INFO com.bubble.zk_demo.curator.cache.TreeCacheTest --- currentChildren: {}
  9. 2023-10-07 01:03:44.136 [Curator-TreeCache-0] INFO com.bubble.zk_demo.curator.cache.TreeCacheTest --- tree cache: TreeCacheEvent{type=INITIALIZED, data=null}
  10. 2023-10-07 01:03:44.136 [Curator-TreeCache-0] INFO com.bubble.zk_demo.curator.cache.TreeCacheTest --- currentChildren: {}
  11. //修改节点值触发的监听
  12. 2023-10-07 01:04:45.104 [Curator-TreeCache-0] INFO com.bubble.zk_demo.curator.cache.TreeCacheTest --- tree cache: TreeCacheEvent{type=NODE_UPDATED, data=ChildData{path='/tree-path', stat=117,118,1696615424405,1696615485384,1,0,0,0,13,0,117
  13. , data=[49, 57, 50, 46, 49, 54, 56, 46, 49, 56, 57, 46, 50]}}
  14. 2023-10-07 01:04:45.104 [Curator-TreeCache-0] INFO com.bubble.zk_demo.curator.cache.TreeCacheTest --- currentChildren: {}

2. Zookeeper在分布式命名服务中的实战

命名服务是为系统中的资源提供标识能力。ZooKeeper的命名服务主要是利用ZooKeeper节点的树形分层结构和子节点的顺序维护能力,来为分布式系统中的资源命名。

哪些应用场景需要用到分布式命名服务呢?典型的有:

  • 分布式API目录
  • 分布式节点命名
  • 分布式ID生成器

2.1 分布式API目录

为分布式系统中各种API接口服务的名称、链接地址,提供类似JNDI(Java命名和目录接口)中的文件系统的功能。借助于ZooKeeper的树形分层结构就能提供分布式的API调用功能。

著名的Dubbo分布式框架就是应用了ZooKeeper的分布式的JNDI功能。在Dubbo中,使用ZooKeeper维护的全局服务接口API的地址列表。大致的思路为:

  • 服务提供者(Service Provider)在启动的时候,向ZooKeeper上的指定节点/dubbo/${serviceName}/providers写入自己的API地址,这个操作就相当于服务的公开。
  • 服务消费者(Consumer)启动的时候,订阅节点/dubbo/{serviceName}/providers下的服务提供者的URL地址,获得所有服务提供者的API。

2.2 分布式节点的命名

一个分布式系统通常会由很多的节点组成,节点的数量不是固定的,而是不断动态变化的。比如说,当业务不断膨胀和流量洪峰到来时,大量的节点可能会动态加入到集群中。而一旦流量洪峰过去了,就需要下线大量的节点。再比如说,由于机器或者网络的原因,一些节点会主动离开集群。

如何为大量的动态节点命名呢?

一种简单的办法是可以通过配置文件,手动为每一个节点命名。但是,如果节点数据量太大,或者说变动频繁,手动命名则是不现实的,这就需要用到分布式节点的命名服务。

可用于生成集群节点的编号的方案:

(1)使用数据库的自增ID特性,用数据表存储机器的MAC地址或者IP来维护。

(2)使用ZooKeeper持久顺序节点的顺序特性来维护节点的NodeId编号。

在第2种方案中,集群节点命名服务的基本流程是:

  • 启动节点服务,连接ZooKeeper,检查命名服务根节点是否存在,如果不存在,就创建系统的根节点。
  • 在根节点下创建一个临时顺序ZNode节点,取回ZNode的编号把它作为分布式系统中节点的NODEID。
  • 如果临时节点太多,可以根据需要删除临时顺序ZNode节点。

2.3 分布式的ID生成器

在分布式系统中,分布式ID生成器的使用场景非常之多:

  • 大量的数据记录,需要分布式ID。
  • 大量的系统消息,需要分布式ID。
  • 大量的请求日志,如restful的操作记录,需要唯一标识,以便进行后续的用户行为分析和调用链路分析。
  • 分布式节点的命名服务,往往也需要分布式ID。
  • 。。。

传统的数据库自增主键已经不能满足需求。在分布式系统环境中,迫切需要一种全新的唯一ID系统,这种系统需要满足以下需求:

(1)全局唯一:不能出现重复ID。

(2)高可用:ID生成系统是基础系统,被许多关键系统调用,一旦宕机,就会造成严重影响。

有哪些分布式的ID生成器方案呢?大致如下:

  1. Java的UUID。(不推荐)
  2. 分布式缓存Redis生成ID:利用Redis的原子操作INCR和INCRBY,生成全局唯一的ID。
  3. Twitter的SnowFlake算法。
  4. ZooKeeper生成ID:利用ZooKeeper的顺序节点,生成全局唯一的ID。
  5. MongoDb的ObjectId:MongoDB是一个分布式的非结构化NoSQL数据库,每插入一条记录会自动生成全局唯一的一个“_id”字段值,它是一个12字节的字符串,可以作为分布式系统中全局唯一的ID。

基于Zookeeper实现分布式ID生成器

在ZooKeeper节点的四种类型中,其中有以下两种类型具备自动编号的能力

  • PERSISTENT_SEQUENTIAL持久化顺序节点。
  • EPHEMERAL_SEQUENTIAL临时顺序节点。

ZooKeeper的每一个节点都会为它的第一级子节点维护一份顺序编号,会记录每个子节点创建的先后顺序,这个顺序编号是分布式同步的,也是全局唯一的。

可以通过创建ZooKeeper的临时顺序节点的方法,生成全局唯一的ID

  1. @Slf4j
  2. public class IDMaker extends CuratorBaseOperations {
  3. private String createSeqNode(String pathPefix) throws Exception {
  4. CuratorFramework curatorFramework = getCuratorFramework();
  5. //创建一个临时顺序节点
  6. String destPath = curatorFramework.create()
  7. .creatingParentsIfNeeded()
  8. .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
  9. .forPath(pathPefix);
  10. return destPath;
  11. }
  12. public String makeId(String path) throws Exception {
  13. String str = createSeqNode(path);
  14. if(null != str){
  15. //获取末尾的序号
  16. int index = str.lastIndexOf(path);
  17. if(index>=0){
  18. index+=path.length();
  19. return index<=str.length() ? str.substring(index):"";
  20. }
  21. }
  22. return str;
  23. }
  24. }

测试

  1. @Slf4j
  2. public class IDMakerTest {
  3. @Test
  4. public void testMarkId() throws Exception {
  5. IDMaker idMaker = new IDMaker();
  6. idMaker.init();
  7. String pathPrefix = "/idmarker/id-";
  8. //模拟5个线程创建id
  9. for(int i=0;i<5;i++){
  10. new Thread(()->{
  11. for (int j=0;j<10;j++){
  12. String id = null;
  13. try {
  14. id = idMaker.makeId(pathPrefix);
  15. log.info("线程{}第{}次创建id为{}",Thread.currentThread().getName(),
  16. j,id);
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. },"thread"+i).start();
  22. }
  23. Thread.sleep(Integer.MAX_VALUE);
  24. }
  25. }

运行结果

  1. ........
  2. 2023-10-07 01:00:17.459 [main-EventThread] INFO o.a.curator.framework.state.ConnectionStateManager --- State change: CONNECTED
  3. 2023-10-07 01:00:17.459 [Curator-ConnectionStateManager-0] INFO com.bubble.zk_demo.curator.CuratorStandaloneBase --- 连接成功!
  4. 2023-10-07 01:00:17.468 [main-EventThread] INFO org.apache.curator.framework.imps.EnsembleTracker --- New config event received: {}
  5. 2023-10-07 01:00:17.468 [main-EventThread] INFO org.apache.curator.framework.imps.EnsembleTracker --- New config event received: {}
  6. 2023-10-07 01:00:17.478 [thread1] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread10次创建id0000000001
  7. 2023-10-07 01:00:17.478 [thread0] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread00次创建id0000000002
  8. 2023-10-07 01:00:17.478 [thread4] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread40次创建id0000000000
  9. 2023-10-07 01:00:17.478 [thread2] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread20次创建id0000000003
  10. 2023-10-07 01:00:17.478 [thread3] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread30次创建id0000000004
  11. 2023-10-07 01:00:17.478 [thread1] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread11次创建id0000000005
  12. 2023-10-07 01:00:17.478 [thread0] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread01次创建id0000000006
  13. 2023-10-07 01:00:17.478 [thread4] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread41次创建id0000000007
  14. 2023-10-07 01:00:17.478 [thread2] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread21次创建id0000000008
  15. 2023-10-07 01:00:17.478 [thread3] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread31次创建id0000000009
  16. 2023-10-07 01:00:17.478 [thread1] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread12次创建id0000000010
  17. 2023-10-07 01:00:17.478 [thread0] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread02次创建id0000000011
  18. 2023-10-07 01:00:17.478 [thread4] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread42次创建id0000000012
  19. 2023-10-07 01:00:17.478 [thread2] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread22次创建id0000000013
  20. 2023-10-07 01:00:17.478 [thread3] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread32次创建id0000000014
  21. 2023-10-07 01:00:17.478 [thread0] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread03次创建id0000000016
  22. 2023-10-07 01:00:17.478 [thread1] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread13次创建id0000000015
  23. 2023-10-07 01:00:17.490 [thread4] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread43次创建id0000000017
  24. 2023-10-07 01:00:17.490 [thread2] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread23次创建id0000000018
  25. 2023-10-07 01:00:17.490 [thread3] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread33次创建id0000000019
  26. 2023-10-07 01:00:17.490 [thread0] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread04次创建id0000000020
  27. 2023-10-07 01:00:17.490 [thread1] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread14次创建id0000000021
  28. 2023-10-07 01:00:17.490 [thread4] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread44次创建id0000000023
  29. 2023-10-07 01:00:17.490 [thread2] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread24次创建id0000000022
  30. 2023-10-07 01:00:17.490 [thread0] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread05次创建id0000000025
  31. 2023-10-07 01:00:17.490 [thread3] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread34次创建id0000000024
  32. 2023-10-07 01:00:17.490 [thread1] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread15次创建id0000000026
  33. 2023-10-07 01:00:17.498 [thread4] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread45次创建id0000000027
  34. 2023-10-07 01:00:17.498 [thread2] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread25次创建id0000000028
  35. 2023-10-07 01:00:17.498 [thread0] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread06次创建id0000000029
  36. 2023-10-07 01:00:17.498 [thread1] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread16次创建id0000000030
  37. 2023-10-07 01:00:17.498 [thread3] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread35次创建id0000000031
  38. 2023-10-07 01:00:17.498 [thread4] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread46次创建id0000000032
  39. 2023-10-07 01:00:17.498 [thread2] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread26次创建id0000000033
  40. 2023-10-07 01:00:17.498 [thread0] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread07次创建id0000000034
  41. 2023-10-07 01:00:17.498 [thread1] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread17次创建id0000000035
  42. 2023-10-07 01:00:17.498 [thread3] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread36次创建id0000000036
  43. 2023-10-07 01:00:17.506 [thread4] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread47次创建id0000000037
  44. 2023-10-07 01:00:17.506 [thread2] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread27次创建id0000000038
  45. 2023-10-07 01:00:17.508 [thread0] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread08次创建id0000000039
  46. 2023-10-07 01:00:17.508 [thread1] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread18次创建id0000000040
  47. 2023-10-07 01:00:17.508 [thread3] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread37次创建id0000000041
  48. 2023-10-07 01:00:17.508 [thread4] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread48次创建id0000000042
  49. 2023-10-07 01:00:17.508 [thread2] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread28次创建id0000000043
  50. 2023-10-07 01:00:17.508 [thread0] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread09次创建id0000000044
  51. 2023-10-07 01:00:17.508 [thread1] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread19次创建id0000000045
  52. 2023-10-07 01:00:17.508 [thread3] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread38次创建id0000000046
  53. 2023-10-07 01:00:17.508 [thread4] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread49次创建id0000000047
  54. 2023-10-07 01:00:17.508 [thread2] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread29次创建id0000000048
  55. 2023-10-07 01:00:17.518 [thread3] INFO c.bubble.zk_demo.curator.namingserver.IDMakerTest --- 线程thread39次创建id0000000049

基于Zookeeper实现SnowFlakeID算法

Twitter(推特)的SnowFlake算法是一种著名的分布式服务器用户ID生成算法。SnowFlake算法所生成的ID是一个64bit的长整型数字,如图10-2所示。这个64bit被划分成四个部分,其中后面三个部分分别表示时间戳、工作机器ID、序列号。

SnowFlakeID的四个部分,具体介绍如下:

(1)第一位 占用1 bit,其值始终是0,没有实际作用。

(2)时间戳 占用41 bit,精确到毫秒,总共可以容纳约69年的时间。

(3)工作机器id占用10 bit,最多可以容纳1024个节点。

(4)序列号 占用12 bit。这个值在同一毫秒同一节点上从0开始不断累加,最多可以累加到4095。

在工作节点达到1024顶配的场景下,SnowFlake算法在同一毫秒最多可以生成的ID数量为: 1024 * 4096 =4194304,在绝大多数并发场景下都是够用的。

SnowFlake算法的优点:

  • 生成ID时不依赖于数据库,完全在内存生成,高性能和高可用性。
  • 容量大,每秒可生成几百万个ID。
  • ID呈趋势递增,后续插入数据库的索引树时,性能较高。

SnowFlake算法的缺点:

  • 依赖于系统时钟的一致性,如果某台机器的系统时钟回拨了,有可能造成ID冲突,或者ID乱序。
  • 在启动之前,如果这台机器的系统时间回拨过,那么有可能出现ID重复的危险。

基于zookeeper实现雪花算法:

  1. public class SnowflakeIdGenerator {
  2. /**
  3. * 单例
  4. */
  5. public static SnowflakeIdGenerator instance =
  6. new SnowflakeIdGenerator();
  7. /**
  8. * 初始化单例
  9. *
  10. * @param workerId 节点Id,最大8091
  11. * @return the 单例
  12. */
  13. public synchronized void init(long workerId) {
  14. if (workerId > MAX_WORKER_ID) {
  15. // zk分配的workerId过大
  16. throw new IllegalArgumentException("woker Id wrong: " + workerId);
  17. }
  18. instance.workerId = workerId;
  19. }
  20. private SnowflakeIdGenerator() {
  21. }
  22. /**
  23. * 开始使用该算法的时间为: 2017-01-01 00:00:00
  24. */
  25. private static final long START_TIME = 1483200000000L;
  26. /**
  27. * worker id 的bit数,最多支持8192个节点
  28. */
  29. private static final int WORKER_ID_BITS = 13;
  30. /**
  31. * 序列号,支持单节点最高每毫秒的最大ID数1024
  32. */
  33. private final static int SEQUENCE_BITS = 10;
  34. /**
  35. * 最大的 worker id ,8091
  36. * -1 的补码(二进制全1)右移13位, 然后取反
  37. */
  38. private final static long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);
  39. /**
  40. * 最大的序列号,1023
  41. * -1 的补码(二进制全1)右移10位, 然后取反
  42. */
  43. private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);
  44. /**
  45. * worker 节点编号的移位
  46. */
  47. private final static long WORKER_ID_SHIFT = SEQUENCE_BITS;
  48. /**
  49. * 时间戳的移位
  50. */
  51. private final static long TIMESTAMP_LEFT_SHIFT = WORKER_ID_BITS + SEQUENCE_BITS;
  52. /**
  53. * 该项目的worker 节点 id
  54. */
  55. private long workerId;
  56. /**
  57. * 上次生成ID的时间戳
  58. */
  59. private long lastTimestamp = -1L;
  60. /**
  61. * 当前毫秒生成的序列
  62. */
  63. private long sequence = 0L;
  64. /**
  65. * Next id long.
  66. *
  67. * @return the nextId
  68. */
  69. public Long nextId() {
  70. return generateId();
  71. }
  72. /**
  73. * 生成唯一id的具体实现
  74. */
  75. private synchronized long generateId() {
  76. long current = System.currentTimeMillis();
  77. if (current < lastTimestamp) {
  78. // 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过,出现问题返回-1
  79. return -1;
  80. }
  81. if (current == lastTimestamp) {
  82. // 如果当前生成id的时间还是上次的时间,那么对sequence序列号进行+1
  83. sequence = (sequence + 1) & MAX_SEQUENCE;
  84. if (sequence == MAX_SEQUENCE) {
  85. // 当前毫秒生成的序列数已经大于最大值,那么阻塞到下一个毫秒再获取新的时间戳
  86. current = this.nextMs(lastTimestamp);
  87. }
  88. } else {
  89. // 当前的时间戳已经是下一个毫秒
  90. sequence = 0L;
  91. }
  92. // 更新上次生成id的时间戳
  93. lastTimestamp = current;
  94. // 进行移位操作生成int64的唯一ID
  95. //时间戳右移动23位
  96. long time = (current - START_TIME) << TIMESTAMP_LEFT_SHIFT;
  97. //workerId 右移动10位
  98. long workerId = this.workerId << WORKER_ID_SHIFT;
  99. return time | workerId | sequence;
  100. }
  101. /**
  102. * 阻塞到下一个毫秒
  103. */
  104. private long nextMs(long timeStamp) {
  105. long current = System.currentTimeMillis();
  106. while (current <= timeStamp) {
  107. current = System.currentTimeMillis();
  108. }
  109. return current;
  110. }
  111. }

测试

  1. @Slf4j
  2. public class SnowflakeIdTest {
  3. /**
  4. * The entry point of application.
  5. *
  6. * @param args the input arguments
  7. * @throws InterruptedException the interrupted exception
  8. */
  9. public static void main(String[] args) throws InterruptedException
  10. {
  11. //创建worker节点
  12. long workerId = SnowflakeIdWorker.instance.getId();
  13. SnowflakeIdGenerator.instance.init(workerId);
  14. ExecutorService threadPool = Executors.newFixedThreadPool(10);
  15. final HashSet idSet = new HashSet();
  16. Collections.synchronizedCollection(idSet);
  17. long start = System.currentTimeMillis();
  18. log.info(" start generate id *");
  19. int threadCount = 10;
  20. int turn = 50000;
  21. CountDownLatch countDownLatch = new CountDownLatch(threadCount);
  22. for (int i = 0; i < threadCount; i++)
  23. threadPool.execute(() ->
  24. {
  25. for (long j = 0; j < turn; j++)
  26. {
  27. long id = SnowflakeIdGenerator.instance.nextId();
  28. synchronized (idSet)
  29. {
  30. if (j % 10000 == 0)
  31. {
  32. log.info("线程{}生成第{}个 id 为:{}",
  33. Thread.currentThread().getName(),j,id);
  34. }
  35. idSet.add(id);
  36. }
  37. }
  38. countDownLatch.countDown();
  39. });
  40. countDownLatch.await(50000, TimeUnit.MICROSECONDS);
  41. threadPool.shutdown();
  42. threadPool.awaitTermination(10, TimeUnit.SECONDS);
  43. long end = System.currentTimeMillis();
  44. log.info(" end generate id ");
  45. log.info("* cost " + (end - start) + " ms!");
  46. }
  47. }

运行结果

  1. 2023-10-07 01:12:30.441 [main-EventThread] INFO o.a.curator.framework.state.ConnectionStateManager --- State change: CONNECTED
  2. 2023-10-07 01:12:30.442 [Curator-ConnectionStateManager-0] INFO com.bubble.zk_demo.curator.CuratorStandaloneBase --- 连接成功!
  3. 2023-10-07 01:12:30.448 [main-EventThread] INFO org.apache.curator.framework.imps.EnsembleTracker --- New config event received: {}
  4. 2023-10-07 01:12:30.449 [main-EventThread] INFO org.apache.curator.framework.imps.EnsembleTracker --- New config event received: {}
  5. 2023-10-07 01:12:30.474 [main] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- start generate id *
  6. 2023-10-07 01:12:30.476 [pool-4-thread-1] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-1生成第0 id 为:1790262749490577408
  7. 2023-10-07 01:12:30.476 [pool-4-thread-3] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-3生成第0 id 为:1790262749490577410
  8. 2023-10-07 01:12:30.476 [pool-4-thread-6] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-6生成第0 id 为:1790262749490577412
  9. 2023-10-07 01:12:30.476 [pool-4-thread-4] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-4生成第0 id 为:1790262749490577453
  10. 2023-10-07 01:12:30.476 [pool-4-thread-2] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-2生成第0 id 为:1790262749490577409
  11. 2023-10-07 01:12:30.476 [pool-4-thread-8] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-8生成第0 id 为:1790262749490577477
  12. 2023-10-07 01:12:30.476 [pool-4-thread-5] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-5生成第0 id 为:1790262749490577468
  13. 2023-10-07 01:12:30.477 [pool-4-thread-7] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-7生成第0 id 为:1790262749490577416
  14. 2023-10-07 01:12:30.478 [pool-4-thread-10] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-10生成第0 id 为:1790262749490577482
  15. 2023-10-07 01:12:30.478 [pool-4-thread-9] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-9生成第0 id 为:1790262749490577481
  16. 2023-10-07 01:12:30.547 [pool-4-thread-6] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-6生成第10000 id 为:1790262750086168838
  17. 2023-10-07 01:12:30.556 [pool-4-thread-4] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-4生成第10000 id 为:1790262750161666780
  18. 2023-10-07 01:12:30.578 [pool-4-thread-7] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-7生成第10000 id 为:1790262750346216432
  19. 2023-10-07 01:12:30.594 [pool-4-thread-2] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-2生成第10000 id 为:1790262750480433418
  20. 2023-10-07 01:12:30.597 [pool-4-thread-8] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-8生成第10000 id 为:1790262750505599648
  21. 2023-10-07 01:12:30.608 [pool-4-thread-8] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-8生成第20000 id 为:1790262750597874351
  22. 2023-10-07 01:12:30.633 [pool-4-thread-6] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-6生成第20000 id 为:1790262750807589505
  23. 2023-10-07 01:12:30.649 [pool-4-thread-4] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-4生成第20000 id 为:1790262750941807275
  24. 2023-10-07 01:12:30.666 [pool-4-thread-9] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-9生成第10000 id 为:1790262751084413200
  25. 2023-10-07 01:12:30.670 [pool-4-thread-1] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-1生成第10000 id 为:1790262751117967610
  26. 2023-10-07 01:12:30.683 [pool-4-thread-3] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-3生成第10000 id 为:1790262751227020199
  27. 2023-10-07 01:12:30.687 [pool-4-thread-2] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-2生成第20000 id 为:1790262751260574361
  28. 2023-10-07 01:12:30.694 [pool-4-thread-10] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-10生成第10000 id 为:1790262751319293986
  29. 2023-10-07 01:12:30.699 [pool-4-thread-5] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-5生成第10000 id 为:1790262751361237527
  30. 2023-10-07 01:12:30.699 [pool-4-thread-8] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-8生成第30000 id 为:1790262751361237589
  31. 2023-10-07 01:12:30.720 [pool-4-thread-4] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-4生成第30000 id 为:1790262751537398236
  32. 2023-10-07 01:12:30.738 [pool-4-thread-6] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-6生成第30000 id 为:1790262751688393287
  33. 2023-10-07 01:12:30.751 [pool-4-thread-7] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-7生成第20000 id 为:1790262751797445477
  34. 2023-10-07 01:12:30.758 [pool-4-thread-2] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-2生成第30000 id 为:1790262751856165768
  35. 2023-10-07 01:12:30.765 [pool-4-thread-1] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-1生成第20000 id 为:1790262751914885757
  36. 2023-10-07 01:12:30.768 [pool-4-thread-3] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-3生成第20000 id 为:1790262751940051228
  37. 2023-10-07 01:12:30.775 [pool-4-thread-9] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-9生成第20000 id 为:1790262751998772139
  38. 2023-10-07 01:12:30.788 [pool-4-thread-10] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-10生成第20000 id 为:1790262752107823274
  39. 2023-10-07 01:12:30.810 [pool-4-thread-4] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-4生成第40000 id 为:1790262752292372664
  40. 2023-10-07 01:12:30.814 [pool-4-thread-3] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-3生成第30000 id 为:1790262752325927641
  41. 2023-10-07 01:12:30.824 [pool-4-thread-3] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-3生成第40000 id 为:1790262752409813500
  42. 2023-10-07 01:12:30.832 [pool-4-thread-7] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-7生成第30000 id 为:1790262752476922621
  43. 2023-10-07 01:12:30.850 [pool-4-thread-2] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-2生成第40000 id 为:1790262752627916964
  44. 2023-10-07 01:12:30.858 [pool-4-thread-6] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-6生成第40000 id 为:1790262752695025715
  45. 2023-10-07 01:12:30.872 [pool-4-thread-9] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-9生成第30000 id 为:1790262752812466932
  46. 2023-10-07 01:12:30.885 [pool-4-thread-10] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-10生成第30000 id 为:1790262752921518631
  47. 2023-10-07 01:12:30.898 [pool-4-thread-8] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-8生成第40000 id 为:1790262753030570681
  48. 2023-10-07 01:12:30.906 [pool-4-thread-7] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-7生成第40000 id 为:1790262753097679390
  49. 2023-10-07 01:12:30.929 [pool-4-thread-1] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-1生成第30000 id 为:1790262753290617713
  50. 2023-10-07 01:12:30.959 [pool-4-thread-5] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-5生成第20000 id 为:1790262753542275827
  51. 2023-10-07 01:12:30.966 [pool-4-thread-1] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-1生成第40000 id 为:1790262753600996220
  52. 2023-10-07 01:12:30.972 [pool-4-thread-9] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-9生成第40000 id 为:1790262753651327184
  53. 2023-10-07 01:12:30.990 [pool-4-thread-10] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-10生成第40000 id 为:1790262753802322148
  54. 2023-10-07 01:12:31.017 [pool-4-thread-5] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-5生成第30000 id 为:1790262754028815348
  55. 2023-10-07 01:12:31.029 [pool-4-thread-5] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- 线程pool-4-thread-5生成第40000 id 为:1790262754129478187
  56. 2023-10-07 01:12:31.040 [main] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- end generate id
  57. 2023-10-07 01:12:31.040 [main] INFO c.bubble.zk_demo.curator.namingserver.SnowflakeIdTest --- * cost 566 ms!
  58. Process finished with exit code 0

3. zookeeper实现分布式队列

常见的消息队列有:RabbitMQ,RocketMQ,Kafka等。Zookeeper作为一个分布式的小文件管理系统,同样能实现简单的队列功能。Zookeeper不适合大数据量存储,官方并不推荐作为队列使用,但由于实现简单,集群搭建较为便利,因此在一些吞吐量不高的小型系统中还是比较好用的。

3.1 设计思路

1.创建队列根节点:在Zookeeper中创建一个持久节点,用作队列的根节点。所有队列元素的节点将放在这个根节点下。

2.实现入队操作:当需要将一个元素添加到队列时,可以在队列的根节点下创建一个临时有序节点。节点的数据可以包含队列元素的信息。

3.实现出队操作:当需要从队列中取出一个元素时,可以执行以下操作:

  • 获取根节点下的所有子节点。
  • 找到具有最小序号的子节点。
  • 获取该节点的数据。
  • 删除该节点。
  • 返回节点的数据。
  1. /**
  2. * 入队
  3. * @param data
  4. * @throws Exception
  5. */
  6. public void enqueue(String data) throws Exception {
  7. // 创建临时有序子节点
  8. zk.create(QUEUE_ROOT + "/queue-", data.getBytes(StandardCharsets.UTF_8),
  9. ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  10. }
  11. /**
  12. * 出队
  13. * @return
  14. * @throws Exception
  15. */
  16. public String dequeue() throws Exception {
  17. while (true) {
  18. List<String> children = zk.getChildren(QUEUE_ROOT, false);
  19. if (children.isEmpty()) {
  20. return null;
  21. }
  22. Collections.sort(children);
  23. for (String child : children) {
  24. String childPath = QUEUE_ROOT + "/" + child;
  25. try {
  26. byte[] data = zk.getData(childPath, false, null);
  27. zk.delete(childPath, -1);
  28. return new String(data, StandardCharsets.UTF_8);
  29. } catch (KeeperException.NoNodeException e) {
  30. // 节点已被其他消费者删除,尝试下一个节点
  31. }
  32. }
  33. }
  34. }

3.2 使用Apache Curator实现分布式队列

Apache Curator是一个ZooKeeper客户端的封装库,提供了许多高级功能,包括分布式队列。

  1. public class CuratorDistributedQueueDemo {
  2. private static final String QUEUE_ROOT = "/curator_distributed_queue";
  3. public static void main(String[] args) throws Exception {
  4. CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",
  5. new ExponentialBackoffRetry(1000, 3));
  6. client.start();
  7. // 定义队列序列化和反序列化
  8. QueueSerializer<String> serializer = new QueueSerializer<String>() {
  9. @Override
  10. public byte[] serialize(String item) {
  11. return item.getBytes();
  12. }
  13. @Override
  14. public String deserialize(byte[] bytes) {
  15. return new String(bytes);
  16. }
  17. };
  18. // 定义队列消费者
  19. QueueConsumer<String> consumer = new QueueConsumer<String>() {
  20. @Override
  21. public void consumeMessage(String message) throws Exception {
  22. System.out.println("消费消息: " + message);
  23. }
  24. @Override
  25. public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
  26. }
  27. };
  28. // 创建分布式队列
  29. DistributedQueue<String> queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_ROOT)
  30. .buildQueue();
  31. queue.start();
  32. // 生产消息
  33. for (int i = 0; i < 5; i++) {
  34. String message = "Task-" + i;
  35. System.out.println("生产消息: " + message);
  36. queue.put(message);
  37. Thread.sleep(1000);
  38. }
  39. Thread.sleep(10000);
  40. queue.close();
  41. client.close();
  42. }
  43. }

3.3 注意事项

使用Curator的DistributedQueue时,默认情况下不使用锁。当调用QueueBuilder的lockPath()方法并指定一个锁节点路径时,才会启用锁。如果不指定锁节点路径,那么队列操作可能会受到并发问题的影响。

在创建分布式队列时,指定一个锁节点路径可以帮助确保队列操作的原子性和顺序性。分布式环境中,多个消费者可能同时尝试消费队列中的消息。如果不使用锁来同步这些操作,可能会导致消息被多次处理或者处理顺序出现混乱。当然,并非所有场景都需要指定锁节点路径。如果您的应用场景允许消息被多次处理,或者处理顺序不是关键问题,那么可以不使用锁。这样可以提高队列操作的性能,因为不再需要等待获取锁。

  1. // 创建分布式队列
  2. QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, serializer, "/order");
  3. //指定了一个锁节点路径/orderlock,用于实现分布式锁,以保证队列操作的原子性和顺序性。
  4. queue = builder.lockPath("/orderlock").buildQueue();
  5. //启动队列,这时队列开始监听ZooKeeper中/order节点下的消息。
  6. queue.start();
标签: zookeeper 服务器

本文转载自: https://blog.csdn.net/qq_45061342/article/details/141297942
版权归原作者 bubble小拾 所有, 如有侵权,请联系我们删除。

“Zookeeper应用场景实战一”的评论:

还没有评论