引言
并发引起的服务器崩溃是非常常见的现象,为了解决这一问题,目前流行使用缓存数据库与消息队列搭配使用。在最近的项目中也是使用到这一手段,本篇文章通过一个案例为大家展示该套方案如何使用。
案例描述与流程
本案例是一个经典的并发下单的案例。在Redis中存在一条key为Apple,Value为10000的数据,为防止超卖问题的发生使用Redisson分布式锁避免超卖(在Redis解决超卖Demo这篇文章中已经讲过),在一个线程拿到锁并且符合下单条件则直接返回下单成功同时发送消息,使用AMQP监听队列消息,通过线程池创建多个线程作为消费者进行底层DB的更新。
环境准备
创建模块名为Redis
yml配置文件的编写
server:
port: 9000
spring:
application:
name: redis
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/user?useSSL=false
username: root
password: 123456
redis:
host: 192.168.136.130
port: 6379
password: 123456
lettuce:
pool:
max-active: 10
max-idle: 10
min-idle: 1
time-between-eviction-runs: 10s
rabbitmq:
host: 192.168.136.130 #MQ地址
port: 5672 #端口
virtual-host: / #虚拟主机
username: demo #用户密码
password: 123321
connection-timeout: 1s
template:
retry: #重试机制
enabled: true
initial-interval: 1000ms
multiplier: 1
max-attempts: 3
publisher-confirm-type: correlated
publisher-returns: true
Controller Test类的编写
设置路径为 /testAddsetxAddFinally
@RestController
@RequestMapping("/")
public class Test {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
/*使用setnx锁,同时给锁释放过期时间,自动释放锁
* */
@RequestMapping("testAddsetxAddFinally")
String cherkAndReduceStockAddSetnxAddFinally()
{
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock-stock", "0000",2, TimeUnit.SECONDS);
//获取锁失败,停止50ms,递归调用
if (!lock){
try {
Thread.sleep(3000);
this.cherkAndReduceStockAddSetnxAddFinally();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
try {
String stock = redisTemplate.opsForValue().get("Apple").toString();
if(stock!=null&&stock.length()!=0)
{
Integer valueOf = Integer.valueOf(stock);
if (valueOf>0)
{
redisTemplate.opsForValue().set("Apple",String.valueOf(--valueOf));
//推送MQ
String queue="demo.queue";
//123456为用户id 1为商品id
String masg="123456:1";
rabbitTemplate.convertAndSend(queue,masg);
return "抢购成功!";
}else {
System.out.println("商品售罄!!!");
return "商品售罄!!!";
}
}
}finally {
redisTemplate.delete("lock-stock");
}
}
return "";
}
}
编写RedisConfig类序列化存储
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory)
{
//缓存序列化配置避免存储乱码
RedisTemplate<String,Object> redisTemplate=new RedisTemplate<>();
redisTemplate.setConnectionFactory(factory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
return redisTemplate;
}
}
创建Consumer模块
yml文件
server:
port: 9004
spring:
application:
name: redis
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/user?useSSL=false
username: root
password: 123456
redis:
host: 192.168.136.130
port: 6379
password: 123456
lettuce:
pool:
max-active: 10
max-idle: 10
min-idle: 1
time-between-eviction-runs: 10s
rabbitmq:
host: 192.168.136.130
port: 5672
virtual-host: /
username: demo
password: 123321
connection-timeout: 1s
template:
retry:
enabled: true
initial-interval: 1000ms
multiplier: 1
max-attempts: 3
publisher-confirm-type: correlated
publisher-returns: true
编写order实体类
package cn.itcast.mq.pojo;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.data.relational.core.mapping.Table;
@Data
@TableName("orderlist")
public class order {
//用户id
@TableField("userId")
private String userId;
//商品id
private String id;
public order(String userId, String id) {
this.userId = userId;
this.id = id;
}
}
注意对应关系
编写orderMapper
package cn.itcast.mq.mapper;
import cn.itcast.mq.pojo.order;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface orderMapper extends BaseMapper<order> {
}
编写orderService
package cn.itcast.mq.service;
import cn.itcast.mq.pojo.order;
import com.baomidou.mybatisplus.extension.service.IService;
public interface orderService extends IService<order> {
}
编写Iml实现类
package cn.itcast.mq.service;
import cn.itcast.mq.mapper.orderMapper;
import cn.itcast.mq.pojo.order;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
@Service
public class orderServiceImpl extends ServiceImpl<orderMapper, order> implements orderService{
}
构建Listerner线程池,构建容器工厂
使用@RabbitListener注解指定消费方法,默认情况是单线程监听队列,可以观察当队列有多个任务时消费端每次只消费一个消息,单线程处理消息容易引起消息处理缓慢,消息堆积,不能最大利用硬件资源,可以配置mq的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列,实现多线程处理消息。
package cn.itcast.mq.thread;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
@Configuration
@EnableAsync
public class ThreadPoolConfig {
@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConcurrentConsumers(10); //设置线程数
factory.setMaxConcurrentConsumers(10); //最大线程数
configurer.configure(factory, connectionFactory);
return factory;
}
}
编写MQListener监听队列
package cn.itcast.mq.listeners;
import cn.itcast.mq.pojo.order;
import cn.itcast.mq.service.orderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MqListener {
@Autowired
private orderService orderService;
//声明队列 mq的容器工厂
@RabbitListener(queues="demo.queue",containerFactory = "customContainerFactory")
public void listenSimpleQueue(String msg)
{
//拆分消息
String[] split = msg.split(":");
order order = new order(split[0], split[1]);
System.out.println(order.toString());
//保存MYSQL
orderService.save(order);
//测试是否多个消费者
System.out.println("线程" + Thread.currentThread().getName() + " 执行异步任务" );
}
}
RabbitMQ的准备
创建demo.queue队列
创建demo用户并且配置虚拟主机
进行测试
启动Redis和Consumer服务
使用JMeter压测12000个用户
开始压测
查看队列
观察Consumer控制台,一万条消息瞬间执行完成!
** 查看MySQL orderlist表,有一万条数据**
** 查看Redis 数据库并没有出现超卖问题,案例成功!!**
附加
解决RabbitMQ消息堆积的方案有三种
- 增加更多消费者,提高消息速度。(本案例采用这一种)
- 在消费者中开启线程池加快消息处理速度。
- 扩大队列容积,提高堆积上限,采用惰性队列。
总结
通过本次演示的案例,希望大家可以掌握并且多加练习,在日常的开发中缓存数据库和异步队列是必备的手段,同时也是大家找工作时的一个亮点。本文如有不妥之处希望大家指正!!!
版权归原作者 @耳東 所有, 如有侵权,请联系我们删除。