

Kafka in Practice: A Complete Guide to Creating, Configuring, and Testing Kafka


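This document is mainly a record for future operations work and for sharing with colleagues. The content is fairly rough and the commands are not exhaustive, so it is not suited to beginners. If anything is unclear, send me a private message; I am around during working hours.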

1. Configuration file

  1. YAML configuration
spring:
  kafka:
    bootstrap-servers: 
    consumer:
      group-id: iot-testaaaaaaaaaa11aaaaaaaaa
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      security:
        protocol: SASL_SSL
      sasl:
        mechanism: PLAIN
        jaas:
          config: org.apache.kafka.common.security.plain.PlainLoginModule required 
              username="" 
              password="";
      ssl:
        truststore:
          type: JKS
          location: src/main/resources/client.truststore.jks
          password: 
        endpoint.identification.algorithm:
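
To make the mapping concrete, here is a minimal sketch of the plain Kafka client properties that the YAML above roughly corresponds to, assuming Spring Boot forwards the entries under `spring.kafka.properties` to the client unchanged. The class name `KafkaClientProps`, the `build` method, and the broker address and credentials in `main` are hypothetical placeholders, not values from this setup.

```java
import java.util.Properties;

/** Hypothetical helper: builds client properties equivalent to the YAML above. */
class KafkaClientProps {

    static Properties build(String bootstrapServers, String username,
                            String password, String truststorePassword) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", "iot-testaaaaaaaaaa11aaaaaaaaa");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // SASL/PLAIN authentication over TLS
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + username + "\" password=\"" + password + "\";");
        props.setProperty("ssl.truststore.type", "JKS");
        props.setProperty("ssl.truststore.location", "src/main/resources/client.truststore.jks");
        props.setProperty("ssl.truststore.password", truststorePassword);
        // An empty value turns off server hostname verification
        props.setProperty("ssl.endpoint.identification.algorithm", "");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only
        Properties props = build("broker.example.com:9093", "demo-user", "demo-pass", "changeit");
        // Print only the key names so no secrets end up in the console
        props.stringPropertyNames().stream().sorted().forEach(System.out::println);
    }
}
```

A properties object like this can also be written to a `.properties` file, which is the form the plain `KafkaConsumer` approach in section 2 loads.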


2. Consumer

1. Annotation approach

@KafkaListener(topics = {"abcd"})
public void listen(ConsumerRecord<?, ?> record) {
    // The record value may be null, so wrap it before use
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
        Object message = kafkaMessage.get();
        System.out.println("---->" + record);
        System.out.println("---->" + message);
    }
}
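
The listener wraps `record.value()` in an `Optional` because a record's value can be null (for example, a tombstone on a compacted topic). The null guard on its own can be sketched without a broker as follows; `NullSafeMessage` and `render` are hypothetical names used only for illustration.

```java
import java.util.Optional;

/** Hypothetical helper showing the null guard used in the listener. */
class NullSafeMessage {

    /** Returns the printable form of a value, or empty when the value is null. */
    static Optional<String> render(Object value) {
        return Optional.ofNullable(value).map(v -> "---->" + v);
    }

    public static void main(String[] args) {
        // A normal payload is printed with the same prefix the listener uses
        render("temperature=21.5").ifPresent(System.out::println);
        // A null payload is skipped silently instead of causing a NullPointerException
        render(null).ifPresent(System.out::println);
    }
}
```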


2. KafkaConsumer

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * @author XHao
 */
public class MqsConsumer {

    public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";

    private KafkaConsumer<Object, Object> consumer;

    /** Creates a consumer from a properties file on the file system. */
    MqsConsumer(String path) {
        Properties props = new Properties();
        // try-with-resources closes the stream even if loading fails
        try (InputStream in = new BufferedInputStream(new FileInputStream(path))) {
            props.load(in);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        consumer = new KafkaConsumer<>(props);
    }

    /** Creates a consumer from the default properties file on the classpath. */
    public MqsConsumer() {
        Properties props;
        try {
            props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        consumer = new KafkaConsumer<>(props);
    }

    public void consume(List<String> topics) {
        consumer.subscribe(topics);
    }

    // poll(long) matches kafka-clients 1.1.0; newer clients prefer poll(Duration)
    public ConsumerRecords<Object, Object> poll(long timeout) {
        return consumer.poll(timeout);
    }

    public void close() {
        consumer.close();
    }

    /**
     * get classloader from thread context if no classloader found in thread
     * context return the classloader which has loaded this class
     *
     * @return classloader
     */
    public static ClassLoader getCurrentClassLoader() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        if (classLoader == null) {
            classLoader = MqsConsumer.class.getClassLoader();
        }
        return classLoader;
    }

    /**
     * Load configuration from the classpath.
     *
     * @param configFileName name of the configuration file
     * @return the loaded configuration
     * @throws IOException if a matching resource cannot be read
     */
    public static Properties loadFromClasspath(String configFileName) throws IOException {
        ClassLoader classLoader = getCurrentClassLoader();
        Properties config = new Properties();
        List<URL> properties = new ArrayList<URL>();
        Enumeration<URL> propertyResources = classLoader.getResources(configFileName);
        while (propertyResources.hasMoreElements()) {
            properties.add(propertyResources.nextElement());
        }
        // Every matching resource is loaded into the same Properties object
        for (URL url : properties) {
            try (InputStream is = url.openStream()) {
                config.load(is);
            }
        }
        return config;
    }
}
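
One detail of `loadFromClasspath` worth spelling out: it loads every classpath resource with the given name into the same `Properties` object, so a key defined in a later resource replaces the value from an earlier one. The sketch below reproduces that merge behavior with in-memory strings instead of classpath resources; `PropertiesMergeDemo`, `merge`, and the sample keys are hypothetical and only illustrate the behavior.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical demo of loading several property sources into one Properties object. */
class PropertiesMergeDemo {

    /** Loads each source in order; later sources override earlier keys. */
    static Properties merge(List<String> sources) {
        Properties config = new Properties();
        for (String source : sources) {
            try (StringReader reader = new StringReader(source)) {
                config.load(reader);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return config;
    }

    public static void main(String[] args) {
        Properties merged = merge(Arrays.asList(
                "group.id=first\nauto.offset.reset=earliest\n",
                "group.id=second\n"));
        System.out.println(merged.getProperty("group.id"));          // prints "second"
        System.out.println(merged.getProperty("auto.offset.reset")); // prints "earliest"
    }
}
```

If two jars on the classpath both ship a file named `mqs.sdk.consumer.properties`, the effective value of a duplicated key therefore depends on the order in which the class loader returns the resources.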


3. Dependencies

1. Dependency for the annotation approach

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. Dependency for KafkaConsumer

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>

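Dashboard (data visualization) projects often need message conversion, real-time status, and the like, so I am noting this down here.

If this post gets enough likes and comments, I will follow up with a more detailed tutorial. To be continued.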


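This article is reposted from: https://blog.csdn.net/xh365647/article/details/141961629
Copyright belongs to the original author, 重生之豪哥. If there is any infringement, please contact us for removal.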
