1.导入依赖配置
pom文件
XML
<dependency>
<groupId>com.ky.common</groupId>
<artifactId>ky-common-kafka</artifactId>
<version>1.8-SNAPSHOT</version>
</dependency>
yaml文件
在项目的yaml中增加相关依赖信息,如公司业务项目中的
YAML
kafka:
bootstrap:
servers: 172.16.39.227:9092
#阿里云kafka集群
- *ali_bootstrap:
servers: 172.17.0.34:9092
configuration类
简单情境下,直接使用kafka进行测试可以在producer和customer类中写,为求模块化,这里才开使用,合并以及拆开都不影响使用。
kafkaProducer类
Java
public abstract class AbstractKafkaProducer{
// 需要bean类实现的
public abstract KafkaProducerConfig getConfig();
// 内部提供的函数
public void send(String topic,String key,String vlaue){
this.getProducer().send(new ProducerRecord(topic,key,value);
}
// 内部1: 在bean注入后第一时间执行
*@PostConstruct*
*public *Producer<String, String> initProducer() {
*if *(*this*.producer != *null*) {
*return this*.producer;
} *else *{
KafkaProducerConfig kafkaConfig = *this*.getKafkaConfig();
*if *(kafkaConfig != *null *&& StringUtils.equalsAnyIgnoreCase(kafkaConfig.getEnable(), *new *CharSequence[]{"true", "open"})) {
*this*.logger.info("abstract kafka producer,init producer,properties:{}", JSON.toJSONString(kafkaConfig));
Properties properties = *new *Properties();
String clientId = kafkaConfig.getProjectName() + "_producer_" + InetAddressUtils.getHostAddress();
properties.setProperty("client.id", clientId);
properties.put("bootstrap.servers", kafkaConfig.getBootstrapServers());
properties.put("acks", "all");
properties.put("retries", 1);
properties.put("batch.size", 16384);
properties.put("linger.ms", 50);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
*this*.producer = *new *KafkaProducer(properties);
*return this*.producer;
} *else *{
*this*.logger.error("abstract kafka producer,init producer fail,kafka config is null,properties:{}", JSON.toJSONString(kafkaConfig));
*return null*;
}
}
}
// 内部2: 在bean销毁前执行
*@PreDestroy*
*public void *close() {
*if *(*this*.producer != *null*) {
*try *{
*this*.producer.close();
} *catch *(Exception var2) {
*this*.logger.error("abstract kafka producer,producer close fail,error message:{}", var2.getMessage(), var2);
}
}
}
}
kafkaCostomer类
Java
public abstract class ByteArrayAbstractKafkaConsumer{
private String clientNumber;
// 需要bean类实现的
public abstract KafkaConsumerConfig getConfig();
*public abstract *String topic();
public abstract void handler(CustomerRecords<String, byte[]> var1);
// 内部提供的
public String clientId(String clientNumber)// 标示clientID,根据业务设置
public void handler(){this.handler("01")} // 提供外部接口的使用
public handler(String clientNumber){ // 配置以及实行相关
KafkaConsumerConfig kafkaConfig = *this*.getKafkaConfig();
*if *(kafkaConfig == *null*) {
*this*.logger.error("abstract kafka consumer,consumer fail,kafka config is null");
} *else *{
// 配置读取的相关信息
*this*.clientNumber = clientNumber;
Duration timeout = Duration.ofMillis(kafkaConfig.getPollTimeout() <= 0 ? 500L : (*long*)kafkaConfig.getPollTimeout());
String topic = *this*.topic();
Properties properties = *new *Properties();
properties.put("bootstrap.servers", kafkaConfig.getBootstrapServers());
properties.put("group.id", kafkaConfig.getGroupId());
properties.put("key.deserializer", StringUtils.defaultString(kafkaConfig.getKeyDeserializer(), "org.apache.kafka.common.serialization.StringDeserializer"));
properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "latest");
properties.put("session.timeout.ms", "30000");
properties.put("max.poll.interval.ms", "300000");
*if *(StringUtils.isNotBlank(kafkaConfig.getPartitionAssignmentStrategy())) {
properties.put("partition.assignment.strategy", kafkaConfig.getPartitionAssignmentStrategy());
}
String clientId = *this*.clientId(clientNumber);
properties.setProperty("client.id", clientId);
List<String> subTopic = Arrays.asList(topic);
Thread thread = *new *Thread(() -> {
*this*.consumer(properties, subTopic, timeout, 0); // 实际的执行
});
thread.setName("kafka-consumer-thread-" + clientNumber + "_" + topic);
thread.start();
}
}
private void consumer(Properties properties, Liat<String> subTopic, Duration time, int retryCount){
MDC.put("code", "0");
*try *{
KafkaConsumer<String, *byte*[]> consumer = *new *KafkaConsumer(properties);
Throwable var6 = *null*;
*try *{
consumer.subscribe(subTopic);
*this*.logger.info("abstract kafka consumer,consumer start,clientId:{},kafka config:{}", *this*.clientId(*this*.clientNumber), JSON.toJSONString(*this*.getKafkaConfig()));
*while*(*true*) {
*while*(*true*) {
*try *{
ConsumerRecords<String, *byte*[]> records = consumer.poll(timeout);
*if *(records != *null *&& records.count() != 0) {
*this*.handler(records);
}
} *catch *(Throwable var16) {
AlarmLogger.error("abstract kafka consumer,consumer fail,topic:{},error message:{}", *new *Object[]{JSON.toJSONString(subTopic), var16.getMessage(), var16});
}
}
}
} *catch *(Throwable var17) {
var6 = var17;
*throw *var17;
} *finally *{
*if *(consumer != *null*) {
*if *(var6 != *null*) {
*try *{
consumer.close();
} *catch *(Throwable var15) {
var6.addSuppressed(var15);
}
} *else *{
consumer.close();
}
}
}
} *catch *(Throwable var19) {
*if *(retryCount >= 10) {
AlarmLogger.error("system error,abstract kafka consumer,consumer fail,retry count is too long,shutdown current kafka consumer,properties:{},topic:{},retryCount:{},error message:{}", *new *Object[]{JSON.toJSONString(properties), JSON.toJSONString(subTopic), retryCount, var19.getMessage(), var19});
} *else *{
AlarmLogger.error("abstract kafka consumer,consumer fail,topic:{},retryCount:{},error message:{}", *new *Object[]{JSON.toJSONString(subTopic), retryCount, var19.getMessage(), var19});
++retryCount;
*this*.consumer(properties, subTopic, timeout, retryCount);
}
}
}
}
2.业务操作
在producer中,先创建类实现抽象类的方法,便于后续装配
Java
@Component
public class KafkaProducer extends AbstractKafkaProducer{
@Value("${kafka.bootstrap.servers}")
*private *String bootstrapServers;
*@Override*
- public *KafkaProducerConfig getKafkaConfig() {
KafkaProducerConfig kafkaProducerConfig = *new *KafkaProducerConfig();
kafkaProducerConfig.setBootstrapServers(bootstrapServers);
kafkaProducerConfig.setProjectName("ky-recommend-backend");
*return *kafkaProducerConfig;
}
}
同理 consumer类中
Java
@Component
*public class *JsonMusicSunoConsumer *extends *AbstractJsonKafkaConsumer{
Logger logger = LoggerFactory.getLogger(JsonMusicSunoConsumer.class);
*@Resource*
private *MusicLoadCsvService musicLoadCsvService;
public JsonMusicSunoConsumer(@Qualifier("bigDataKafkaJson") KafkaConsumerConfig kafkaConsumerConfig) {
super(kafkaConsumerConfig);
}@Override
public *String topic() {
*return *KafkaTopic.MUSIC_SUNO_CONTINUE.getTopic();
}@Override
public *KafkaConsumerConfig getKafkaConfig() {
return super.getKafkaConsumerConfig();
}@Override
public void *handler(ConsumerRecords<String, *byte*[]> consumerRecords) {
*for *(ConsumerRecord<String, *byte*[]> record : consumerRecords) {
String consumerRecord = *new *String(record.value());
*try *{
MusicSunoKafka musicSunoKafka = JSON.parseObject(consumerRecord, MusicSunoKafka.class);
*if *(musicSunoKafka == null) {
logger.warn("JsonMusicSunoConsumer#handler, parse json failed, record: {}", consumerRecord);
continue;
}
*if *(!musicSunoKafka.isValid()) {
continue;
}
musicLoadCsvService.submitForKafkaWork(musicSunoKafka);
} *catch *(Exception e) {
logger.error("JsonMusicSunoConsumer#handler, handler record err, record: {}, errMsg: {}",
consumerRecord, e.getMessage(), e);
}
}
}
}
标红地方属于自己的业务逻辑。
Java
//。。。。 submitForKafkaWork(musicSunoKafka) 的业务续写
启动调用
在启动类application中,必须先 注册消费类,让消费类运行
Java
@Override
*public void *run(String... args) {
// 注册consumer消费者
*musicSunoConsumer.handler();
logger.info(" ky recommend backend application,env:{},user.home:{},current timezone:{}", env, System.getProperty("user.home"), System.getProperty("user.timezone"));
}
最终可以简单写一个test的测试向kafka传递数据,就可以查看数据,并也可以设置断点,查看步骤,like this。
Java
@Resource
- private *KafkaProducer kafkaProducer;
@PostMapping("/test")
@ResponseBody - public *Result<Object> ProcessKafka(HttpServletRequest request){
Environment env = getEnvironment(request);
String value = " {"id":1,"prompt":"Apeacefulmorninginthecountryside","desc":"Inspiredbythetranquilityofrurallife,withchirpingbirdsandrustlingleaves.","diy_lrc":"Verse1:\nWakinguptothegentlebreeze,\nFieldsofgreenasfarastheeyecansee.\nChirpingbirdssingtheirsweetmelodies,\nInthispeacefulmorning,Ifeelfree.\n\nChorus:\nOh,whatabeautifulday,\nInthecountrysidewhereI'llstay.\nWitheverybreath,Ifindmyway,\nToalifeofpeaceandserenity.\n\n...","diy_music_name":"MorningSerenity"," +
""assign_uid":123456,"source_url":"https://example.com/countryside.mp3\",\"status\":0,\"reason\":\"\",\"music_id\":0,\"create_time\":1648486800,\"update_time\":1648519200}";
kafkaProducer.send("music_suno_continue",value);
// MusicSunoKafka musicSunoKafka = JSON.parseObject(value, MusicSunoKafka.class);
// musicLoadCsvService.submitForKafkaWork(musicSunoKafka);
}*logger.info("CsvLoadController#ProcessKafka, process kafka and storage success, request: {}, env: {}", request, env); *return *build(ErrorCode.OK);
ok,
相关问题
原先的公司包中有这些文件abstractProducer,ByteArrayAbstractKafkaConsumer,
Markdown
producerConfig
包含 bootstrapServers,projectName,enable
简单的一个实体类
KafkaConsumerConfig
包含partitionAssignmentStrategy,clientId,keyDeserializer等
AbstractKafkaProducer
getconfig,主要功能:send的函数,内部就是以及getProducer和initProducer和关闭
(为什么必须是自动移交(enale为true))?-- 可以刷新偏移量。
ByteArrayAbstractKafkaConsumer
topics,getconfig,handler的抽象,提供生成1调用consumer2调用handler(records)
(为什么没有关闭)-- 因为是一直获取读取的信息 可以不关闭,如果只获得特定信息,可以选择关闭
StringAbstractKafkaConsumer
和byte只是获取的区别,并有一个拼接string的函数
我看网上都是使用的spring-kafka来整合整个项目,但是该项目使用的是kafka-clients依赖。
版权归原作者 法耶会输出 所有, 如有侵权,请联系我们删除。