RabbitMQ


基本介绍

不同消息队列选择

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala & Java
协议支持 AMQP
XMPP
STOMP
SMTP
AMQP
XMPP
STOMP
OpenWire
REST
自定义协议 自定义协议
支持语言 几乎支持所有 Java
C/C++
Python
PHP
Perl
.NET等
Java 官方支持Java
社区产出多种API,如PHP、Python等
单机吞吐量 每秒十万左右级别 每秒数万级 每秒十万+级 每秒百万级
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性
完整的消息确认机制
一般
内置消息表,消息保存到数据库实现持久化
一般

RabbitMQ核心概念

  • publisher:消息发送者
  • consumer:消息消费者
  • queue:队列,存储消息
  • exchange:交换机,负责路由消息
  • virtual-host:虚拟主机,起到数据隔离作用
  • Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker
  • Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立Connection的开销是巨大的,效率也较低。Channel是Connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个线程创建单独的Channel进行通讯,AMQP method 包含了 Channel ID 帮助客户端和 Message Broker 识别Channel,所以Channel之间完全隔离。Channel作为轻量级的Connection极大减少了建立TCP连接的开销

Work Queues

  • 多个消费者绑定到一个队列,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理

默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每个消费者。但这并未考虑到消费者是否已经处理完消息,可能出现消息堆积。

解决方案:设置不公平分发

  1. 设置预取值
  2. 开启手动应答

预取值:

https://blog.csdn.net/qq_43856972/article/details/130835883

https://zhuanlan.zhihu.com/p/582836002

Fanout交换机

Fanout交换机会将接收到的消息广播到每一个绑定的Queue,所以也叫广播模式

Direct交换机

Direct交换机会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由

  • 每一个Queue都与交换机设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • 交换机将消息路由到BindingKey与消息RoutingKey一致的Queue

Topic交换机

Topic交换机与Direct交换机类似,区别在于RoutingKey可以是多个单词的列表,并以.分割

Queue与Exchange指定BindingKey时可以使用通配符:

  • #:代指0个或多个单词
  • *:代指一个单词

例如:china.##.news

可靠性

生产者可靠性

生产者重连

配置重连相关设置,包括连接超时时长、重连间隔、最大重连次数等

生产者确认

RabbitMQ有Publisher Confirm和Publisher Return两种确认机制。开启确认机制后,在MQ成功收到消息后会返回确认消息给生产者。返回结果有以下情况:

  • 消息投递到了MQ,但是路由失败。此时会通过Publisher Return返回路由异常原因,然后返回ACK,告知消息投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  • 其他情况都会返回NACK,告知投递失败

消息从 producer 到 exchange 则会返回一个 confirmCallback

消息从 exchange 到 queue 投递失败则会返回一个 returnCallback

如何处理生产者的确认消息?

  • 生产者确认需要额外的网络和系统资源开销,尽量不要使用
  • 如果一定要使用,无需开启Publisher Return机制,因为一般路由失败是自己业务问题
  • 对于nack消息可以有限次数重试,依旧失败则记录异常消息

MQ可靠性

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:

  • 一旦MQ宕机,内存中的消息会丢失
  • 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞

数据持久化

RabbitMQ实现数据持久化包括三个方面:

  • 交换机持久化

    创建时配置DurabilityDurableTransient为临时的

  • 队列持久化

    同上

  • 消息持久化

    配置delivery mode1为临时的,2为持久化的

补充:

即使队列和消息都是持久化的,也不能完全保证消息的 100% 不丢失。例如:在消息尚未被刷写到磁盘时,RabbitMQ 服务器突然崩溃,这种情况下的消息仍然可能会丢失。此外,RabbitMQ 不保证消息的立即持久化,而是尽可能快地将消息保存到磁盘

如果需要强有力的持久化策略,需要结合生产者的确认模式。

Lazy Queue

惰性队列,它有以下特征:

  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

在3.12版本后,所有队列都是Lazy Queue模式,无法更改

消费者可靠性

自动应答

消息发送成功后立刻被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡。

这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

手动应答

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

消费失败处理

当消费者出现异常时,消息会不断重新入队(requeue)到队列,再重新发送给消费者,如果一直异常,就会无限循环导致MQ的消息处理飙升,带来不必要的压力。

当出现异常时,客户端尽量本地重试,而不是无限制重新入队。业务控制失败的处理情况,最终返回ack或者reject。

延迟消息

什么是延迟消息?

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。

延迟任务:设置在一定时间之后才执行的任务

死信交换机

当一个队列中的消息满足下列情况之一时,将会称为死信(dead letter):

  • 消费者reject或nack声明消费失败,并且消息的requeue参数设置为false

  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费

    队列设置x-message-ttl(简称TTL)属性,即可设置该队列中消息的过期时间,单位ms

  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过x-dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。该交换机被称为死信交换机(Dead Letter Exchange,简称DLX),x-dead-letter-routing-key属性(简称DLK)可以将死信消息投递到指定交换机指定路由键的队列中去

延迟队列

延迟队列最重要的特性体现在延迟属性上,延迟队列中的元素是希望在指定时间到了以后取出和处理,简单来说,延迟队列就是用来存放需要指定时间被处理的消息的队列,即存放延迟消息的队列。

RabbitMQ中的延迟队列可以看做死信队列的一种,即消息过期的情况,利用DLX+TTL实现。

延迟队列优化

单纯依靠DLX+TTL实现延迟队列会引发一个问题,队列中所有消息的过期时间是固定的,如果需求不同消息有不同的过期时间,这种方式就无法实现。

优化方式:为每一条消息设置过期时间

引发问题:有些情况下,即使消息过期,也无法立刻从队列中删除,这是因为每条消息是否过期是在即将投递到消费者之前判定的,如果第一条消息过期时间很长,即使后面的消息过期时间很短,也需要等到第一条消息执行完再处理后面的消息

解决方案:延迟队列插件rabbitmq_delayed_message_exchange

其他

优先级队列

队列设置参数x-max-priority可设置为优先级队列,参数值表示消息可以选取的优先级范围,如10代表消息可以选择0-10之间的优先级