0


Java Kafka 怎么写

Java Kafka

pom文件 引入的jar包是 spring-kafka
groupId 是 org.springframework.kafka

配置文件:

kafka:
 topic:
   cbas: cmp-server-audit-formal

 listener:
   #是否开启批量消费,true表示批量消费
   batch-listener: true
   #如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法;如果队列中有消息,立即消费消息。
   poll-timeout: 15000
   #设置消费的线程数
   concurrency: 1

 #消费者配置
 consumer:
   #手动提交offset
   enable-auto-commit: false
   auto-offset-reset: latest
   #批量消费一次最大拉取的数据量
   max-poll-records: 500
    #消费者会话超时时间(session.timeout.ms:超过该时间未收到心跳则判定消费者离线)
   session-timeout: 120000
   #心跳时长
   heartbeat-interval: 30000
   #拉取间隔时长
   max-poll-interval: 60000
   #消费消息最大长度 100M
   fetch-message-max-bytes: 104857600

   #cbas kafka - 消费者
   cbas:
     bootstrap-servers: 10.0.5.106:9092

配置类(写了个抽象类):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * Base Kafka consumer configuration.
 *
 * <p>Reads the listener/consumer settings common to every Kafka connection
 * from the {@code kafka.*} property tree. Concrete subclasses contribute the
 * connection-specific properties (bootstrap servers, security, credentials)
 * by implementing {@link #consumerConfigs()}.
 *
 * @author nixuebing
 * @date 2020/9/27 14:26
 **/
@Configuration
@EnableKafka
public abstract class AbstractKafkaConsumerConfig {

    /** false = offsets are committed manually via Acknowledgment. */
    @Value("${kafka.consumer.enable-auto-commit}")
    private Boolean enableAutoCommit;

    /** Where to start when no committed offset exists (e.g. "latest"). */
    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    /** Maximum number of records returned by a single poll(). */
    @Value("${kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    /** session.timeout.ms — consumer is considered dead after this long without heartbeats. */
    @Value("${kafka.consumer.session-timeout}")
    private Integer sessionTimeout;

    /** heartbeat.interval.ms — must stay well below the session timeout. */
    @Value("${kafka.consumer.heartbeat-interval}")
    private Integer heartbeatInterval;

    /** max.poll.interval.ms — max delay between polls before a rebalance is triggered. */
    @Value("${kafka.consumer.max-poll-interval}")
    private Integer maxPollInterval;

    /** fetch.max.bytes — upper bound on data returned per fetch (100 MB in the config). */
    @Value("${kafka.consumer.fetch-message-max-bytes}")
    private Integer fetchMessageMaxBytes;

    /** true = listener methods receive batches (List of records) instead of single records. */
    @Value("${kafka.listener.batch-listener}")
    private Boolean batchListener;

    /** Milliseconds poll() blocks when no records are available. */
    @Value("${kafka.listener.poll-timeout}")
    private Integer pollTimeout;

    /** Number of concurrent consumer threads per container. */
    @Value("${kafka.listener.concurrency}")
    private Integer concurrency;

    /**
     * Default listener container factory.
     *
     * <p>BUG FIX: the original never called {@code setConsumerFactory}, so this
     * factory was registered without any consumer configuration; it now wires
     * in {@link #consumerFactory()}, which merges the common properties with
     * the subclass-specific ones.
     */
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Without this the container cannot create consumers at all.
        factory.setConsumerFactory(consumerFactory());
        // Number of concurrent consumer threads.
        factory.setConcurrency(concurrency);
        // Batch consumption toggle (listener signature must match this setting).
        factory.setBatchListener(batchListener);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        // Manual ack: the offset is committed immediately when Acknowledgment.acknowledge() is called.
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    /**
     * Consumer factory built from the merged common + subclass properties.
     */
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(allConsumerConfigs());
    }

    /**
     * Merges the common consumer properties with the subclass-specific ones.
     * Subclass entries win on key collisions because they are put last.
     */
    public Map<String, Object> allConsumerConfigs() {
        Map<String, Object> props = new HashMap<>(16);

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMessageMaxBytes);

        // Connection-specific settings (bootstrap servers, SASL, ...) from the subclass.
        props.putAll(consumerConfigs());
        return props;
    }

    /**
     * Connection-specific consumer properties supplied by the concrete subclass.
     *
     * @return properties merged on top of (and overriding) the common ones
     */
    public abstract Map<String, Object> consumerConfigs();

}

具体的配置类:

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * CBAS (资讯) Kafka consumer configuration.
 *
 * <p>Supplies the CBAS connection properties — bootstrap servers and optional
 * SASL/SCRAM credentials — on top of {@code AbstractKafkaConsumerConfig}.
 *
 * @author nixuebing
 * @date 2020/11/10 16:19
 **/
@Configuration
@EnableKafka
public class CBASConsumerConfig extends AbstractKafkaConsumerConfig {

    /** security.protocol — e.g. PLAINTEXT, SASL_PLAINTEXT or SASL_SSL. */
    @Value("${kafka.listener.cbas.security.protocol:PLAINTEXT}")
    private String securityProtocol;

    /** sasl.mechanism — must be SCRAM-SHA-256/512 when SASL is enabled (see JAAS note below). */
    @Value("${kafka.listener.cbas.sasl.mechanism:GSSAPI}")
    private String saslMechanism;

    @Value("${kafka.consumer.cbas.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.consumer.cbas.username:}")
    private String username;

    @Value("${kafka.consumer.cbas.password:}")
    private String password;

    /**
     * Listener container factory dedicated to the CBAS connection.
     *
     * <p>The {@code super} call deliberately builds a fresh factory instance so
     * this bean does not share state with the parent's default factory bean.
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    cbasListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = super.kafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Override
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(16);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        // Only configure SASL when the protocol actually uses it; with PLAINTEXT
        // the mechanism/JAAS entries are ignored by the client and only confuse readers.
        if (securityProtocol != null && securityProtocol.startsWith("SASL")) {
            props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
            // Inline JAAS config instead of an external jaas file.
            // NOTE(review): the login module is hard-coded to SCRAM, so the configured
            // sasl.mechanism must be SCRAM-SHA-256/512 — the GSSAPI default would not
            // match this module; confirm against the broker setup.
            props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                    "org.apache.kafka.common.security.scram.ScramLoginModule required "
                            + "username=\"%s\" password=\"%s\";",
                    username, password));
        }
        return props;
    }
}

后续调用的方法:


/**
 * Kafka consumer for the CBAS audit topic.
 *
 * <p>NOTE(review): the class name violates Java naming conventions (should be
 * {@code CbasKafkaConsumer}); it is kept unchanged here because renaming the
 * class would change the Spring bean name.
 */
@Slf4j
@Component
public class kafkaConsumer {

    /** Only messages produced by this upstream DataX job trigger processing. */
    private static final String JOB_NAME = "datax_ads_ita_msg_user_act_stat_ds_48_file";

    @Autowired
    private ArticleDetailService articleDetailService;

    /**
     * Handles one record from the CBAS topic and commits its offset manually.
     *
     * <p>NOTE(review): {@code kafka.listener.batch-listener} is {@code true} in the
     * config, but this signature consumes single records — confirm the factory and
     * the listener agree, otherwise container startup fails on a type mismatch.
     */
    @KafkaListener(containerFactory = "cbasListenerContainerFactory", topics = "${kafka.topic.cbas}", groupId = "${kafka.consumer.cbas.group-id}")
    @NewSpan
    public void handleFormalMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) {
        log.debug("[CbasConsumer] ======> start == kafka开始消费");
        log.debug("[AsyncTaskService] ======> offset = {}, partition = {}, timestamp = {}",
                consumerRecord.offset(), consumerRecord.partition(), consumerRecord.timestamp());

        String value = consumerRecord.value();
        // Guard against tombstone/empty payloads: previously JSON parsing would throw,
        // the ack was never reached, and the same record was redelivered forever.
        if (value == null || value.isEmpty()) {
            log.warn("[CbasConsumer] ======> empty payload, skipping. offset = {}", consumerRecord.offset());
            ack.acknowledge();
            return;
        }

        // Yesterday's date, derived from "now" — the stats job aggregates the previous day.
        String originTime = DateUtil.now();
        DateTime previousDay = DateUtil.offsetDay(Convert.toDate(originTime), -1);

        // (Removed a dead `new KafkaConsumerDTO()` that was immediately overwritten.)
        KafkaConsumerDTO kafkaConsumerDTO = JSONUtil.toBean(value, KafkaConsumerDTO.class);
        if (JOB_NAME.equals(kafkaConsumerDTO.getJobName())) {
            articleDetailService.getDetailBean(previousDay.toString());
        }

        // Manual offset commit (AckMode.MANUAL_IMMEDIATE).
        ack.acknowledge();
        log.debug("[CbasConsumer] ======> finish");
    }

}

@KafkaListener 用于标注消费方法:当对应的 topic 产生一条消息时,属于对应消费组(group)的监听器就会自动收到这条消息;在方法体内编写处理逻辑,即可做到 Kafka 一有消息就自动执行。

标签: java kafka

本文转载自: https://blog.csdn.net/qq_34093082/article/details/132168123
版权归原作者 我喜欢山,也喜欢海 所有, 如有侵权,请联系我们删除。

“Java Kafka 怎么写”的评论:

还没有评论