0


从kafka读取数据并入库(mysql)

从kafka读取数据并入库

层级结构

在这里插入图片描述

主类

/**
 * Application entry point.
 *
 * <p>Boots the Spring context and then writes a single startup line to the log.
 */
@Slf4j
@MapperScan("这里填充该项目mapper的相对路径")  // placeholder: the package scanned for MyBatis mapper interfaces
@SpringBootApplication
public class GzBreakDownApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        // Instance form of the static SpringApplication.run(Class, String...) helper.
        SpringApplication application = new SpringApplication(GzBreakDownApplication.class);
        application.run(args);
        log.info("程序启动,开始监听kafka消息");
    }

}

配置类(config)

@SpringBootConfiguration
publicclassKafkaConfig{

    @Autowired
    private KafkaProperties kafkaProperties;
    @Value("${spring.kafka.topics}")private String[] topics;
    @Value("${spring.kafka.concurrency}")private int concurrency;

    @Autowired
    private MyKafkaMessageListener myListener;// 多线程接收多主题多分区,每个主题每个分区一个线程,大大提升收取效率
    @Bean
    public ConcurrentMessageListenerContainer concurrentMessageListenerContainer(){
        ContainerProperties containerProps =newContainerProperties(topics);
        containerProps.setMessageListener(myListener);

        Map<String, Object> map = kafkaProperties.buildConsumerProperties();
        map.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

        DefaultKafkaConsumerFactory<Integer, String> dkConsumerFactory =newDefaultKafkaConsumerFactory<>(map);
        ConcurrentMessageListenerContainer concurrentMessageListenerContainer =newConcurrentMessageListenerContainer(dkConsumerFactory, containerProps);

        concurrentMessageListenerContainer.setConcurrency(concurrency);// 允许的最大现场数//        int concurrency = concurrentMessageListenerContainer.getConcurrency();return concurrentMessageListenerContainer;}}

监听类(listener)

/**
 * Kafka record listener; each delivered record is converted to its string form and then
 * processed (the processing code is elided in this listing).
 */
@Component
@Slf4j
public class MyKafkaMessageListener implements MessageListener {

    /**
     * Receives one record and processes it. Per the original note, records are received on
     * multiple threads.
     *
     * @param data the delivered record; must be a {@code ConsumerRecord}, otherwise the cast
     *             below throws {@link ClassCastException}
     */
    @Override
    public void onMessage(Object data) {
        // Wildcard type instead of a raw ConsumerRecord: only value() is read here.
        ConsumerRecord<?, ?> consumerRecord = (ConsumerRecord<?, ?>) data;
        Object value = consumerRecord.value();
        // String.valueOf produces exactly what the former StringBuffer append/toString
        // round-trip produced, including the text "null" for a null value.
        String s = String.valueOf(value);
        // Start processing the message
        ...

    }
}

mapper类(mapper)

/**
 * Mapper for {@code xxxVO} rows. Declares no methods of its own; everything it offers is
 * inherited from {@code BaseMapper<xxxVO>}.
 */
@Repository
public interface xxxMapper extends BaseMapper<xxxVO> {
}

service

/**
 * Service for {@code xxxVO} records; adds message ingestion on top of the inherited
 * {@code IService<xxxVO>} operations.
 */
public interface xxxService extends IService<xxxVO> {

    /**
     * Updates and stores the record carried by a message.
     *
     * @param msg the raw message text to process
     */
    void parseMsg(String msg);
}

service的实现类

/**
 * Default {@code xxxService} implementation: parses a JSON message into an {@code xxxVO} and
 * inserts it, or updates the existing row when the incoming record is not older.
 */
@Service
@Slf4j
public class xxxServiceImpl extends ServiceImpl<xxxMapper, xxxVO> implements xxxService {

    /**
     * Parses {@code msg} as JSON and upserts the resulting record by its {@code id}.
     *
     * <p>If no row with that id exists the record is inserted. If one exists, it is overwritten
     * only when the stored update time is less than or equal to the incoming one. The method is
     * {@code synchronized} so the lookup and the following write are not interleaved between
     * callers on this instance.
     *
     * <p>NOTE(review): malformed JSON is not caught here; whatever {@code JSON.parseObject}
     * throws propagates to the caller, as before.
     *
     * @param msg the JSON text of one record
     */
    @Override
    public synchronized void parseMsg(String msg) {
        log.info("数据:{}", msg);

        xxxVO xxVO = JSON.parseObject(msg, xxxVO.class);
        // Guard against a null parse result or a record without an id: there is nothing to key on.
        if (xxVO == null || xxVO.getId() == null) {
            log.warn("Skipping message without a usable id: {}", msg);
            return;
        }

        // Check whether a row with this unique id is already stored.
        QueryWrapper<xxxVO> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("id", xxVO.getId());
        xxxVO one = this.getOne(queryWrapper);

        if (one == null) {
            // Not present yet: insert directly.
            log.info("数据开始入库");
            this.save(xxVO);
        } else if (shouldOverwrite(one.getUpdateTime(), xxVO.getUpdateTime())) {
            log.info("更新时间:{} ,编号:{}", xxVO.getUpdateTime(), xxVO.getId());
            // Target exactly the row that was looked up: same column, exact match.
            // (Previously a LIKE on "bill_id", which could match other rows.)
            UpdateWrapper<xxxVO> updateWrapper = new UpdateWrapper<>();
            updateWrapper.eq("id", xxVO.getId());
            this.update(xxVO, updateWrapper);
        }
        log.info("入库完成");
    }

    /**
     * Decides whether the stored row should be replaced by the incoming record.
     *
     * <p>Update times are plain strings and are compared lexicographically, so the ordering is
     * only chronological when both use the same fixed-width, sortable format.
     *
     * @param storedTime   update time of the row already in the database, may be {@code null}
     * @param incomingTime update time of the record from the message, may be {@code null}
     * @return {@code true} if the stored time is missing or not later than the incoming time;
     *         {@code false} if the incoming time is missing or earlier than the stored time
     */
    private static boolean shouldOverwrite(String storedTime, String incomingTime) {
        if (storedTime == null) {
            return true;
        }
        if (incomingTime == null) {
            return false;
        }
        // Negative: stored is earlier; zero: equal; positive: stored is later.
        return storedTime.compareTo(incomingTime) <= 0;
    }
}

实体类(vo)

/**
 * Entity mapped to the database table named in {@code @TableName}.
 *
 * <p>Lombok generates the accessors (chained setters), a no-argument constructor and an
 * all-arguments constructor taking the fields in declaration order.
 */
@TableName("数据库表名")
@Accessors(chain = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class xxxVO {

    /** Record identifier, mapped to the {@code id} column. */
    @TableField("id")
    private String id;

    /** Update time kept as text, mapped to the {@code update_time} column. */
    @TableField("update_time")
    private String updateTime;

}

application.properties

#------ MySQL connection settings ------
# Replace the database name, user name and password placeholders with real values.
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/数据库名?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username=用户名
spring.datasource.password=密码

#----------- MyBatis-Plus ---------
# Automatic underscore-to-camel-case mapping is switched off.
mybatis-plus.configuration.map-underscore-to-camel-case=false

#------ Kafka settings -------
# Kafka cluster: replace each xxx with one of your own broker addresses.
spring.kafka.bootstrap-servers=xxx,xxx,xxx

spring.kafka.consumer.group-id=groupId2

spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.enable-auto-commit=true

spring.kafka.consumer.auto-commit-interval=1000

# Custom property (read by KafkaConfig via @Value): the topic(s) to consume.
spring.kafka.topics=自定义消费主题

# Custom property (read by KafkaConfig via @Value): maximum number of consumer threads.
spring.kafka.concurrency=9

# Log file location (placeholder) and size limit.
logging.file.name=自定义日志位置
logging.file.max-size=300MB
标签: kafka mysql linq

本文转载自: https://blog.csdn.net/m0_62820470/article/details/134639444
版权归原作者 死磕诺崽 所有, 如有侵权,请联系我们删除。

“从kafka读取数据并入库(mysql)”的评论:

还没有评论