从kafka读取数据并入库
层级结构
主类
/**
 * Spring Boot entry point of the application.
 *
 * <p>Starts the Spring context and then writes a start-up message to the log.
 * The {@code @MapperScan} value is a placeholder that must be replaced with the
 * location of this project's mapper package.
 */
@Slf4j
@MapperScan("这里填充该项目mapper的相对路径") // scans every mapper found under the mapper package
@SpringBootApplication
public class GzBreakDownApplication {

    /**
     * Boots the application.
     *
     * @param args command-line arguments, passed through to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(GzBreakDownApplication.class);
        application.run(args);
        // Logged only after run(...) has returned.
        log.info("程序启动,开始监听kafka消息");
    }
}
配置类(config)
@SpringBootConfiguration
publicclassKafkaConfig{
@Autowired
private KafkaProperties kafkaProperties;
@Value("${spring.kafka.topics}")private String[] topics;
@Value("${spring.kafka.concurrency}")private int concurrency;
@Autowired
private MyKafkaMessageListener myListener;// 多线程接收多主题多分区,每个主题每个分区一个线程,大大提升收取效率
@Bean
public ConcurrentMessageListenerContainer concurrentMessageListenerContainer(){
ContainerProperties containerProps =newContainerProperties(topics);
containerProps.setMessageListener(myListener);
Map<String, Object> map = kafkaProperties.buildConsumerProperties();
map.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
DefaultKafkaConsumerFactory<Integer, String> dkConsumerFactory =newDefaultKafkaConsumerFactory<>(map);
ConcurrentMessageListenerContainer concurrentMessageListenerContainer =newConcurrentMessageListenerContainer(dkConsumerFactory, containerProps);
concurrentMessageListenerContainer.setConcurrency(concurrency);// 允许的最大现场数// int concurrency = concurrentMessageListenerContainer.getConcurrency();return concurrentMessageListenerContainer;}}
监听类(listener)
/**
 * Kafka message listener, registered as a Spring component.
 *
 * <p>Each incoming record's value is converted to a {@code String} before the
 * (elided) processing step runs.
 *
 * <p>NOTE(review): {@code MessageListener} and {@code ConsumerRecord} are used as
 * raw types here; parameterizing them would remove the cast — confirm the
 * key/value types against the consumer configuration before changing.
 */
@Component
@Slf4j
public class MyKafkaMessageListener implements MessageListener {
// Receives messages (on multiple threads, per the original author's note) and processes them
@Override
public void onMessage(Object data) {
// The raw listener interface hands over an Object; it is cast to ConsumerRecord to reach the value.
ConsumerRecord consumerRecord= (ConsumerRecord) data;
StringBuffer stringBuffer=new StringBuffer();
Object value = consumerRecord.value();
// append(Object) stringifies the value; a null value therefore becomes the text "null".
stringBuffer.append(value);
String s = stringBuffer.toString();
// Start processing the message (the processing code is elided in the original article)
...
}
}
mapper类(mapper)
/**
 * Mapper for {@code xxxVO}.
 *
 * <p>Declares no methods of its own; everything it offers is inherited from
 * {@code BaseMapper<xxxVO>}.
 */
@Repository
public interface xxxMapper extends BaseMapper<xxxVO> {
}
service
/**
 * Service contract for {@code xxxVO}; extends {@code IService<xxxVO>} and adds
 * message-driven persistence.
 */
public interface xxxService extends IService<xxxVO> {
/**
 * Updates the matching record or stores a new one, based on the given message.
 *
 * @param msg the message text to parse and persist
 */
void parseMsg(String msg);
}
service的实现类
/**
 * Default {@link xxxService} implementation: parses a JSON message into an
 * {@code xxxVO} and inserts it, or updates the stored row when the message is
 * not older than what is already stored.
 */
@Service
@Slf4j
public class xxxServiceImpl extends ServiceImpl<xxxMapper, xxxVO> implements xxxService {

    /**
     * Parses {@code msg} as JSON and upserts the resulting record by {@code id}.
     *
     * <p>If no row with the same {@code id} exists, the record is inserted. If one
     * exists, it is updated only when the stored update time is not later than the
     * incoming one. The method is {@code synchronized}, so the lookup and the
     * following write are not interleaved between callers of the same instance.
     *
     * @param msg JSON text describing one {@code xxxVO}; a message that parses to
     *            {@code null} is logged and skipped
     */
    @Override
    public synchronized void parseMsg(String msg) {
        log.info("数据:"+msg);
        xxxVO xxVO = JSON.parseObject(msg, xxxVO.class);
        if (xxVO == null) {
            // Nothing to persist; previously this case failed with a NullPointerException.
            log.warn("Skipping message that could not be parsed into a record: " + msg);
            return;
        }

        // Look up whether this unique id has already been stored.
        QueryWrapper<xxxVO> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("id", xxVO.getId());
        xxxVO one = this.getOne(queryWrapper);

        if (one != null) {
            // Row exists: overwrite it only when the incoming data is not older.
            if (isStoredNotNewer(one.getUpdateTime(), xxVO.getUpdateTime())) {
                log.info("更新时间:"+xxVO.getUpdateTime()+" ,编号:"+xxVO.getId());
                UpdateWrapper<xxxVO> updateWrapper = new UpdateWrapper<>();
                // Exact match on the same column used for the lookup, so only this row is updated.
                updateWrapper.eq("id", xxVO.getId());
                this.update(xxVO, updateWrapper);
            }
        } else {
            // Row does not exist: insert directly.
            log.info("数据开始入库");
            this.save(xxVO);
        }
        log.info("入库完成");
    }

    /**
     * Tells whether the stored update time is less than or equal to the incoming one.
     *
     * <p>The comparison is the lexicographic {@code String.compareTo}: negative means
     * {@code stored} sorts before {@code incoming}, zero means equal, positive means
     * after. NOTE(review): this orders timestamps correctly only if they use a
     * fixed-width, sortable text format — confirm against the producer.
     *
     * @param stored   update time currently in the database, may be {@code null}
     * @param incoming update time carried by the message, may be {@code null}
     * @return {@code true} when the stored row should be overwritten; a missing stored
     *         time counts as older, a missing incoming time never overwrites
     */
    private static boolean isStoredNotNewer(String stored, String incoming) {
        if (stored == null) {
            return true;
        }
        if (incoming == null) {
            return false;
        }
        return stored.compareTo(incoming) <= 0;
    }
}
实体类(vo)
/**
 * Entity bound to the database table named in {@code @TableName} (the value is a
 * placeholder for the real table name).
 *
 * <p>Lombok generates the accessors, chained setters, and both the no-argument
 * and all-arguments constructors.
 */
@TableName("数据库表名")
@Data
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
public class xxxVO {

    /** Mapped to the {@code id} table column. */
    @TableField("id")
    private String id;

    /** Mapped to the {@code update_time} table column. */
    @TableField("update_time")
    private String updateTime;
}
application.properties
#------ MySQL connection settings ------
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/数据库名?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username=用户名
spring.datasource.password=密码
#----------- MyBatis-Plus ---------
mybatis-plus.configuration.map-underscore-to-camel-case=false
#------ Kafka settings -------
# Kafka cluster: replace each xxx with the address of one of your own three brokers
spring.kafka.bootstrap-servers=xxx,xxx,xxx
spring.kafka.consumer.group-id=groupId2
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
# Topics to consume; read by KafkaConfig through @Value("${spring.kafka.topics}")
spring.kafka.topics=自定义消费主题
# max thread size; read by KafkaConfig through @Value("${spring.kafka.concurrency}")
spring.kafka.concurrency=9
# Log file location and size limit
# NOTE(review): newer Spring Boot versions name the size setting logging.logback.rollingpolicy.max-file-size - confirm against the Boot version in use
logging.file.name=自定义日志位置
logging.file.max-size=300MB
本文转载自: https://blog.csdn.net/m0_62820470/article/details/134639444
版权归原作者 死磕诺崽 所有, 如有侵权,请联系我们删除。