0


RabbitMq和Canal的使用

一、RabbitMq

(1)RabbitMq是什么

RabbitMq是一种主流的消息队列,消息队列(Message Queue)是一种消息的容器,主要用于实现程序(服务、进程、线程)之间的通信;队列是 FIFO(先进先出)的数据结构

(2)为什么要使用RabbitMq

一般我们在做微服务项目时,会用feign来进行RPC远程调用,这样如果在一段逻辑代码中多次调用RPC,会比较浪费时间,因为是同步的,并且改一处逻辑,很多地方都要改,耦合性较强。因此使用RabbitMq这个消息中间件,来实现远程调用,主要作用是解耦、削峰、异步。

(3)RabbitMq的安装

1、首先安装erlang

2、安装rabbitmq

3、在安装的bin目录下打开cmd启动插件使能

rabbitmq-plugins enable rabbitmq_management

4、启动rabbitmq

net start rabbitmq

5、浏览器输入: http://localhost:15672 账号密码都是guest

Linux的安装见

https://blog.csdn.net/qq_45502336/article/details/118699251

(4)RabbitMq的基本使用

(1)一对一模式:生产者和消费者间只有一个队列,一个生产者发送消息到一个队列,一个消费者从队列中取消息。

(2)一对多模式(工作队列):生产者将消息分发给多个消费者,如果生产者生产了100条消息,消费者1消费50条,消费者2消费50条。

    扩展:工作队列的能者多劳模式:

因为队列默认采用是自动确认机制,消息发过去后就自动确认,队列不清楚每个消息具体什么时间处理完,所以平均分配消息数量。

实现能者多劳:

  1. channel.basicQos(1);限制队列一次发一个消息给消费者,等消费者有了反馈,再发下一条
  2. channel.basicAck 消费完消息后手动反馈,处理快的消费者就能处理更多消息
  3. basicConsume 中的参数改为false

(3)发布/订阅模式

发布/订阅模式和Work模式的区别是:Work模式只存在一个队列,多个消费者共同消费一个队列中的消息;而发布订阅模式存在多个队列,不同的消费者可以从各自的队列中处理完全相同的消息。 实现步骤:

  1. 创建交换机(Exchange)类型是fanout(扇出)

  2. 交换机需要绑定不同的队列

  3. 不同的消费者从不同的队列中获得消息

  4. 生产者发送消息到交换机

  5. 再由交换机将消息分发到多个队列

(4)路由模式

路由模式的消息队列可以给队列绑定不同的key,生产者发送消息时,给消息设置不同的key,这样交换机在分发消息时,可以让消息路由到key匹配的队列中。

(5)主题模式

主题模式和路由模式差不多,在key中可以加入通配符:

  • 匹配任意一个单词

匹配.号隔开的多个单词

二、Canal

(1)Canal介绍

Canal是阿里巴巴的数据同步工具,最初主要为了应对杭州和美国的双机房部署问题,目前也是国内互联网企业经常使用的数据增量同步解决方案。

(2)实现原理

  1. canal将自己伪装为MySQL的slave,向master发送dump协议
  2. master收到dump协议,数据发生修改后推送binary log给canal
  3. canal解析binary log对象,转换为增量数据,同步到ES、Redis等

(3)安装

(1)Mysql配置

首先要让mysql开启binlog模式

1、进入mysql查看是否启动binlog

SHOW VARIABLES LIKE '%log_bin%'

log_bin为ON表示启动,为OFF则未启动,需要修改mysql配置文件启动log_bin

windows配置文件是MySQL安装目录的my.in

修改:

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

2、创建用户

进入mysql

   create user canal@'%'IDENTIFIED WITH mysql_native_password BY 'canal';
   GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
   FLUSH PRIVILEGES;

(2)Windows安装

Windows上直接解压.gz的压缩包,在bin目录中编辑startup.bat,按图将下面的配置改为这样

直接运行即可

(3)Linux安装

(1)上传文件到Linux,解压到canal目录中

   cd /usr/local
   mkdir canal
   tar -vxf canal.deployer-1.1.4.tar.gz -C canal

(2)配置canal

进入mysql,输入命令,记录文件名和位置 (我连接的是本机的mysql,不是虚拟机的)

show master status;

(3)进入canal目录,修改配置文件

vi conf/example/instance.properties

(4)启动Canal

进入bin目录启动服务

./startup.sh

查看日志

cat /usr/local/canal/logs/canal/canal.log

这样就说明启动成功了

(3)Canal+RabbitMQ实现数据增量同步

以实际业务需求为例

给生产者服务添加依赖

         <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>com.xpand</groupId>
            <artifactId>starter-canal</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

canal那个依赖是引入的第三方库,因为第三方库比官方的简洁好用,代码量少,因此使用第三方库,需要导入第三方maven,首先从GitHub上克隆

https://github.com/chenqian56131/spring-boot-starter-canal

在maven上添加这个pom文件即可

添加配置文件(连接RabbitMq)

//连接RabbitMq
spring:
    rabbitmq:
      host: localhost
      port: 5672
      username: admin  //rabbitmq的用户名(这是我新建的用户)
      password: 123456  //Rabbitmq的密码 (新建用户的密码)
      virtual-host: myhost //Rabbitmq创建虚拟主机的名字
//连接canal
canal:
  client:
    instances:
      example:
        host: 192.168.160.136  //canal所在的ip,我这里是虚拟机ip
        port: 11111 //这是默认端口,可以在配置文件该
        batchSize: 1000

生产者的配置类(定义交换机、队列、绑定方式等)

/**
 * RabbitMQ的配置
 */
@Configuration
public class RabbitMQConfig {

    public static final String QUEUE_COURSE_SAVE = "queue.course.save";
    public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
    public static final String KEY_COURSE_SAVE = "key.course.save";
    public static final String KEY_COURSE_REMOVE = "key.course.remove";
    public static final String COURSE_EXCHANGE = "edu.course.exchange";

    @Bean
    public Queue queueCourseSave() {
        return new Queue(QUEUE_COURSE_SAVE);
    }

    @Bean
    public Queue queueCourseRemove() {
        return new Queue(QUEUE_COURSE_REMOVE);
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(COURSE_EXCHANGE);
    }

    @Bean
    public Binding bindCourseSave() {
        return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);
    }

    @Bean
    public Binding bindCourseRemove() {
        return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);
    }
}

canal实现监听(数据库的同步)

package com.blb.educourseservice.listener;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.blb.common.entity.Course;
import com.blb.educourseservice.config.RabbitMQConfig;
import com.blb.educourseservice.service.CourseService;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * 事件监听器
 */
@CanalEventListener
public class CanalListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private CourseService courseService;

    /**
     * 监听 edu_course数据库的course表
     */
    @ListenPoint(schema = "edu_course",table = "course")
    public void handleCourseUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
        //判断操作的类型
        if("DELETE".equals(eventType.name())){
            //遍历删除前数据行的每一列
            rowData.getBeforeColumnsList().forEach(column -> {
                //获得删除前的ID
                if("id".equals(column.getName())){
                    //发删除消息给处理删除的队列
                    rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_REMOVE,Long.valueOf(column.getValue()));
                    return;
                }
            });
        }else if("INSERT".equals(eventType.name()) || "UPDATE".equals(eventType.name())){
            //获得插入或更新后的数据
            rowData.getAfterColumnsList().forEach(column -> {
                if("id".equals(column.getName())){
                    //通过id查询课程的完整信息
                    Course course = courseService.getCourseById(Long.valueOf(column.getValue()));
                    //包装到Course对象中,转换为JSON
                    String json = JSON.toJSONString(course);
                    //发送给添加或更新队列
                    rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_SAVE,json);
                }
            });
        }
    }
}

启动类加上注解

@EnableCanalClient

消费者添加依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

添加配置

spring:
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
      config:
        file-extension: yaml
        server-addr: 127.0.0.1:8848
  application:
    name: edu-search-service
  elasticsearch:
    rest:
      uris: 127.0.0.1:9200
  rabbitmq:
    virtual-host: myhost
    host: 127.0.0.1
    username: wjm
    password: 123456
    port: 5672
server:
  port: 8001

添加监听器

package com.blb.edusearchservice.listener;

import com.alibaba.fastjson.JSON;
import com.blb.common.entity.Course;
import com.blb.edusearchservice.service.CourseIndexService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CourseMQListener {

    //对课程进行添加或更新的队列名
    public static final String QUEUE_COURSE_SAVE = "queue.course.save";
    //对课程进行删除的队列名
    public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
    //对课程进行添加或更新的路由键
    public static final String KEY_COURSE_SAVE = "key.course.save";
    //对课程进行删除的路由键
    public static final String KEY_COURSE_REMOVE = "key.course.remove";
    //课程交换机名
    public static final String COURSE_EXCHANGE = "edu.course.exchange";

    @Autowired
    private CourseIndexService courseIndexService;

    /**
     * 监听课程添加操作
     */
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),
                    exchange = @Exchange(value = COURSE_EXCHANGE,type = ExchangeTypes.TOPIC,ignoreDeclarationExceptions = "true")
                    , key = KEY_COURSE_SAVE)})
    public void receiveCourseSaveMessage(String message) {
        try {
            log.info("课程保存:{}",message);
            //将json转换为课程对象
            Course course = JSON.parseObject(message, Course.class);
            courseIndexService.saveCourse(course);
        } catch (Exception ex) {
            log.error("接收消息出现异常",ex);
        }
    }

    /**
     * 监听课程删除操作
     */
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),
                    exchange = @Exchange(value = COURSE_EXCHANGE,type = ExchangeTypes.TOPIC,ignoreDeclarationExceptions = "true"),
                    key = KEY_COURSE_REMOVE)})
    public void receiveCourseDeleteMessage(Long id) {
        try {
            log.info("课程删除完成:{}",id);
            courseIndexService.removeCourse(String.valueOf(id));
        } catch (Exception ex) {
            log.error("接收消息出现异常",ex);
        }
    }
}

本文转载自: https://blog.csdn.net/m0_57816620/article/details/130099249
版权归原作者 时间总是会不够的啊 所有, 如有侵权,请联系我们删除。

“RabbitMq和Canal的使用”的评论:

还没有评论