0


Java实现Kafka消费者(Consumer)两种方式

实现在Spring Boot项目中监听Kafka指定topic中的消息,有两种实现思路:

一种是使用Spring Boot提供的**@KafkaListener**注解

另外一种是在kafka提供的原生java客户端中,消费者使用定时任务或者采**while(true){…}**进行消息拉取,这种方式可以避免与parent 版本出现冲突

一、@KafkaListener注解

导入依赖

        <!-- spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>版本号</version>
        </dependency>

配置文件

# kafka地址
spring.kafka.bootstrap-servers=127.0.0.1:9092
# 消费者组ID
spring.kafka.consumer.group-id=1
# 键序列化方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 值序列化方式
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 默认提交偏移量
spring.kafka.consumer.enable-auto-commit=true

监听类配置

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyKafkaConsumer {

    @KafkaListener(topics = {"test","dev"}) //在这里指定要监听的topic,可以监听多个
    public void listenToMessage(String message){
        System.out.println("使用注解监听到的消息"+message);
    }
}

运行效果

二、使用Kafka提供的原生java客户端中,消费者采while(true){…}用.poll()方式进行消息拉取

导入依赖

 <!--kafka-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>版本号</version>
        </dependency>

配置文件

这里以properties文件为例

#建立与kafka集群连接的host/port组,请通过控制台公网访问获取
bootstrap.servers=127.0.0.1:9092
#用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group
group.id=1
#键的序列化方式
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#值的序列化方式
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#自动提交
enable.auto.commit=true

编写配置类

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;

public class MqsConsumer {

    public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";//配置文件

    private KafkaConsumer<Object, Object> consumer;

    MqsConsumer(String path)
    {
        Properties props = new Properties();
        try {
            InputStream in = new BufferedInputStream(new FileInputStream(path));
            props.load(in);
        }catch (IOException e)
        {
            e.printStackTrace();
            return;
        }
        consumer = new KafkaConsumer<Object, Object>(props);
    }

    public MqsConsumer()
    {
        Properties props = new Properties();
        try {
            props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
        }catch (IOException e)
        {
            e.printStackTrace();
            return;
        }
        consumer = new KafkaConsumer<Object, Object>(props);
    }
    public void consume(List topics)
    {
        consumer.subscribe(topics);
    }

    public ConsumerRecords<Object, Object> poll(long timeout)
    {
        return consumer.poll(timeout);
    }

    public void close()
    {
        consumer.close();
    }

/**
     * get classloader from thread context if no classloader found in thread
     * context return the classloader which has loaded this class
     *
     * @return classloader*/

    public static ClassLoader getCurrentClassLoader()
    {
        ClassLoader classLoader = Thread.currentThread()
                .getContextClassLoader();
        if (classLoader == null)
        {
            classLoader = MqsConsumer.class.getClassLoader();
        }
        return classLoader;
    }

/**
     * 从classpath 加载配置信息
     *
     * @param configFileName 配置文件名称
     * @return 配置信息
     * @throws IOException*/

    public static Properties loadFromClasspath(String configFileName) throws IOException
    {
        ClassLoader classLoader = getCurrentClassLoader();
        Properties config = new Properties();

        List<URL> properties = new ArrayList<URL>();
        Enumeration<URL> propertyResources = classLoader
                .getResources(configFileName);
        while (propertyResources.hasMoreElements())
        {
            properties.add(propertyResources.nextElement());
        }

        for (URL url : properties)
        {
            InputStream is = null;
            try
            {
                is = url.openStream();
                config.load(is);
            }
            finally
            {
                if (is != null)
                {
                    is.close();
                    is = null;
                }
            }
        }

        return config;
    }

采用while(true){...}进行消息拉取

import com.rococo.mqs.consumer.MqsConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;

public class KafkaController {
    @PostConstruct
    public void testConsumer() throws Exception {
        MqsConsumer consumer = new MqsConsumer();
        consumer.consume(Arrays.asList("test"));//监听的topic
        try {
            while (true){
                //timeout表示消费者在没有可用消息时愿意等待的最大时间。等待期间,一旦有消息到达,poll() 会立即返回,并从新开始计时。
                ConsumerRecords<Object, Object> records = consumer.poll(1000);
                System.out.println("the numbers of topic:" + records.count());
                for (ConsumerRecord<Object, Object> record : records)
                {
                    Object value = record.value();
                    System.out.println(value);
                }
            }
        }catch (Exception e)
        {
            // 异常处理
            e.printStackTrace();
        }finally {
            consumer.close();
        }
    }
}

执行效果

标签: java kafka spring boot

本文转载自: https://blog.csdn.net/m0_57038084/article/details/139450619
版权归原作者 洛可可Blue 所有, 如有侵权,请联系我们删除。

“Java实现Kafka消费者(Consumer)两种方式”的评论:

还没有评论