0


Redis与RabbitMQ配合使用多线程(多消费者)处理消息

引言

并发引起的服务器崩溃是非常常见的现象,为了解决这一问题,目前流行使用缓存数据库与消息队列搭配使用。在最近的项目中也是使用到这一手段,本篇文章通过一个案例为大家展示该套方案如何使用。

案例描述与流程

本案例是一个经典的并发下单的案例。在Redis中存在一条key为Apple,Value为10000的数据,为防止超卖问题的发生使用Redisson分布式锁避免超卖(在Redis解决超卖Demo这篇文章中已经讲过),在一个线程拿到锁并且符合下单条件则直接返回下单成功同时发送消息,使用AMQP监听队列消息,通过线程池创建多个线程作为消费者进行底层DB的更新。

环境准备

创建模块名为Redis

yml配置文件的编写
server:
  port: 9000
spring:
  application:
    name: redis
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/user?useSSL=false
    username: root
    password: 123456

  redis:
    host: 192.168.136.130
    port: 6379
    password: 123456
    lettuce:
      pool:
        max-active: 10
        max-idle: 10
        min-idle: 1
        time-between-eviction-runs: 10s

  rabbitmq:
    host: 192.168.136.130  #MQ地址
    port: 5672        #端口
    virtual-host: /   #虚拟主机
    username: demo   #用户密码
    password: 123321
    connection-timeout: 1s
    template:
      retry:  #重试机制
        enabled: true
        initial-interval: 1000ms
        multiplier: 1
        max-attempts: 3
    publisher-confirm-type: correlated
    publisher-returns: true
Controller Test类的编写

设置路径为 /testAddsetxAddFinally


@RestController
@RequestMapping("/")
public class Test {
    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private RabbitTemplate rabbitTemplate;
 

    /*使用setnx锁,同时给锁释放过期时间,自动释放锁
    * */

    @RequestMapping("testAddsetxAddFinally")
    String cherkAndReduceStockAddSetnxAddFinally()
    {

        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock-stock", "0000",2, TimeUnit.SECONDS);
        //获取锁失败,停止50ms,递归调用
        if (!lock){
            try {
                Thread.sleep(3000);
                this.cherkAndReduceStockAddSetnxAddFinally();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }else {
            try {
                String stock = redisTemplate.opsForValue().get("Apple").toString();
                if(stock!=null&&stock.length()!=0)
                {
                    Integer valueOf = Integer.valueOf(stock);
                    if (valueOf>0)
                    {
                        redisTemplate.opsForValue().set("Apple",String.valueOf(--valueOf));
                        //推送MQ
                        String queue="demo.queue";

                        //123456为用户id    1为商品id
                        String masg="123456:1";
                        rabbitTemplate.convertAndSend(queue,masg);
                        return "抢购成功!";
                      

                    }else {
                        System.out.println("商品售罄!!!");
                        return "商品售罄!!!";
                    }
                }
            }finally {
                redisTemplate.delete("lock-stock");
            }

        }
        return "";
    }

}
编写RedisConfig类序列化存储
@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory)
    {
        //缓存序列化配置避免存储乱码

        RedisTemplate<String,Object> redisTemplate=new RedisTemplate<>();
        redisTemplate.setConnectionFactory(factory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return  redisTemplate;
    }
}

创建Consumer模块

yml文件
server:
  port: 9004
spring:
  application:
    name: redis
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/user?useSSL=false
    username: root
    password: 123456

  redis:
    host: 192.168.136.130
    port: 6379
    password: 123456
    lettuce:
      pool:
        max-active: 10
        max-idle: 10
        min-idle: 1
        time-between-eviction-runs: 10s

  rabbitmq:
    host: 192.168.136.130
    port: 5672
    virtual-host: /
    username: demo
    password: 123321
    connection-timeout: 1s
    template:
      retry:
        enabled: true
        initial-interval: 1000ms
        multiplier: 1
        max-attempts: 3
    publisher-confirm-type: correlated
    publisher-returns: true
编写order实体类
package cn.itcast.mq.pojo;

import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.data.relational.core.mapping.Table;

@Data
@TableName("orderlist")
public class order {
    //用户id
    @TableField("userId")
    private String userId;
    //商品id
    private String  id;

    public order(String userId, String id) {
        this.userId = userId;
        this.id = id;
    }
}

注意对应关系

编写orderMapper
package cn.itcast.mq.mapper;

import cn.itcast.mq.pojo.order;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface orderMapper extends BaseMapper<order> {
}
编写orderService
package cn.itcast.mq.service;

import cn.itcast.mq.pojo.order;
import com.baomidou.mybatisplus.extension.service.IService;

public interface orderService extends IService<order> {
}
编写Iml实现类
package cn.itcast.mq.service;

import cn.itcast.mq.mapper.orderMapper;
import cn.itcast.mq.pojo.order;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

import org.springframework.stereotype.Service;

@Service
public class orderServiceImpl extends ServiceImpl<orderMapper, order> implements orderService{

  
}
构建Listerner线程池,构建容器工厂

使用@RabbitListener注解指定消费方法,默认情况是单线程监听队列,可以观察当队列有多个任务时消费端每次只消费一个消息,单线程处理消息容易引起消息处理缓慢,消息堆积,不能最大利用硬件资源,可以配置mq的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列,实现多线程处理消息。

package cn.itcast.mq.thread;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

@Configuration
@EnableAsync
public class ThreadPoolConfig {

    @Bean("customContainerFactory")

    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {

        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

        factory.setConcurrentConsumers(10); //设置线程数

        factory.setMaxConcurrentConsumers(10); //最大线程数

        configurer.configure(factory, connectionFactory);

        return factory;

    }
    }
编写MQListener监听队列
package cn.itcast.mq.listeners;

import cn.itcast.mq.pojo.order;
import cn.itcast.mq.service.orderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MqListener {

    @Autowired
    private orderService orderService;

    //声明队列    mq的容器工厂
    @RabbitListener(queues="demo.queue",containerFactory = "customContainerFactory")
    public void listenSimpleQueue(String msg)
    {
        //拆分消息
        String[] split = msg.split(":");
        order order = new order(split[0], split[1]);
        System.out.println(order.toString());
        //保存MYSQL
        orderService.save(order);

        //测试是否多个消费者
        System.out.println("线程" + Thread.currentThread().getName() + " 执行异步任务" );
    }

}

RabbitMQ的准备

创建demo.queue队列

创建demo用户并且配置虚拟主机

进行测试

启动Redis和Consumer服务

使用JMeter压测12000个用户

开始压测

查看队列

观察Consumer控制台,一万条消息瞬间执行完成!

** 查看MySQL orderlist表,有一万条数据**

** 查看Redis 数据库并没有出现超卖问题,案例成功!!**

附加

解决RabbitMQ消息堆积的方案有三种

  • 增加更多消费者,提高消息速度。(本案例采用这一种)
  • 在消费者中开启线程池加快消息处理速度。
  • 扩大队列容积,提高堆积上限,采用惰性队列。

总结

通过本次演示的案例,希望大家可以掌握并且多加练习,在日常的开发中缓存数据库和异步队列是必备的手段,同时也是大家找工作时的一个亮点。本文如有不妥之处希望大家指正!!!


本文转载自: https://blog.csdn.net/qq_63772695/article/details/137548379
版权归原作者 @耳東 所有, 如有侵权,请联系我们删除。

“Redis与RabbitMQ配合使用多线程(多消费者)处理消息”的评论:

还没有评论