0


springboot整合mongodb changestream

前言

changestream是monggodb的3.6版本之后出现的一种基于collection(数据库集合)的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更

想必对mysql主从复制原理比较熟悉的同学应该知道,其根本就是从节点通过监听binlog日志,然后解析binlog日志数据达到数据同步的目的,于是,基于mysql主从复制原理,阿里开源了canal这样的数据同步中间件工具

Change Stream 介绍

Chang Stream(变更记录流) 是指collection(数据库集合)的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更。

关于changestream做如下说明,提供参考

  • 在该特性出现之前,开发者可通过拉取 oplog达到同样的目的;
  • 但 oplog 的处理及解析相对复杂,而且存在被回滚的风险,如果使用不当的话还会带来性能问题;
  • Change Stream 可以与aggregate framework结合使用,对变更集进行进一步的过滤或转换;
  • 由于Change Stream 利用了存储在 oplog 中的信息,因此对于单进程部署的MongoDB无法支持Change Stream功能,其只能用于启用了副本集的独立集群或分片集群

changestream可用于监听的mongodb目标类型

  • 单个集合,除系统库(admin/local/config)之外的集合,3.6版本支持
  • 单个数据库,除系统库(admin/local/config)之外的数据库集合,4.0版本支持
  • 整个集群,整个集群内除去系统库( (admin/local/config)之外的集合 ,4.0版本支持

一个Change Stream Event的基本结构如下所示:

{
   _id :{<BSON Object>},
   "operationType":"<operation>",
   "fullDocument":{<document>},
   "ns":{"db":"<database>",
      "coll":"<collection"},
   "documentKey":{"_id":<ObjectId>},
   "updateDescription":{"updatedFields":{<document>},
      "removedFields":["<field>", ... ]}"clusterTime":<Timestamp>,
   "txnNumber":<NumberLong>,
   "lsid":{"id":<UUID>,
      "uid":<BinData>}}

关于上面的数据结构,做简单的解释说明,

  • _id,变更事件的Token对象
  • operationType,变更类型(见下面介绍)
  • fullDocument,文档内容
  • ns,监听的目标
  • ns.db,变更的数据库
  • ns.coll,变更的集合
  • documentKey,变更文档的键值,含_id字段
  • updateDescription,变更描述
  • updateDescription.updatedFields,变更中更新字段
  • updateDescription.removedFields,变更中删除字段
  • clusterTime,对应oplog的时间戳
  • txnNumber,事务编号,仅在多文档事务中出现,4.0版本支持
  • lsid,事务关联的会话编号,仅在多文档事务中出现,4.0版本支持

Change Steram支持的变更类型,对于上面的operationType 这个参数,主要包括有以下几个:

  • insert,插入文档
  • delete,删除文档
  • replace,替换文档,当执行replace操作指定upsert时,可能是insert事件
  • update,更新文档,当执行update操作指定upsert时,可能是insert事件
  • invalidate,失效事件,比如执行了collection.drop或collection.rename

以上的几种类型,可以简单理解为,监听的mongo用户操作的事件类型,比如新增数据,删除数据,修改数据等

以上为changestream的必备理论知识,想要深入学习的话无比要了解,下面通过实操来展示下changestream的使用

环境准备

mongdb复制集群,本例的复制集群对应的mongodb版本为 4.0.X
在这里插入图片描述
登录primary节点,创建一个数据库

在这里插入图片描述

友情提醒:数据库需要提前创建

1、启动两个Mongo shell,一个操作数据库,一个watch

在其中一个窗口执行如下命令,开启监听

cursor = db.comment.watch()

在这里插入图片描述

2、在另一个窗口下,给上面的articledb插入一条数据

在这里插入图片描述

数据写入成功后,在第一个窗口下,执行下面的命令:

cursor.next()

在这里插入图片描述
说明已经成功监听到新增的数据,修改、删除事件可以做类似的操作即可

以上先通过shell窗口展示了一下changestream的使用效果,接下来,将通过程序演示下如何在客户端集成并使用changestream

Java客户端操作changestream

1、引入maven依赖

<dependency><groupId>org.mongodb</groupId><artifactId>mongo-java-driver</artifactId><version>3.12.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency>

2、测试类核心代码

importcom.mongodb.*;importcom.mongodb.client.MongoDatabase;importorg.bson.conversions.Bson;importjava.util.List;importstaticjava.util.Collections.singletonList;importcom.alibaba.fastjson.JSONObject;importcom.mongodb.MongoClient;importcom.mongodb.MongoClientURI;importcom.mongodb.client.MongoCollection;importcom.mongodb.client.MongoCursor;importcom.mongodb.client.MongoDatabase;importcom.mongodb.client.model.Aggregates;importcom.mongodb.client.model.Filters;importcom.mongodb.client.model.changestream.ChangeStreamDocument;importorg.bson.Document;importorg.bson.conversions.Bson;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importjava.time.LocalDateTime;importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importstaticjava.util.Arrays.asList;publicclassMongoTest{privatestaticLogger logger =LoggerFactory.getLogger(MongoTest.class);publicstaticvoidmain(String[] args){showmogodbdata();}privatestaticvoidshowmogodbdata(){String sURI ="mongodb://IP:27017";MongoClient mongoClient =newMongoClient(newMongoClientURI(sURI));MongoDatabase database = mongoClient.getDatabase("articledb");MongoCollection<Document> collec = database.getCollection("comment");List<Bson> pipeline =singletonList(Aggregates.match(Filters.or(Document.parse("{'fullDocument.articleid': '100007'}"),Filters.in("operationType",asList("insert","update","delete")))));MongoCursor<ChangeStreamDocument<Document>> cursor = collec.watch(pipeline).iterator();while(cursor.hasNext()){ChangeStreamDocument<Document> next = cursor.next();
            logger.info("输出mogodb的next的对应的值"+ next.toString());StringOperation= next.getOperationType().getValue();String tableNames = next.getNamespace().getCollectionName();System.out.println(tableNames);//获取主键id的值String pk_id = next.getDocumentKey().toString();//同步修改数据的操作if(next.getUpdateDescription()!=null){JSONObject jsonObject =JSONObject.parseObject(next.getUpdateDescription().getUpdatedFields().toJson());System.out.println(jsonObject);}//同步插入数据的操作if(next.getFullDocument()!=null){JSONObject jsonObject =JSONObject.parseObject(next.getFullDocument().toJson());System.out.println(jsonObject);}//同步删除数据的操作if(next.getUpdateDescription()==null&&Operation.matches("delete")){JSONObject jsonObject =JSONObject.parseObject(pk_id);System.out.println(jsonObject);}}}}

这段程序主要分为几个核心部分,做如下解释说明,

  • 连接mogodb服务端及相关配置
  • 通过pipline开启watch监听
  • 监听到特定数据库下集合的数据变化,然后打印出变化的数据

启动这段程序,观察控制台日志数据
在这里插入图片描述
在未对articledb数据库下的comment集合做任何操作之前,由于watch为检测到任何数据变化,所以无法进入到while循环中,接下来,从shell端给comment集合新增一条数据,然后再次观察控制台数据变化

在这里插入图片描述
可以看到,控制台很快就检测到变化的数据
在这里插入图片描述
以下为完整的日志数据

{operationType=OperationType{value='insert'}, resumeToken={"_data":"8262138891000000022B022C0100296E5A1004B9065629412942F8852D592B9FD441B946645F696400646213889158B116A29C3FD1140004"}, namespace=articledb.comment, destinationNamespace=null, fullDocument=Document{{_id=6213889158b116a29c3fd114, articleid=100010, content=hello kafka, userid=1010, nickname=marry}}, documentKey={"_id":{"$oid":"6213889158b116a29c3fd114"}}, clusterTime=Timestamp{value=7067142396626075650, seconds=1645447313, inc=2}, updateDescription=null, txnNumber=null, lsid=null}

至于在业务中的具体使用,可以结合自身的情况,举例来说,应用程序只想监听修改数据的事件,那么就可以在修改数据事件的监听逻辑中,解析变化后的数据做后续的操作

springboot整合changestream

在实际开发中,更通用的场景是整合到springboot工程中使用,有过一定的开发经验的同学应该很容易想到核心的逻辑长什么样了,和canal的客户端操作类似,需要在一个配置类去监听即可

下面来看看具体的整合步骤

1、引入核心依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency>

2、核心配置文件

本例演示的是基于上文搭建的mongodb复制集群


server.port=8081#mongodb配置
spring.data.mongodb.uri=mongodb://IP:27017,IP:27018,IP:27019/articledb?maxPoolSize=512

3、编写实体类,映射comment集合中的字段

importorg.springframework.data.annotation.Id;importorg.springframework.data.mongodb.core.mapping.Document;@Document(collection="comment")publicclassComment{@IdprivateString articleid;privateString content;privateString userid;privateString nickname;privateDate createdatetime;publicStringgetArticleid(){return articleid;}publicvoidsetArticleid(String articleid){this.articleid = articleid;}publicStringgetContent(){return content;}publicvoidsetContent(String content){this.content = content;}publicStringgetUserid(){return userid;}publicvoidsetUserid(String userid){this.userid = userid;}publicStringgetNickname(){return nickname;}publicvoidsetNickname(String nickname){this.nickname = nickname;}publicDategetCreatedatetime(){return createdatetime;}publicvoidsetCreatedatetime(Date createdatetime){this.createdatetime = createdatetime;}}

3、编写一个服务类

简单的添加2个用接口测试的方法

importcom.congge.entity.Comment;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.data.mongodb.core.MongoTemplate;importorg.springframework.data.mongodb.core.query.Criteria;importorg.springframework.data.mongodb.core.query.Query;importorg.springframework.stereotype.Service;importjava.util.List;@ServicepublicclassMongoDbService{privatestaticfinalLogger logger =LoggerFactory.getLogger(MongoDbService.class);@AutowiredprivateMongoTemplate mongoTemplate;/**
     * 查询所有
     * @return
     */publicList<Comment>findAll(){return mongoTemplate.findAll(Comment.class);}/***
     * 根据id查询
     * @param id
     * @return
     */publicCommentgetBookById(String id){Query query =newQuery(Criteria.where("articleid").is(id));return mongoTemplate.findOne(query,Comment.class);}}

4、编写一个接口

@RestControllerpublicclassCommentController{@AutowiredprivateMongoDbService mongoDbService;@GetMapping("/listAll")publicObjectlistAll(){return mongoDbService.findAll();}@GetMapping("/findById")publicObjectfindById(String id){return mongoDbService.getBookById(id);}}

启动本工程,然后浏览器调用下查询所有数据的接口,数据能正常返回,说明工程的基础结构就完成了
在这里插入图片描述

5、接下来,只需要依次添加下面3个配置类即可

MongoMessageListener 类 ,顾名思义,该类用于监听特定数据库下的集合数据变化使用的,在实际开发中,该类的作用也是非常重要的,类似于许多中间件的客户端监听程序,当监听到数据变化后,做出后续的业务响应,比如,数据入库、推送消息到kafka、发送相关的事件等等

importcom.congge.entity.Comment;importcom.mongodb.client.model.changestream.ChangeStreamDocument;importcom.mongodb.client.model.changestream.OperationType;importorg.bson.Document;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.data.mongodb.core.messaging.Message;importorg.springframework.data.mongodb.core.messaging.MessageListener;importorg.springframework.stereotype.Component;@ComponentpublicclassMongoMessageListenerimplementsMessageListener<ChangeStreamDocument<Document>,Comment>{privatestaticLogger logger =LoggerFactory.getLogger(MongoMessageListener.class);@OverridepublicvoidonMessage(Message<ChangeStreamDocument<Document>,Comment> message){OperationType operationType = message.getRaw().getOperationType();System.out.println("操作类型为 :"+ operationType);System.out.println("变更数据主体 :"+ message.getBody().getArticleid());System.out.println("变更数据主体 :"+ message.getBody().getContent());System.out.println("变更数据主体 :"+ message.getBody().getNickname());System.out.println("变更数据主体 :"+ message.getBody().getUserid());System.out.println();/*logger.info("Received Message in collection: {},message raw: {}, message body:{}",
                message.getProperties().getCollectionName(), message.getRaw(), message.getBody());*/}}

ChangeStream 类 ,事件注册类,即开篇中提到的那几种事件类型的操作等

importcom.congge.entity.Comment;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.CommandLineRunner;importorg.springframework.context.annotation.Configuration;importorg.springframework.data.mongodb.core.aggregation.Aggregation;importorg.springframework.data.mongodb.core.messaging.ChangeStreamRequest;importorg.springframework.data.mongodb.core.messaging.MessageListenerContainer;importorg.springframework.data.mongodb.core.query.Criteria;@ConfigurationpublicclassChangeStreamimplementsCommandLineRunner{@AutowiredprivateMongoMessageListener mongoMessageListener;@AutowiredprivateMessageListenerContainer messageListenerContainer;@Overridepublicvoidrun(String... args)throwsException{ChangeStreamRequest<Comment> request =ChangeStreamRequest.builder(mongoMessageListener).collection("comment").filter(Aggregation.newAggregation(Aggregation.match(Criteria.where("operationType").in("insert","update","replace")))).build();
        messageListenerContainer.register(request,Comment.class);}}

MongoConfig 配置MessageListenerContainer 容器的相关参数

importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.data.mongodb.core.MongoTemplate;importorg.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;importorg.springframework.data.mongodb.core.messaging.MessageListenerContainer;importjava.util.concurrent.Executor;importjava.util.concurrent.Executors;@ConfigurationpublicclassMongoConfig{@BeanMessageListenerContainermessageListenerContainer(MongoTemplate mongoTemplate){Executor executor =Executors.newFixedThreadPool(5);returnnewDefaultMessageListenerContainer(mongoTemplate,executor){@OverridepublicbooleanisAutoStartup(){returntrue;}};}}

3个类添加完成后,再次启动程序,并观察控制台数据日志

测试1:通过shell窗口登录primary节点,并给comment集合添加一条数据
在这里插入图片描述

几乎是实时的监听到事件操作的数据变化,下面是完整的输出日志

测试2:通过shell窗口删除上面新增的这条数据

在这里插入图片描述

典型应用场景

数据迁移

如果一个系统的数据需要迁移到另一个系统,可以考虑使用mongodb changestream这种方式,试想,如果老系统数据非常杂乱,并且文档中存在一些脏数据时,为了确保迁移后的数据能较快的投产,通过应用程序的方式,能够原始的数据做类似ETL的处理,这样更加方便

应用监控

如果您的系统对数据监管较为严格,可以考虑使用changestream这种方式,订阅特定事件的数据操作,比如修改和删除数据的事件,然后及时的发送告警通知

对接大数据应用

我们知道,mongodb作为一款性能优秀的分布式文档型数据库,其实是可以存储海量数据的,在一些大数据场景下,比如下游其他的应用采用大数据技术,需要对mongo中的数据做轨迹行为分析,changestream就是一种不错的选择,当监听到特定事件的数据变化时,向消息队列,比如kafka推送相应的消息,下游相关的大数据应用就可以做后续的业务处理了


本文转载自: https://blog.csdn.net/zhangcongyi420/article/details/123054439
版权归原作者 小码农叔叔 所有, 如有侵权,请联系我们删除。

“springboot整合mongodb changestream”的评论:

还没有评论