概述
rocketmq消息组件在springboot框架中的应用,使用rocketmq的整合包进行编码实现。
编码参考
- 引入rocketmq依赖jar, 最新版本参考GitHub - apache/rocketmq-spring: Apache RocketMQ Spring Integration
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
- 基本配置
rocketmq:
name-server: 192.168.15.175:9876;192.168.15.77:9876
producer:
group: ta-cipher-encode
rocketmq.name-server: rocketmq集群地址,单点或集群
rocketmq.producer.group : 生产者组名,用于标识一组具有相同功能的生产者
- 其他配置
Name Server相关配置:
rocketmq.namesrv.domain:Name Server的域名,用于自动发现Name Server地址。
Producer相关配置:
rocketmq.producer.group:生产者组名,用于标识一组具有相同功能的生产者。
rocketmq.producer.send-msg-timeout:发送消息的超时时间,默认为3秒。
rocketmq.producer.compress-msg-body-over-howmuch:消息体大于指定字节大小时启用压缩。
Consumer相关配置:
rocketmq.consumer.group:消费者组名,用于标识一组具有相同功能的消费者。
rocketmq.consumer.consume-thread-min:消费者线程池的最小线程数。
rocketmq.consumer.consume-thread-max:消费者线程池的最大线程数。
rocketmq.consumer.consume-message-batch-max-size:批量消费消息时每次拉取的最大消息数量。
rocketmq.consumer.pull-interval:拉取消息间隔时间,默认为0,表示尽可能快地拉取消息。
Message相关配置:
rocketmq.message.max-size:消息的最大大小,默认为4MB。
rocketmq.message.compress-level:消息压缩级别,可选值为0(不压缩)到9(最高压缩率)。
rocketmq.message.timeout:消息的过期时间,默认为3天。
集群模式相关配置:
rocketmq.broker.cluster.name:Broker集群的名称。
rocketmq.broker.cluster.slave-read-only:Slave节点是否只读,默认为true。
- 生产端代码
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RmqProvdier{
@Autowired
private RocketMQTemplate rocketMQTemplate;
public boolean send(String message) {
try {
// 发送消息
rocketMQTemplate.convertAndSend("ta-cipher-persist", message);
} catch (Exception e) {
log.error("send message:{}", message, e);
return false;
}
return true;
}
}
- 消费端代码
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode")
public class RocketMQConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
try {
log.info("message:{}", message);
} catch (Exception e) {
log.error("errorMessage:{}", message);
// 抛出异常会重新消费消息
throw new RuntimeException("Message processing failed", e);
}
}
}
@RocketMQMessageListener参数
- topic 消费这从那个topic中读取消息
- consumerGroup 当前消费者的消费组
版权归原作者 lizz666 所有, 如有侵权,请联系我们删除。