RabbitMQ基本概念
快速开始:https://www.rabbitmq.com/getstarted.html
消息队列基本介绍
我们可以把消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。由于队列Queue是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。参与消息传递的双方称为生产者和消费者,生产者负责发送消息,消费者负责处理消息。
随着分布式和微服务系统的发展,消息队列在系统设计中有了更大的发挥空间,使用消息队列可以降低系统耦合性、实现任务异步、有效地进行流量削峰,是分布式和微服务系统中重要的组件之一。
RabbitMQ是什么
RabbitMQ 是一个在 AMQP(Advanced Message Queuing Protocol )基础上实现的,可复用的企业消息系统。它可以用于大型软件系统各个模块之间的高效通信,支持高并发,支持可扩展。它支持多种语言的客户端,支持 AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ 是使用 Erlang 编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。它同时实现了一个 Broker 构架,这意味着消息在发送给客户端时先在中心队列排队,对路由(Routing)、负载均衡(Load balance)或者数据持久化都有很好的支持。
架构图
publisher
:生产者,也就是发送消息的一方consumer
:消费者,也就是消费消息的一方queue
:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理exchange
:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。virtual host
:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queueBroker
:消息中间件的服务节点
Docker安装RabbitMQ
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=123456 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
15672:RabbitMQ提供的管理控制台的端口
5672:RabbitMQ的消息发送处理接口
RabbitMQ消息模式
简单模式("Hello World!")
一个生产者,一个队列,一个消费者
// 生产者
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SimpleProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String QUEUE_NAME = "simple-queue";
public void send(String message) {
rabbitTemplate.convertAndSend(QUEUE_NAME, message);
}
}
// 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SimpleConsumer {
@RabbitListener(queues = "simple-queue")
public void receive(String message) {
System.out.println("Received message: " + message);
}
}
工作模式(Work Queues)
一个生产者,一个队列,多个消费者,默认采用轮训的方式
// 生产者和队列定义与简单模式相同
// 消费者1
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WorkerConsumer1 {
@RabbitListener(queues = "simple-queue")
public void receive(String message) {
System.out.println("Worker 1 Received message: " + message);
}
}
// 消费者2
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WorkerConsumer2 {
@RabbitListener(queues = "simple-queue")
public void receive(String message) {
System.out.println("Worker 2 Received message: " + message);
}
}
Exchange 模式
Fanout模式——发布/订阅模式(Publish/Subscribe)
一个生产者、一个 fanout 类型的交换机、多个队列、多个消费者。
该模式会将接收到的所有消息广播到它知道的所有队列中。一个生产者发送的消息会被多个消费者获取。其中 fanout 类型就是发布订阅模式,只有订阅该生产者的消费者会收到消息。
// 生产者和队列定义
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class PubSubConfig {
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout-exchange");
}
@Bean
public Queue queue1() {
return new Queue("queue-1");
}
@Bean
public Queue queue2() {
return new Queue("queue-2");
}
}
// 发布者
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class PubSubProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private FanoutExchange fanoutExchange;
public void send(String message) {
rabbitTemplate.convertAndSend(fanoutExchange.getName(), "", message);
}
}
// 订阅者1
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Subscriber1 {
@RabbitListener(queues = "queue-1")
public void receive(String message) {
System.out.println("Subscriber 1 Received message: " + message);
}
}
// 订阅者2
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Subscriber2 {
@RabbitListener(queues = "queue-2")
public void receive(String message) {
System.out.println("Subscriber 2 Received message: " + message);
}
}
Direct模式——路由模式(Routing)
一个生产者,一个 direct 类型的交换机,多个队列,交换机与队列之间通过 routing-key 进行关联绑定,多个消费者。
消息只去到它绑定的 routingKey 队列中。生产者发送消息到交换机并且要指定routing-key,然后消息根据这交换机与队列之间的 routing-key 绑定规则进行路由被指定消费者消费。
// 路由配置
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RoutingConfig {
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct-exchange");
}
@Bean
public Queue queueA() {
return new Queue("queue-A");
}
@Bean
public Queue queueB() {
return new Queue("queue-B");
}
}
// 生产者和发送消息
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RoutingProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private DirectExchange directExchange;
public void sendToQueueA(String message) {
rabbitTemplate.convertAndSend(directExchange.getName(), "routeA", message);
}
public void sendToQueueB(String message) {
rabbitTemplate.convertAndSend(directExchange.getName(), "routeB", message);
}
}
// 订阅者1
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class QueueAConsumer {
@RabbitListener(queues = "queue-A")
public void receive(String message) {
System.out.println("Queue A Received message: " + message);
}
}
// 订阅者2
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class QueueBConsumer {
@RabbitListener(queues = "queue-B")
public void receive(String message) {
System.out.println("Queue B Received message: " + message);
}
}
Topics模式——主题模式(Topics)
一个生产者,一个 topic 类型的交换机,多个队列,交换机与队列之间通过 routing-key 进行关联绑定,多个消费者。
routing_key 必须是一个单词列表,以点号分隔开。星号*可以代替一个单词,井号#可以替代零个或多个单词。生产者发送消息到交换机并且要指定 routing-key,然后消息根据这交换机与队列之间的 routing-key 绑定规则进行路由被指定消费者消费。与路由模式不同是 routing-key 有指定的队则,可以更加的通用,满足更过的场景。
// 主题配置
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic-exchange");
}
@Bean
public Queue queueX() {
return new Queue("queue-X");
}
@Bean
public Queue queueY() {
return new Queue("queue-Y");
}
}
// 生产者和发送消息
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private TopicExchange topicExchange;
public void sendToQueueX(String message) {
rabbitTemplate.convertAndSend(topicExchange.getName(), "topic.queue.X", message);
}
public void sendToQueueY(String message) {
rabbitTemplate.convertAndSend(topicExchange.getName(), "topic.queue.Y", message);
}
}
// 订阅者1
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class QueueXConsumer {
@RabbitListener(queues = "queue-X")
public void receive(String message) {
System.out.println("Queue X Received message: " + message);
}
}
// 订阅者2
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class QueueYConsumer {
@RabbitListener(queues = "queue-Y")
public void receive(String message) {
System.out.println("Queue Y Received message: " + message);
}
}
RabbitMQ可靠性
消息从生产者到消费者的每一步都可能导致消息丢失:
发送消息时丢失:
生产者发送消息时连接MQ失败
生产者发送消息到达MQ后未找到
Exchange
生产者发送消息到达MQ的
Exchange
后,未找到合适的Queue
消息到达MQ后,处理消息的进程发生异常
MQ导致消息丢失:
消息到达MQ,保存到队列后,尚未消费就突然宕机
消费者处理消息时:
消息接收后尚未处理突然宕机
消息接收后处理过程中抛出异常
综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:
确保生产者一定把消息发送到MQ
确保MQ不会将消息弄丢
确保消费者一定要处理消息
发送者的可靠性
生产者重试机制
生产者发送消息时,可能出现了网络故障,导致与MQ的连接中断。而SpringAMQP提供了消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
生产者确认机制
在少数情况下,可能会出现消息发送到MQ之后丢失的现象,比如:
MQ内部处理消息的进程发生了异常
生产者发送消息到达MQ后未找到
Exchange
生产者发送消息到达MQ的
Exchange
后,未找到合适的Queue
,因此无法路由
针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm
和Publisher Return
两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。
当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
其它情况都会返回NACK,告知投递失败
其中ack
和nack
属于Publisher Confirm机制,ack
是投递成功;nack
是投递失败。而return
则属于Publisher Return机制。 默认两种机制都是关闭状态,需要通过配置文件来开启。
实现方法
开启生产者确认
在publisher模块的application.yaml
中添加配置:
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制
这里publisher-confirm-type
有三种模式可选:
none
:关闭confirm机制simple
:同步阻塞等待MQ的回执correlated
:MQ异步回调返回回执
一般我们推荐使用correlated
,回调机制。
定义ReturnCallback
每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("触发return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
});
}
}
定义ConfirmCallback
由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
这里的CorrelationData中包含两个核心的东西:
id
:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆SettableListenableFuture
:回执结果的Future对象
将来MQ的回执就会通过这个Future
来返回,我们可以提前给CorrelationData
中的Future
添加回调函数来处理消息回执:
我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback:
@Test
void testPublisherConfirm() {
// 1.创建CorrelationData
CorrelationData cd = new CorrelationData();
// 2.给Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// 2.1.Future发生异常时的处理逻辑,基本不会触发
log.error("send message fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
log.debug("发送消息成功,收到 ack!");
}else{ // result.getReason(),String类型,返回nack时的异常描述
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
}
}
});
// 3.发送消息
rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
可以看到,由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。
当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。
而如果连交换机都是错误的,则只会收到nack。
注意: 开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:
路由失败:一般是因为RoutingKey错误导致,往往是编程导致
交换机名称错误:同样是编程错误导致
MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。
MQ的可靠性
数据持久化
为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:
交换机持久化
队列持久化
消息持久化
说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。 不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。
LazyQueue
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:
消费者宕机或出现网络故障
消息发送量激增,超过了消费者处理速度
消费者处理业务发生阻塞
一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut
. PageOut
会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。
为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:
接收到消息后直接存入磁盘而非内存
消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
支持数百万条的消息存储
而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。
代码配置Lazy模式
在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数可设置队列为Lazy模式:
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启Lazy模式
.build();
}
我们也可以基于注解来声明队列并设置为Lazy模式:
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}
更新已有队列为lazy模式
对于已经存在的队列,也可以配置为lazy模式,但是要通过设置policy实现。 可以基于命令行设置policy:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
rabbitmqctl
:RabbitMQ的命令行工具set_policy
:添加一个策略Lazy
:策略名称,可以自定义"^lazy-queue$"
:用正则表达式匹配队列的名字'{"queue-mode":"lazy"}'
:设置队列模式为lazy模式--apply-to queues
:策略的作用对象,是所有的队列
当然,也可以在控制台配置policy,进入在控制台的Admin
页面,点击Policies
,即可添加配置:
消费者的可靠性
消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
ack:成功处理消息,RabbitMQ从队列中删除该消息
nack:消息处理失败,RabbitMQ需要再次投递消息
reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch
机制捕获,消息处理成功时返回ack,处理失败时返回nack.
由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:
none
:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用manual
:手动模式。需要自己在业务代码中调用api,发送ack
或reject
,存在业务入侵,但更灵活auto
:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack
. 当业务出现异常时,根据异常判断返回不同结果:如果是业务异常,会自动返回
nack
;如果是消息处理或校验异常,自动返回
reject
;
通过下面的配置可以修改SpringAMQP的ACK处理方式:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做处理
失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力.
为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
重启consumer服务,重复之前的测试。可以发现:
消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
本地重试3次以后,抛出了
AmqpRejectAndDontRequeueException
异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject
结论:
开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
重试达到最大次数后,Spring会返回reject,消息会被丢弃
失败处理策略
在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。 因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery
接口来定义的,它有3个不同实现:
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject
,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack
,消息重新入队RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer
,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
1)在consumer服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
2)定义一个RepublishMessageRecoverer,关联队列和交换机
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
完整代码
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
业务幂等性
幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x))
,例如求绝对值函数。 在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:
根据id删除数据
查询数据
新增数据
但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:
取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
退款业务。重复退款对商家而言会有经济损失。
所以,我们要尽可能避免业务被重复执行。 然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:
页面卡顿时频繁刷新导致表单重复提交
服务间调用的重试
MQ消息的重复投递
因此,必须想办法保证消息处理的幂等性。有两种方案:
唯一消息ID
每一条消息都生成一个唯一的id,与消息一起投递给消费者。
消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
我们该如何给消息添加唯一ID呢? 其实很简单,SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。 以Jackson的消息转换器为例:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
业务状态判断
业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
相比较而言,消息ID的方案需要改造原有的数据库,所以更推荐使用业务判断的方案。
延迟消息
在RabbitMQ中实现延迟消息有两种方案:
死信交换机+TTL
延迟消息插件
死信交换机
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
消费者使用
basic.reject
或basic.nack
声明消费失败,并且消息的requeue
参数设置为false消息是一个过期消息,超时无人消费
要投递的队列消息满了,无法投递
如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange
属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。
死信交换机有什么作用呢?
收集那些因处理失败而被拒绝的消息
收集那些因队列满了而被拒绝的消息
收集因TTL(有效期)到期的消息
死信交换机+TTL延迟消息实现方法
设置消息的有效期为5秒,不设置消费者,因此消息无人消费5秒之后,消息的有效期到期,成为死信而投递到死信交换机,再通过绑定死信交换机的死信队列转发到消费者。也就能成功消费消息了,但此时已经是5秒钟以后了。publisher发送了一条消息,但最终consumer在5秒后才收到消息。我们成功实现了延迟消息。
DelayExchange插件
基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。
下载:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
安装:由于基于Docker安装,插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。
执行命令,安装插件:
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
声明延迟交换机
基于注解方式:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
基于@Bean
的方式:
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class DelayExchangeConfig {
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}
@Bean
public Queue delayedQueue(){
return new Queue("delay.queue");
}
@Bean
public Binding delayQueueBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}
发送延迟消息
发送消息时,必须通过x-delay属性设定延迟时间:
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
注意: 延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。 因此,不建议设置延迟时间过长的延迟消息。
其他消息队列
Kafka官方中文文档:https://kafka.apachecn.org/
RocketMQ官方中文文档:https://rocketmq.apache.org/zh/docs/
几种常见MQ的对比:
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka