0


Kafka + Redis 分布式锁结合使用心得总结

# Kafka 部分

/**
 * Kafka listener entry point for the alert-detection TMS message topics.
 *
 * <p>The topic list is resolved from a comma-separated property and the consumer group
 * from configuration. The listener itself does no work; it hands the record and its
 * manual acknowledgment straight to {@code doSendMessage}.
 *
 * @param record the consumed Kafka record
 * @param ack    manual acknowledgment handle for {@code record}
 * @param topic  topic the record was received from (bound by the header annotation, not used here)
 */
@KafkaListener(
        topics = "#{'${vsmart_alert_detection_tms_send_message_topic}'.split(',')}",
        groupId = "${vsmart.alert.detection.consumer.group}")
public void vsmartAlertDetectionTmsSendMessage(
        ConsumerRecord<?, ?> record,
        Acknowledgment ack,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    doSendMessage(record, ack);
}

private void doSendMessage(ConsumerRecord<?, ?> record, Acknowledgment ack) {
Optional message = Optional.ofNullable(record.value());
String key = record.topic() + "-" + record.partition() + "-offset:" + record.offset();
if (RedisUtils.isExistsKey(key)) {
ack.acknowledge();
return;
}

  1. try {
  2. if (message.isPresent() && (record.timestamp() > (System.currentTimeMillis() - kafkaConsumerDelayTime))) {
  3. JSONObject msg = JSONObject.parseObject(record.value().toString());
  4. msg.put(VSMART_KAFKA_MSG_POSITION_INFO, key);
  5. //具体操作
  6. }
  7. }catch (Exception e){
  8. }finally {
  9. ack.acknowledge();
  10. }

}

# Redis 部分

public Boolean handler(JSONObject msg) {
//解析
Boolean isOk = jsonToDetectionInfos(msg);

  1. if (!isOk) {
  2. return false;
  3. }
  4. //加锁 associatedKey()
  5. String lockKey = associatedKey();
  6. if (StrUtil.isEmpty(lockKey)) {
  7. return false;
  8. }
  9. RLock lock = SpringUtils.getBean(RedissonClient.class).getLock(lockKey);
  10. //锁的时间 根据业务需要进行调整
  11. try {
  12. boolean flag_2 = lock.tryLock(10, 300, TimeUnit.SECONDS);
  13. if (flag_2) {
  14. //加锁后执行前判断是否已经处理过kafka中相同位置的信息了
  15. if (ObjectUtil.isNotNull(msg) &&
  16. ObjectUtil.isNotNull(msg.get(VSMART_KAFKA_MSG_POSITION_INFO)) &&
  17. RedisUtils.isExistsKey(msg.getString(VSMART_KAFKA_MSG_POSITION_INFO))) {
  18. return false;
  19. }
  20. //具体业务操作
  21. //...
  22. return true;
  23. } else {
  24. detectionRuleBo.getLogText().append(StrUtil.format("{}-获取锁失败;", detectionRuleBo.getName())).append("<br>");
  25. return false;
  26. }
  27. } catch (Exception e) {
  28. } finally {
  29. ///释放锁
  30. if (null != lock && lock.isHeldByCurrentThread()) {
  31. if (ObjectUtil.isNotNull(msg) &&
  32. ObjectUtil.isNotNull(msg.get(VSMART_KAFKA_MSG_POSITION_INFO))) {
  33. RedisUtils.setCacheStrExpire(msg.getString(VSMART_KAFKA_MSG_POSITION_INFO), msg.getString(VSMART_KAFKA_MSG_POSITION_INFO), 60 * 60);
  34. }
  35. //解锁
  36. lock.unlock();
  37. }
  38. return true;
  39. }

}

标签: 分布式 kafka redis

本文转载自: https://blog.csdn.net/weixin_40976261/article/details/134282511
版权归原作者 提莫_ 所有, 如有侵权,请联系我们删除。

“Kafka+redis分布式锁结合使用心得总结”的评论:

还没有评论