ZooKeeper 实战(六) - 生成分布式ID
文章目录
1.何为分布式ID
分布式唯一ID指在分布式系统中用于标识和区分各个实体、资源或事件的唯一标识符。由于分布式系统可能包含多个节点和多个并发操作,需要确保在整个系统中每个实体都具有唯一的标识,避免冲突和重复的情况。
分布式系统唯一ID的设计通常需要满足以下要求:
- 唯一性:每个ID在整个分布式系统中都是唯一的,不会发生冲突。
- 高并发性:ID的生成应该是高并发的,能应对绝大部分(接近于100%)的并发场景,不会成为系统的性能瓶颈。
- 可读性:ID尽可能具有可读性,方便开发人员和用户进行识别和使用。
- 可扩展性:能够适应分布式系统的扩展性需求,支持产生大规模、高并发的ID。
- 可排序性:ID可以按照一定规则进行排序,方便查询和排序操作。主键的排序性也能提升数据库索引的效率。数据库的索引结构通常是基于B树或B+树的,有序的主键可以保持索引结构的有序性,减少数据的分裂和平衡,从而提高索引的维护效率和查询性能。
2.分布式ID方案
使用ZooKeeper实现生成分布式ID可以保证分布式系统中每个节点生成的ID是唯一且递增的。以下是使用ZooKeeper实现生成分布式ID的基本步骤:
- 创建ZooKeeper节点:在ZooKeeper集群中创建一个顺序节点来实现全局递增的功能。节点路径可以选择一个统一的命名规则,例如"/appName/distributeStr"。
- 获取序列ID:每个节点需要在生成ID的时候,向ZooKeeper集群发起一个创建节点的请求。在请求的路径中添加顺序节点的标志,例如"/appName/distributeStr/id_"。ZooKeeper将为每个节点创建一个唯一的有序节点,并返回节点的路径。
- 处理序列ID:获取到ZooKeeper返回的节点路径后,需要解析路径中的序列号,也可以附加上某些信息,作为生成的分布式ID。
- 使用分布式ID:使用解析出的分布式ID进行业务处理。分布式ID可以在多个节点上同时生成,并且保证每个节点生成的ID是唯一且递增的。
3.创建ZooKeeper节点
问题来了,我们要明确创建一个什么样的ZooKeeper节点呢?首先,要知道ID是用来标识一个实体的,而不是作用于整个系统。比如一个系统中,实体有用户、商品、订单等,但是我们并不需要保证用户、商品和订单之间的ID是唯一的,而是要保证同一类实体比如用户这一个类,即任意两个用户A和B的ID不能重复。所以我们可以把系统中用于产生分布式ID的Znode的粒度控制在实体维度,例如
/app/user
节点是用户实体的分布式ID节点,
/app/product
节点是商品实体的分布式ID节点等。
/**
* 分布式ID节点缓存
*/privateMap<String,String>NodePathMap=newHashMap<>();/**
* 生成分布式ID节点路径
* @param module 模块名称
* @return
*/publicStringgetIdNodePath(Stringmodule){if(module==null||module.isBlank()){thrownewNullPointerException("请设置模块名称");}if(NodePathMap.get(module)==null){if(appName ==null|| appName.isBlank()){thrownewNullPointerException("请设置系统名称");}synchronized(IdGenerator.class){if(NodePathMap.get(module)==null){NodePathMap.put(module,"/"+appName+"/"+module);}}}returnNodePathMap.get(module);}
4.获取序列ID
先通过上一节中的临时顺序节点的路径,向ZooKeeper集群发起一个创建节点的请求并返回节点的名称,最后去除节点路径的前缀,获取最后的序列号。
/**
* 分布式ID节点的前缀
*/publicstaticfinalStringID_PREFIX="id_";/**
* 创建临时节点,并返回Id
* @param module
* @return
*/publicStringcreateNodeId(Stringmodule){String idNodePath =getIdNodePath(module);try{String idPrefix = idNodePath +"/"+ID_PREFIX;String id = client.create()// 创建节点.creatingParentsIfNeeded()// 如果需要,递归创建节点.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定创建节点类型,使用临时顺序节点.forPath(idPrefix);// 设置节点路径// 去除nodeId的前缀return id.replace(idPrefix,"");}catch(Exception e){thrownewRuntimeException(e);}}
5.处理序列ID
在ZK生成的临时顺序节点ID的前附加时日期时间,以便直观显示此ID的生成日期。
/**
* 日期格式
*/publicstaticfinalStringDATE_FORMAT="yyyyMMdd";/**
* 获取当天的日期
* 格式:yyyyMMdd
* @return
*/publicStringdatePrefix(){DateTimeFormatter yyyyMMdd =DateTimeFormatter.ofPattern(DATE_FORMAT);LocalDateTime now =LocalDateTime.now();return now.format(yyyyMMdd);}/**
* 生成id
* @param module 模块
* @return
*/publicStringnextId(Stringmodule){// id前缀以日期开头returndatePrefix()+createNodeId(module);}
6.使用分布式ID
一切从简,直接把测试方法写在启动类中😂。先启动对应的Zookeeper,然后启动同一个应用的多个实例,并发开始生成ID。
@Slf4j@SpringBootApplicationpublicclassCuratorDemoApplicationimplementsApplicationRunner{publicstaticvoidmain(String[] args){SpringApplication.run(CuratorDemoApplication.class, args);}@AutowiredprivateIdGenerator idGenerator;@Overridepublicvoidrun(ApplicationArguments args)throwsException{// 为了保证两个应用能并发生成id,同时为了方便偷懒,不想写的很复杂// 这里控制在当前时间的分钟为06时开始往下执行while(newDate().getMinutes()!=6){}System.out.println("分布式id生成:"+idGenerator.nextId("test"));System.out.println("分布式id生成:"+idGenerator.nextId("test"));System.out.println("分布式id生成:"+idGenerator.nextId("test"));System.out.println("分布式id生成:"+idGenerator.nextId("test"));System.out.println("分布式id生成:"+idGenerator.nextId("test"));System.out.println("分布式id生成:"+idGenerator.nextId("test"));System.out.println("分布式id生成:"+idGenerator.nextId("test"));System.out.println("分布式id生成:"+idGenerator.nextId("test"));System.out.println("分布式id生成:"+idGenerator.nextId("test"));System.out.println("分布式id生成:"+idGenerator.nextId("test"));}}
日志输出如下:
实例1
实例2
7.完整代码
/**
* @Name: IdGenerator
* @Description: 分布式id生成器
* @Author: ahao
* @Date: 2024/8/24 23:42
*/@ComponentpublicclassIdGenerator{@Value("${spring.application.name}")privateString appName;@AutowiredprivateCuratorFramework client;/**
* 分布式ID节点的前缀
*/publicstaticfinalStringID_PREFIX="id_";/**
* 日期格式
*/publicstaticfinalStringDATE_FORMAT="yyyyMMdd";/**
* 分布式ID节点缓存
* K -> 模块名称
* V -> 分布式ID节点路径
*/privatestaticfinalMap<String,String>NodePathMap=newHashMap<>();/**
* 生成分布式ID节点路径
*
* @param module 模块名称
* @return
*/publicStringgetIdNodePath(Stringmodule){if(module==null||module.isBlank()){thrownewNullPointerException("请设置模块名称");}if(NodePathMap.get(module)==null){if(appName ==null|| appName.isBlank()){thrownewNullPointerException("请设置系统名称");}synchronized(IdGenerator.class){if(NodePathMap.get(module)==null){NodePathMap.put(module,"/"+ appName +"/"+module);}}}returnNodePathMap.get(module);}/**
* 创建临时节点,并返回Id
*
* @param module
* @return
*/publicStringcreateNodeId(Stringmodule){String idNodePath =getIdNodePath(module);// 创建节点try{String idPrefix = idNodePath +"/"+ID_PREFIX;String id = client.create()// 如果需要,递归创建节点.creatingParentsIfNeeded()// 指定创建节点类型,使用临时顺序节点.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 设置节点路径.forPath(idPrefix);// 去除nodeId的前缀return id.replace(idPrefix,"");}catch(Exception e){thrownewRuntimeException(e);}}/**
* 获取当天的日期
* 格式:yyyyMMdd
*
* @return
*/publicStringdatePrefix(){DateTimeFormatter yyyyMMdd =DateTimeFormatter.ofPattern(DATE_FORMAT);LocalDateTime now =LocalDateTime.now();return now.format(yyyyMMdd);}/**
* 生成id
*
* @param module 模块
* @return
*/publicStringnextId(Stringmodule){returndatePrefix()+createNodeId(module);}}
8.功能优化
8.1.问题思考?
1.容量问题
在ZooKeeper中,临时顺序节点的序号长度默认为10个字符。这个长度是根据ZooKeeper解析生成的顺序节点路径时确定的。顺序节点路径由节点名称前缀和10位的顺序号组成。由此可见,最多可生成100亿的节点序列号,如果实体数据量超过了这个限度怎么办?
2.并发问题
还是一样的,每次生成分布式ID,都需要在Zookeeper创建一个临时顺序节点,而且每次请求都是远程调用,其并发性能远远低于本地调用。
3.内存问题
每次生成分布式ID,都需要在Zookeeper创建一个临时顺序节点,虽然说当客户端断开时,会自动删除这些遗留的节点。但是如果客户端生成id十分频繁,并且长期没有重启或者版本更新,这将会导致大量的”无效“节点存在,浪费了Zookeeper的内存资源,甚至会导致Zookeeper崩溃。
8.2.解决并发问题
如何把远程调用转变为本地调用呢?其实这是不可能的,本身就是依托于Zookeeper去实现分布式ID的分配。所以有没有别的办法呢,想象一下以下场景:假如,储户去银行柜台取钱,每一次取1块钱,然后柜员打开保险柜,从里面拿出一块钱,关闭保险柜,然后给储户一块钱。事实是这样吗?不是,柜员旁都会有部分现金,比如100张一元钱,每次我们取钱,就不需要去保险柜拿,而是直接从旁边的100张中拿一张给我们,等100张都发放完了,再去保险柜拿。由此省去了99次开关保险柜的重复动作,显著提高效率。
所以我们参照上述场景,Zookeeper好比存放现金的保险柜,应用是柜员,线程是储户。每次生成ID时,应用向Zookeeper申请一定份额的“ID”,然后所有的ID的发放完,再去向Zookeeper申请。
又一个问题接踵而至,如何实现请求一次就获取到100个ID呢?单位转换。比如本次生成的临时顺序节点的序号为0000000001,也就是1,但是在应用中,不认为这就是单纯的数字1,而是表示1份ID,这一份有100个ID,也可就是从100-199,即序号000000000100-000000000199。代码实现如下:
/**
* 份额
*/privatestaticfinallong share =100;/**
* ID的长度 = DATE_FORMAT的长度 + zk顺序节点的序号长度 + 份额的长度
*/privatestaticfinalint length =20;/**
* 用于存储每个模块,本地份额所用的数量
* K -> 模块
* V -> 已用份额
*/privatestaticfinalMap<String,Long>LocalSerialNumber=newHashMap<>();/**
* 用于存储每个模块所分配的份额序号
* K -> 模块
* V -> 当前是第几份ID
*/privatestaticfinalMap<String,Long>RemoteSerialNumber=newHashMap<>();publicsynchronizedStringgenerateId(Stringmodule){// 先判断是否已分配本地份额long local =LocalSerialNumber.get(module)==null?0:LocalSerialNumber.get(module);long remote =RemoteSerialNumber.get(module)==null?0:RemoteSerialNumber.get(module);if(remote ==0// 表示还未分配本地份额|| local >= share // 表示本地份额已用完){// 向Zookeeper请求分配"一份"IDString nodeId =createNodeId(module);
remote =Long.valueOf(nodeId);RemoteSerialNumber.put(module, remote);// 重置local
local =0;}LocalSerialNumber.put(module, local +1);// 当前序号long sort = remote * share + local;returnpadding(sort);}/**
* 转成固定长度的字符串
*
* @param sort 序号
* @return
*/privateStringpadding(long sort){String s =String.valueOf(sort);return"0000000000".substring(0, length -DATE_FORMAT.length()- s.length())+ s;}/**
* 生成id
*
* @param module 模块
* @return
*/publicStringnextId(Stringmodule){returndatePrefix()+generateId(module);}
在100次ID的生成中,只有一次远程调用,大大提高了系统的并发性能,同时也解决了容量问题,原本只能生成100亿个ID,经过单位转换(ZK中的序号1代表应用中的100),容量提高了100倍。如果有更高要求,可以提高份额至1000,10000等。
8.3.内存问题
为了模拟创建临时顺序节点的内存资源消耗,博主创建了创建了100w个临时顺序节点。并用visualVM监控Zookeeper的堆内存的占用情况,大约消耗内存460m,结果如下。
很明显,当客户端应用长期运行并且产生大量分布式ID时,Zookeeper需要承担大量的内存消耗。有什么办法能降低这种内存消耗呢?
org.apache.curator.framework.recipes.atomic.DistributedAtomicLong
是Curator框架提供的一种分布式原子计数器的实现。内部使用乐观锁实现,当失败时,再尝试加互斥排他锁。不论是乐观锁还是排他锁,都会按照重试策略进行重试操作。
/**
* 分布式原子计数器
*
* @param module
* @return
*/publicStringdistributedAtomicLong(Stringmodule){Assert.notNull(module,"模块名称不能为空");// 获取当前模块对应的Znode路径String idNodePath =getIdNodePath(module);try{// 创建分布式原子计数器的节点,并返回路径String nodePath =ZKPaths.makePath(idNodePath,"AtomicLong");// 重试策略 ExponentialBackoffRetry参数说明:// baseSleepTimeMs 初始sleep时间// maxRetries 最大重试次数// maxSleepMs 最大sleep时间RetryPolicy retryPolicy =newExponentialBackoffRetry(50,20,200);// 可升级锁的配置信息String lockPath = idNodePath +"/AtomicLongLock";RetryPolicy lockRetryPolicy =newExponentialBackoffRetry(50,10,500);PromotedToLock promotedToLock =PromotedToLock.builder().lockPath(lockPath).retryPolicy(lockRetryPolicy).timeout(500,TimeUnit.MILLISECONDS).build();// 分布式原子计数器,首先尝试使用乐观锁进行增量操作,如果失败,则采用可选的InterProcessMutex锁进行增量操作。// 对于乐观锁和悲观锁,重试策略都用于重试增量操作。DistributedAtomicLong distributedAtomicLong =newDistributedAtomicLong(client, nodePath, retryPolicy, promotedToLock);// 自增并获取分布式原子长整型AtomicValue<Long> longAtomicValue = distributedAtomicLong.increment();int retryTimes =20;// !longAtomicValue.succeeded() && i < retryTimes 表示如果获取失败并且重试次数小于规定的20次for(int i =0;!longAtomicValue.succeeded()&& i < retryTimes; i++){// 继续增加
longAtomicValue= distributedAtomicLong.increment();}if(longAtomicValue.succeeded()){// 获取自增后的值Long obj = longAtomicValue.postValue();returnString.valueOf(obj);}else{thrownewRuntimeException("获取分布式ID失败:DistributedAtomicLong获取超时");}}catch(Exception e){thrownewRuntimeException(e);}}
优化后的内存消耗如下:
版权归原作者 QQ_AHAO 所有, 如有侵权,请联系我们删除。