0


使用kafka-clients依赖 集成kafka

1.导入依赖配置

pom文件

XML

<dependency>
<groupId>com.ky.common</groupId>
<artifactId>ky-common-kafka</artifactId>
<version>1.8-SNAPSHOT</version>
</dependency>

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>

yaml文件

在项目的yaml中增加相关依赖信息,如公司业务项目中的

YAML
kafka:
bootstrap:
servers: 172.16.39.227:9092
#阿里云kafka集群

  • *ali_bootstrap:
    servers: 172.17.0.34:9092

configuration类

简单情境下,直接使用kafka进行测试可以在producer和customer类中写,为求模块化,这里才开使用,合并以及拆开都不影响使用。

  • kafkaProducer类

Java
public abstract class AbstractKafkaProducer{
// 需要bean类实现的
public abstract KafkaProducerConfig getConfig();

 // 内部提供的函数
 public void send(String topic,String key,String vlaue){
     this.getProducer().send(new ProducerRecord(topic,key,value);
 }
 // 内部1: 在bean注入后第一时间执行
 *@PostConstruct*
 *public *Producer<String, String> initProducer() {
     *if *(*this*.producer != *null*) {
         *return this*.producer;
     } *else *{
         KafkaProducerConfig kafkaConfig = *this*.getKafkaConfig();
         *if *(kafkaConfig != *null *&& StringUtils.equalsAnyIgnoreCase(kafkaConfig.getEnable(), *new *CharSequence[]{"true", "open"})) {
             *this*.logger.info("abstract kafka producer,init producer,properties:{}", JSON.toJSONString(kafkaConfig));
             Properties properties = *new *Properties();
             String clientId = kafkaConfig.getProjectName() + "_producer_" + InetAddressUtils.getHostAddress();
             properties.setProperty("client.id", clientId);
             properties.put("bootstrap.servers", kafkaConfig.getBootstrapServers());
             properties.put("acks", "all");
             properties.put("retries", 1);
             properties.put("batch.size", 16384);
             properties.put("linger.ms", 50);
             properties.put("buffer.memory", 33554432);
             properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
             properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
             *this*.producer = *new *KafkaProducer(properties);
             *return this*.producer;
         } *else *{
             *this*.logger.error("abstract kafka producer,init producer fail,kafka config is null,properties:{}", JSON.toJSONString(kafkaConfig));
             *return null*;
         }
     }
 }
 // 内部2: 在bean销毁前执行
 *@PreDestroy*
 *public void *close() {
     *if *(*this*.producer != *null*) {
         *try *{
             *this*.producer.close();
         } *catch *(Exception var2) {
             *this*.logger.error("abstract kafka producer,producer close fail,error                             message:{}", var2.getMessage(), var2);
         }
     }
 }    

}

  • kafkaCostomer类

Java
public abstract class ByteArrayAbstractKafkaConsumer{
private String clientNumber;

 // 需要bean类实现的
 public abstract  KafkaConsumerConfig getConfig();
 *public abstract *String topic();
 public abstract void handler(CustomerRecords<String, byte[]> var1);
 
 // 内部提供的
 public String clientId(String clientNumber)// 标示clientID,根据业务设置
 public void handler(){this.handler("01")} // 提供外部接口的使用
 public handler(String clientNumber){ // 配置以及实行相关
     KafkaConsumerConfig kafkaConfig = *this*.getKafkaConfig();
     *if *(kafkaConfig == *null*) {
         *this*.logger.error("abstract kafka consumer,consumer fail,kafka config is null");
     } *else *{
         // 配置读取的相关信息
        *this*.clientNumber = clientNumber;
         Duration timeout = Duration.ofMillis(kafkaConfig.getPollTimeout() <= 0 ? 500L : (*long*)kafkaConfig.getPollTimeout());
         String topic = *this*.topic();
         Properties properties = *new *Properties();
         properties.put("bootstrap.servers", kafkaConfig.getBootstrapServers());
         properties.put("group.id", kafkaConfig.getGroupId());
         properties.put("key.deserializer", StringUtils.defaultString(kafkaConfig.getKeyDeserializer(), "org.apache.kafka.common.serialization.StringDeserializer"));
         properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
         properties.put("enable.auto.commit", "true");
         properties.put("auto.commit.interval.ms", "1000");
         properties.put("auto.offset.reset", "latest");
         properties.put("session.timeout.ms", "30000");
         properties.put("max.poll.interval.ms", "300000");
         *if *(StringUtils.isNotBlank(kafkaConfig.getPartitionAssignmentStrategy())) {
             properties.put("partition.assignment.strategy", kafkaConfig.getPartitionAssignmentStrategy());
         }

         String clientId = *this*.clientId(clientNumber);
         properties.setProperty("client.id", clientId);
         List<String> subTopic = Arrays.asList(topic);
         Thread thread = *new *Thread(() -> {
             *this*.consumer(properties, subTopic, timeout, 0); // 实际的执行
         });
         thread.setName("kafka-consumer-thread-" + clientNumber + "_" + topic);
         thread.start();
    }
 }
 private void consumer(Properties properties, Liat<String> subTopic, Duration time, int retryCount){
     MDC.put("code", "0");
     *try *{
         KafkaConsumer<String, *byte*[]> consumer = *new *KafkaConsumer(properties);
         Throwable var6 = *null*;

         *try *{
             consumer.subscribe(subTopic);
             *this*.logger.info("abstract kafka consumer,consumer start,clientId:{},kafka config:{}", *this*.clientId(*this*.clientNumber), JSON.toJSONString(*this*.getKafkaConfig()));
             *while*(*true*) {
                 *while*(*true*) {
                     *try *{
                         ConsumerRecords<String, *byte*[]> records = consumer.poll(timeout);
                         *if *(records != *null *&& records.count() != 0) {
                             *this*.handler(records);
                         }
                     } *catch *(Throwable var16) {
                         AlarmLogger.error("abstract kafka consumer,consumer fail,topic:{},error message:{}", *new *Object[]{JSON.toJSONString(subTopic), var16.getMessage(), var16});
                     }
                 }
             }
         } *catch *(Throwable var17) {
             var6 = var17;
             *throw *var17;
         } *finally *{
             *if *(consumer != *null*) {
                 *if *(var6 != *null*) {
                     *try *{
                         consumer.close();
                     } *catch *(Throwable var15) {
                         var6.addSuppressed(var15);
                     }
                 } *else *{
                     consumer.close();
                 }
             }
         }
     } *catch *(Throwable var19) {
         *if *(retryCount >= 10) {
             AlarmLogger.error("system error,abstract kafka consumer,consumer fail,retry count is too long,shutdown current kafka consumer,properties:{},topic:{},retryCount:{},error message:{}", *new *Object[]{JSON.toJSONString(properties), JSON.toJSONString(subTopic), retryCount, var19.getMessage(), var19});
         } *else *{
             AlarmLogger.error("abstract kafka consumer,consumer fail,topic:{},retryCount:{},error message:{}", *new *Object[]{JSON.toJSONString(subTopic), retryCount, var19.getMessage(), var19});
             ++retryCount;
             *this*.consumer(properties, subTopic, timeout, retryCount);
         }
     }
 }    

}

2.业务操作

在producer中,先创建类实现抽象类的方法,便于后续装配

Java
@Component
public class KafkaProducer extends AbstractKafkaProducer{
@Value("${kafka.bootstrap.servers}")
*private *String bootstrapServers;

 *@Override*
  • public *KafkaProducerConfig getKafkaConfig() {
    KafkaProducerConfig kafkaProducerConfig = *new *KafkaProducerConfig();
    kafkaProducerConfig.setBootstrapServers(bootstrapServers);
    kafkaProducerConfig.setProjectName("ky-recommend-backend");
    *return *kafkaProducerConfig;
    }
    }

同理 consumer类中

Java

@Component
*public class *JsonMusicSunoConsumer *extends *AbstractJsonKafkaConsumer{
Logger logger = LoggerFactory.getLogger(JsonMusicSunoConsumer.class);

 *@Resource*
  • private *MusicLoadCsvService musicLoadCsvService;

    public JsonMusicSunoConsumer(@Qualifier("bigDataKafkaJson") KafkaConsumerConfig kafkaConsumerConfig) {
    super(kafkaConsumerConfig);
    }

    @Override

  • public *String topic() {
    *return *KafkaTopic.MUSIC_SUNO_CONTINUE.getTopic();
    }

    @Override

  • public *KafkaConsumerConfig getKafkaConfig() {
    return super.getKafkaConsumerConfig();
    }

    @Override

  • public void *handler(ConsumerRecords<String, *byte*[]> consumerRecords) {
    *for *(ConsumerRecord<String, *byte*[]> record : consumerRecords) {
    String consumerRecord = *new *String(record.value());
    *try *{
    MusicSunoKafka musicSunoKafka = JSON.parseObject(consumerRecord, MusicSunoKafka.class);
    *if *(musicSunoKafka == null) {
    logger.warn("JsonMusicSunoConsumer#handler, parse json failed, record: {}", consumerRecord);
    continue;
    }
    *if *(!musicSunoKafka.isValid()) {
    continue;
    }
    musicLoadCsvService.submitForKafkaWork(musicSunoKafka);
    } *catch *(Exception e) {
    logger.error("JsonMusicSunoConsumer#handler, handler record err, record: {}, errMsg: {}",
    consumerRecord, e.getMessage(), e);
    }
    }
    }
    }

标红地方属于自己的业务逻辑。

Java
//。。。。 submitForKafkaWork(musicSunoKafka) 的业务续写

启动调用

在启动类application中,必须先 注册消费类,让消费类运行

Java
@Override
*public void *run(String... args) {
// 注册consumer消费者

  • *musicSunoConsumer.handler();

    logger.info(" ky recommend backend application,env:{},user.home:{},current timezone:{}", env, System.getProperty("user.home"), System.getProperty("user.timezone"));

}

最终可以简单写一个test的测试向kafka传递数据,就可以查看数据,并也可以设置断点,查看步骤,like this。

Java
@Resource

  • private *KafkaProducer kafkaProducer;
    @PostMapping("/test")
    @ResponseBody
  • public *Result<Object> ProcessKafka(HttpServletRequest request){
    Environment env = getEnvironment(request);
    String value = " {"id":1,"prompt":"Apeacefulmorninginthecountryside","desc":"Inspiredbythetranquilityofrurallife,withchirpingbirdsandrustlingleaves.","diy_lrc":"Verse1:\nWakinguptothegentlebreeze,\nFieldsofgreenasfarastheeyecansee.\nChirpingbirdssingtheirsweetmelodies,\nInthispeacefulmorning,Ifeelfree.\n\nChorus:\nOh,whatabeautifulday,\nInthecountrysidewhereI'llstay.\nWitheverybreath,Ifindmyway,\nToalifeofpeaceandserenity.\n\n...","diy_music_name":"MorningSerenity"," +
    ""assign_uid":123456,"source_url":"https://example.com/countryside.mp3\",\"status\":0,\"reason\":\"\",\"music_id\":0,\"create_time\":1648486800,\"update_time\":1648519200}";
    kafkaProducer.send("music_suno_continue",value);

// MusicSunoKafka musicSunoKafka = JSON.parseObject(value, MusicSunoKafka.class);
// musicLoadCsvService.submitForKafkaWork(musicSunoKafka);

  •    *logger.info("CsvLoadController#ProcessKafka, process kafka and storage success, request: {}, env: {}", request, env);
       *return *build(ErrorCode.OK);
    
    }

ok,

相关问题

原先的公司包中有这些文件abstractProducer,ByteArrayAbstractKafkaConsumer,

Markdown

producerConfig

包含 bootstrapServers,projectName,enable
简单的一个实体类

KafkaConsumerConfig

包含partitionAssignmentStrategy,clientId,keyDeserializer等

AbstractKafkaProducer

getconfig,主要功能:send的函数,内部就是以及getProducer和initProducer和关闭
(为什么必须是自动移交(enale为true))?-- 可以刷新偏移量。

ByteArrayAbstractKafkaConsumer

topics,getconfig,handler的抽象,提供生成1调用consumer2调用handler(records)
(为什么没有关闭)-- 因为是一直获取读取的信息 可以不关闭,如果只获得特定信息,可以选择关闭

StringAbstractKafkaConsumer

和byte只是获取的区别,并有一个拼接string的函数

我看网上都是使用的spring-kafka来整合整个项目,但是该项目使用的是kafka-clients依赖。

标签: kafka java-ee

本文转载自: https://blog.csdn.net/qq_63880817/article/details/138152567
版权归原作者 法耶会输出 所有, 如有侵权,请联系我们删除。

“使用kafka-clients依赖 集成kafka”的评论:

还没有评论