Kafka 在创建消费者进行消费数据时,由于可以理解成为是一个kafka 的单独线程,所以在Kafka消费数据时想要在外部对消费到的数据进行业务处理时是获取不到的,所以就需要实现一个消息回调的接口来进行数据的保存及使用。
消息回调接口实现代码如下
/**
* 消息队列接收消息回调
*/
public interface TestCallBack {
/**
* 消息队列接收消息回调
*
* @param s 消息列表
*/
void callBack(String s);
}
Kafka消费者代码实现如下
public class KafkaTest extends Thread{
private String topic;
private String ip;
private TestCallBack testCallBack;
/**
* kafka停止消费标识
*/
public static Map<String, Boolean> kafka = new ConcurrentHashMap<>();
public KafkaTest(String topic, String ip, TestCallBack testCallBack){
this.topic = topic;
this.ip = ip;
this.testCallBack = testCallBack;
}
public void run(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ip);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
KafkaConsumer<String, String> indexKafkaConsumer = new KafkaConsumer<>(props);
indexKafkaConsumer.subscribe(Arrays.asList(topic));
KafkaTest.kafka.put(ip+topic, true);
while (kafka.get(ip+topic)){
ConsumerRecords<String, String> poll = indexKafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : poll) {
String value = consumerRecord.value();
testCallBack.callBack(value);
}
}
}
}
其中
ConsumerConfig.GROUP_ID_CONFIG 为消费者组Id 不同的消费者组每次消费都是新数据否则会出现消费者1消费到的数据以后 消费者2消费不到数据的情况
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 为自动提交offset
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 为设置消费顺序是否重头开始消费
三种情况如下
latest (默认)
earliest
none
三者均有共同定义:
对于同一个消费者组,若已有提交的offset,则从提交的offset开始接着消费意思就是,只要这个消费者组消费过了,不管auto.offset.reset指定成什么值,效果都一样,每次启动都是已有的最新的offset开始接着往后消费
不同的点为:
latest(默认):对于同一个消费者组,若没有提交过offset,则只消费消费者连接topic后,新产生的数据
就是说如果这个topic有历史消息,现在新启动了一个消费者组,且auto.offset.reset=latest,此时已存在的历史消息无法消费到,那保持消费者组运行,如果此时topic有新消息进来了,这时新消息才会被消费到。而一旦有消费,则必然会提交offset
这时候如果该消费者组意外下线了,topic仍然有消息进来,接着该消费者组在后面恢复上线了,它仍然可以从下线时的offset处开始接着消费,此时走的就是共同定义earliest:对于同一个消费者组,若没有提交过offset,则从头开始消费
就是说如果这个topic有历史消息存在,现在新启动了一个消费者组,且auto.offset.reset=earliest,那将会从头开始消费,这就是与latest不同之处。
一旦该消费者组消费过topic后,此时就有该消费者组的offset了,这种情况下即使指定了auto.offset.reset=earliest,再重新启动该消费者组,效果是与latest一样的,也就是此时走的是共同的定义none:对于同一个消费者组,若没有提交过offset,会抛异常
一般生产环境基本用不到该参数
触发创建Kafka消费者代码如下
@RestController
@RequestMapping("/kafka")
public class KafkaDemo {
/**
* 线程缓存工具
*/
public static Map<String, Thread> threadHashMap = new ConcurrentHashMap<>();
/**
* Kafka消费者创建消费
* @param ip kafka连接ip
* @param topic kafka消费topic
*/
@GetMapping("/start")
public void start(@RequestParam("ip") String ip, @RequestParam("topic") String topic) {
KafkaTest kafkaDemo = new KafkaTest(topic, ip, new TestCallBack() {
@Override
public void callBack(String s) {
System.out.println("Kafka消费到的消息 : " + s);
}
});
threadHashMap.put("10.4.130.71:9092/kafkaDemo", kafkaDemo);
kafkaDemo.start();
System.out.println(123);
}
/**
* Kafka消费者停止
* @param ip kafka连接ip
* @param topic kafka消费topic
*/
@GetMapping("/stop")
public void stop(@RequestParam("ip") String ip, @RequestParam("topic") String topic) {
Thread thread = threadHashMap.get(ip + topic);
KafkaTest.kafka.put(ip + topic, false);
thread.isInterrupted();
}
}
在调用创建Kafka消费者时可以进行回调方式实现,这样就可以在外部进行消费者的消费数据进行后续逻辑实现
版权归原作者 小叶同学的猫 所有, 如有侵权,请联系我们删除。