在并发量很高的时候,服务端处理不过来客户端发的请求,这个时候可以使用消息队列,实现削峰。原理就是请求先打到队列上,服务端从队列里取出消息进行处理,处理不过来的消息就堆积在消息队列里等待。
可以模拟一下这个过程:
发送方把10万条消息在短时间内发送到消息队列
接收方把这些消息存储到数据库
一、具体实现
1. 创建两个spring项目
2. 分别引入 RabbitMQ 的依赖
<!-- rabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3. 配置文件中配置RabbitMQ的信息(这里是.yml文件的格式)
spring:
rabbitmq:
host: 这里写IP地址
port: 5672 #端口号
username: 用户名
password: 密码
virtual-host: /
4. 发送方 Sender
发送消息
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String str) {
//first是消息队列的名称
rabbitTemplate.convertAndSend("first", str);
}
}
多线程实现Runnable接口
@Data
public class MyRunnable implements Runnable {
private String str;
private Sender sender;
private CountDownLatch countDownLatch;
public MyRunnable(CountDownLatch countDownLatch,Sender sender, String str){
this.countDownLatch = countDownLatch;
this.sender = sender;
this.str = str;
}
@Override
public void run() {
try {
String threadName = Thread.currentThread().getName();
//发送线程名和消息
sender.send(threadName + " " + str);
//控制台输出
System.out.println(threadName + " " + str);
countDownLatch.countDown();
} catch (Exception e) {
System.out.println(new Date() + " Send mq message failed.");
e.printStackTrace();
}
}
}
测试类
@SpringBootTest
class Rabbitmq1ApplicationTests {
@Autowired
Sender sender;
@Test
void contextLoads() throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10,
300, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
//循环100000次,模拟10万条消息
final int count = 100000;
CountDownLatch countDownLatch = new CountDownLatch(count);
for(int i = 0; i < count; i++){
MyRunnable myRunnable = new MyRunnable(countDownLatch,sender,"" + i);
threadPoolExecutor.execute(myRunnable);
}
countDownLatch.await();
}
}
5. 接收方
@Component
@RabbitListener(queues = "first")
public class Receiver {
@Autowired
MessageService messageService;
@RabbitHandler
public void process(String str){
//控制台输出
System.out.println("msg: " + str);
//对消息的处理,字符串分割
String[] strings = str.split(" ");
Message message = new Message(strings[0],strings[1]);
//插入数据库
messageService.insert(message);
}
}
这里@RabbitListener注解监听着 first 队列 ,当有收到消息的时候,就交给 @RabbitHandler 的方法处理。
二、结果
可以看到发送方很快就发送完毕了,接收方这边还在慢慢的处理中
过了一会。。。
又过了一会。。。
完美地发挥了削峰的作用。
版权归原作者 早起之王 所有, 如有侵权,请联系我们删除。