实现在Spring Boot项目中监听Kafka指定topic中的消息,有两种实现思路:
一种是使用Spring Boot提供的**@KafkaListener**注解
另外一种是在kafka提供的原生java客户端中,消费者使用定时任务或者采**while(true){…}**进行消息拉取,这种方式可以避免与parent 版本出现冲突
一、@KafkaListener注解
导入依赖
<!-- spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>版本号</version>
</dependency>
配置文件
# kafka地址
spring.kafka.bootstrap-servers=127.0.0.1:9092
# 消费者组ID
spring.kafka.consumer.group-id=1
# 键序列化方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 值序列化方式
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 默认提交偏移量
spring.kafka.consumer.enable-auto-commit=true
监听类配置
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaConsumer {
@KafkaListener(topics = {"test","dev"}) //在这里指定要监听的topic,可以监听多个
public void listenToMessage(String message){
System.out.println("使用注解监听到的消息"+message);
}
}
运行效果
二、使用Kafka提供的原生java客户端中,消费者采while(true){…}用.poll()方式进行消息拉取
导入依赖
<!--kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>版本号</version>
</dependency>
配置文件
这里以properties文件为例
#建立与kafka集群连接的host/port组,请通过控制台公网访问获取
bootstrap.servers=127.0.0.1:9092
#用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group
group.id=1
#键的序列化方式
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#值的序列化方式
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#自动提交
enable.auto.commit=true
编写配置类
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;
public class MqsConsumer {
public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";//配置文件
private KafkaConsumer<Object, Object> consumer;
MqsConsumer(String path)
{
Properties props = new Properties();
try {
InputStream in = new BufferedInputStream(new FileInputStream(path));
props.load(in);
}catch (IOException e)
{
e.printStackTrace();
return;
}
consumer = new KafkaConsumer<Object, Object>(props);
}
public MqsConsumer()
{
Properties props = new Properties();
try {
props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
}catch (IOException e)
{
e.printStackTrace();
return;
}
consumer = new KafkaConsumer<Object, Object>(props);
}
public void consume(List topics)
{
consumer.subscribe(topics);
}
public ConsumerRecords<Object, Object> poll(long timeout)
{
return consumer.poll(timeout);
}
public void close()
{
consumer.close();
}
/**
* get classloader from thread context if no classloader found in thread
* context return the classloader which has loaded this class
*
* @return classloader*/
public static ClassLoader getCurrentClassLoader()
{
ClassLoader classLoader = Thread.currentThread()
.getContextClassLoader();
if (classLoader == null)
{
classLoader = MqsConsumer.class.getClassLoader();
}
return classLoader;
}
/**
* 从classpath 加载配置信息
*
* @param configFileName 配置文件名称
* @return 配置信息
* @throws IOException*/
public static Properties loadFromClasspath(String configFileName) throws IOException
{
ClassLoader classLoader = getCurrentClassLoader();
Properties config = new Properties();
List<URL> properties = new ArrayList<URL>();
Enumeration<URL> propertyResources = classLoader
.getResources(configFileName);
while (propertyResources.hasMoreElements())
{
properties.add(propertyResources.nextElement());
}
for (URL url : properties)
{
InputStream is = null;
try
{
is = url.openStream();
config.load(is);
}
finally
{
if (is != null)
{
is.close();
is = null;
}
}
}
return config;
}
采用while(true){...}进行消息拉取
import com.rococo.mqs.consumer.MqsConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
public class KafkaController {
@PostConstruct
public void testConsumer() throws Exception {
MqsConsumer consumer = new MqsConsumer();
consumer.consume(Arrays.asList("test"));//监听的topic
try {
while (true){
//timeout表示消费者在没有可用消息时愿意等待的最大时间。等待期间,一旦有消息到达,poll() 会立即返回,并从新开始计时。
ConsumerRecords<Object, Object> records = consumer.poll(1000);
System.out.println("the numbers of topic:" + records.count());
for (ConsumerRecord<Object, Object> record : records)
{
Object value = record.value();
System.out.println(value);
}
}
}catch (Exception e)
{
// 异常处理
e.printStackTrace();
}finally {
consumer.close();
}
}
}
执行效果
版权归原作者 洛可可Blue 所有, 如有侵权,请联系我们删除。