Redis的发布订阅(pub/sub)功能是一种常用的消息通信模式,它允许发送者(发布者)向多个接收者(订阅者)发送消息,实现了解耦和异步通信的需求。
实现方式:
命令概述:Redis提供了一组用于发布和订阅消息的命令,用于构建实时通信应用,比如聊天室、广播通知等。
发布订阅的实现:
订阅端:订阅端使用 SUBSCRIBE 命令来订阅一个或多个频道。一旦订阅成功,客户端将进入订阅模式,等待接收来自指定频道的消息。
SUBSCRIBE dingdada
在订阅成功后,客户端将持续监听来自 dingdada 频道的消息。
发送端:发送端使用 PUBLISH 命令将消息发布到指定的频道。
PUBLISH dingdada "hello world!"
PUBLISH dingdada "my name is dyj"
这些命令将分别向 dingdada 频道发布消息 "hello world!" 和 "my name is dyj"。
PSUBSCRIBE 命令:
PSUBSCRIBE 命令用于订阅符合指定模式的所有频道。
PSUBSCRIBE ding*
这将订阅所有以 ding 开头的频道。
PUBLISH 命令:
PUBLISH 命令用于向指定频道发布消息。
PUBLISH dingdada "hello world!"
PUNSUBSCRIBE 命令:
PUNSUBSCRIBE 命令用于退订一个或多个模式。
PUNSUBSCRIBE ding*
这将取消订阅所有以 ding 开头的频道。
总结:
Redis的发布订阅功能提供了一个高效的消息传递机制,特别适用于需要实时通信和广播的场景。发布者可以向一个或多个频道发布消息,而订阅者则可以选择性地接收他们感兴趣的频道的消息,从而实现了灵活的消息传递和处理。
这种模式对于构建实时聊天、消息推送、实时数据更新等应用非常有用,能够有效地降低系统的耦合度和增强实时性。
Spring Boot中使用Redis的发布订阅功能
在Java的Spring Boot中使用Redis的发布订阅功能,可以通过Spring Data Redis提供的支持来实现。下面是具体的步骤和代码示例:
添加依赖
首先,在Spring Boot项目的pom.xml文件中添加Spring Data Redis的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
配置Redis连接
在application.properties或application.yml中配置Redis连接信息:
spring:
data:
redis:
host: localhost
port: 6379
database: 0
password: # 密码(默认为空)
实现发布者
创建一个发布消息的服务:
package com.example.demo.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
/**
* Redis发布者类,用于向指定的Redis频道发布消息。
* 使用StringRedisTemplate来操作Redis,实现消息的发布和列表存储。
*/
@Service
public class RedisPublisher {
/**
* 自动注入StringRedisTemplate,用于操作Redis字符串和列表。
*/
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 发布消息到指定的Redis频道。
* 同时将消息添加到Redis的列表中,用于存储和后续查询。
*
* @param channel Redis频道名称,消息将发布到这个频道。
* @param message 要发布的消息内容。
*/
public void publishMessage(String channel, String message) {
redisTemplate.convertAndSend(channel, message);
// 将消息存储在Redis列表中
redisTemplate.opsForList().rightPush("messageList", message);
}
}
实现订阅者
创建一个订阅消息的服务:
package com.example.demo.component;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
/**
* Redis订阅者类,用于监听Redis频道上的消息。
* 通过实现MessageListener接口,当有消息发布到订阅的频道时,会调用onMessage方法。
*/
@Component
public class RedisSubscriber implements MessageListener {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 当收到消息时的处理方法。
* @param message 收到的消息对象,包含消息内容和消息频道。
* @param pattern 模式匹配参数,用于订阅模式匹配的消息(此应用中未使用)。
*/
@Override
public void onMessage(Message message, byte[] pattern) {
// 转换消息频道和内容为字符串
String channel = new String(message.getChannel());
String body = new String(message.getBody());
// 打印收到的消息内容和频道信息
System.out.println("收到的消息: " + body + " from channel: " + channel);
// 将收到的消息存储到Redis的列表中,列表名称为"messageList"
// 将消息存储在Redis列表中
redisTemplate.opsForList().rightPush("messageList", body);
}
}
配置订阅通道
可以使用Spring的配置类来注册订阅通道:
package com.example.demo.config;
import com.example.demo.component.RedisSubscriber;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
* Redis配置类,用于配置Redis的相关监听器和连接工厂。
*/
@Configuration
public class RedisConfig {
/**
* 配置Redis消息监听容器。
*
* @param connectionFactory Redis连接工厂,用于创建Redis连接。
* @param listenerAdapter 消息监听适配器,将特定方法绑定到Redis消息监听上。
* @return RedisMessageListenerContainer,配置好的消息监听容器。
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 设置连接工厂
// 绑定监听器到特定的Redis频道
// 订阅 dingdada 频道
container.addMessageListener(listenerAdapter, new ChannelTopic("dingdada"));
return container;
}
/**
* 配置消息监听适配器。
*
* @param redisSubscriber Redis订阅者,实现具体的业务逻辑。
* @return MessageListenerAdapter,配置好的消息监听适配器。
*/
@Bean
MessageListenerAdapter listenerAdapter(RedisSubscriber redisSubscriber) {
// 通过MessageListenerAdapter将redisSubscriber的onMessage方法绑定为消息处理方法
return new MessageListenerAdapter(redisSubscriber, "onMessage");
}
}
使用发布者发布消息
然后可以在任何需要的地方注入RedisPublisher并调用publishMessage方法来发布消息:
package com.example.demo.controller;
import com.example.demo.service.RedisPublisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
/**
* 发布控制器,用于处理消息的发布操作。
*/
@RestController
public class PublishController {
/**
* Redis发布者服务,用于向Redis发布消息。
*/
@Autowired
private RedisPublisher redisPublisher;
/**
* 发布消息到指定频道。
*
* @param message 要发布的消息内容。
* @return 返回一个字符串指示消息已成功发布。
*/
@PostMapping("/publish")
public String publishMessage(@RequestBody String message) {
// 向名为"dingdada"的频道发布消息
redisPublisher.publishMessage("dingdada", message);
return "消息已成功发布";
}
}
使用postman来测试api
版权归原作者 正在奋斗的程序猿 所有, 如有侵权,请联系我们删除。