消息中间件之RabbitMQ初识

RabbitMq

RabbitMQ 相关概念介绍

RabbitMQ 整体上市一个生产者与消费者模型,主要负责接收、储存和转发消息。下面的图是RabbitMQ 的整体模型架构图:

RabbitMQ 的一些角色

  • Producer:生产者,就是投递消息的一方
  • Consumer:消费者,接收消息的一方
  • Broker:消息中间件的服务节点。
  • 队列:是 RabbitMQ的内部对象,用于储存消息。RabbitMQ 的生产者生产的消息并最终投递到队列中,消费者可以从队列中获取消息并消费。多个消费者可以订阅同一队列,这是队列中的消息会被平均分摊(Round-Robin,即轮询)。

交换器、路由键和绑定

交换器(Exchange)

交换器:在RabbitMQ 中,生产者实际把消息发送个 Exchange,有交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返给生产者,或许直接丢弃。

RabbitMQ 中的交换器有四种,下面分别简单介绍下:

  • fanout:它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
  • direct:direct 类型的交换器路由规则也很简单,它会把消息路由到那些 BindingKey RoutingKey
    完全匹配的队列中。
  • topic:不同于direct交换器的严格,topic的匹配规则可以模糊匹配。RabbitMQ定义了一下规则:
    • RoutingKey 为一个点号” “分隔的字符串(被点号” “分隔开的每 段独立的字符
      串称为 个单词 )λ,
    • BindingKey RoutingKey 样也是点号” “分隔的字符串;
    • BindingKey 中可以存在两种特殊字符串”*“和”#”,用于做模糊匹配,其中”*”用于匹配一个单词,”#”用于匹配多规格单词(可以是零个)。
  • headers:根据发送的消息内容中的 headers 属性进行匹配。

路由键(RoutingKey)

生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由顾泽,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效

绑定

RabbitMQ 通过绑定将交换器和队列关联起来,在绑定的时候一般会指定一个BindingKey,这样消息就能准确发送到队列了。

消息确认

在使用消息中间件,我们需要注意消息是否发送成功,以及消息是否消费成功。

  • 保证消息的成功发送
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点(Broker)确认应答
  • 完善的消息进行补偿机制

confirm 消息确认机制

消息的确认是指生产者投递消息后,如果 Broker 接收到消息,则会给生产者一个应答。生产者进行接收应答,
用来确认这条消息是否正常的发送到 Broker,这种方式也是消息可靠性投递的核心保障。

  • 在channel上启动确认模式:channel.confirmSelect()
  • 在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理

return 消息机制

生产者通过指定一个 exchange 和 routingkey 把消息送达到某个队列中去,然后消费者监听队列,进行消费处理。但是在某些情况下,如果我们在发送消息时,当前的 exchange 不存在或者指定的 routingkey 路由不到,这个时候如果要监听这种不可达的消息,就要使用 ReturnListener。

如果生产端发送的消息,通过Return机制,讲这些不可达的消息发送给生产端,这时候生产端就需要设置 Return Listener去接收这些不可达的消息,然后及时记录日志,去处理这些消息。

mandatory 参数

当 mandatory 参数设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会调用 Basic.Returen 命令将消息返回给生产者。当 mandatory 参数设置为flase时,出现上述情况,则消息直接丢弃。

在设置 mandatory 的情况下,不想复杂化生产者代码逻辑,可以使用备份交换器,可以将未被路由的消息存储在RabbitMQ中,再在需要的时候去处理这些消息。可以通过在声明交换器(调用 channel.exchangeDeclare 方法)的时候添加
alternate-exchange 参数来实现,

immediate 参数

当 immediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在
任何消费者,那么这条消息将不会存入队列中。当与路由键匹配 所有队列都没有消费者时
该消息会通过 Basic Return 返回至生产者。

RabbitMQ 3.0 版本开始已经去掉了对 immediate 参数的支持

消息的持久化

为了保证RabbitMQ在重启、奔溃等异常情况下数据没有丢失,需要持久化。RabbitMQ 的持久化分为三部分:交换器的持久化、队列的持久化和消息的持久化。

注意:如果将所有的消息都设置持久化,这样会严重影响RabbitMQ的性能。在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。

消费端的确认和拒绝

为了保证消息从队列可靠地达到消费者, RabbitMQ 提供了消息确认机制( message
acknowledgement) 消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 等于 false时, RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除) 。当 autoAck 等于 true 时, RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

当 autoAck 参数置为 false ,对于 RabbitMQ 服务端而 ,队列中的消息分成了两个部分:
一部分是等待投递给消费者的消息;一部分是己经投递给消费者,但是还没有收到消费者确认信号的消息。 如果 RabbitMQ 直没有收到消费者的确认信号,并且消费此消息的消费
者己经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下 个消费者,当然也有可能还是原来的那个消费者。

关于延迟队列

在RabbitMQ中,可以设置2种延迟队列,一种是对队列设置过期时间;另外一种
是对消息设置过期时间

Per-Queue Message TTL

通过在 queue.declare 中设置 x-message-ttl 参数,可以控制被 publish
到 queue 中的 message 被丢弃前能够存活的时间。当某个 message 在 queue 留存
的时间超过了配置的 TTL 值时,我们说该 message “已死”。

1
2
3
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

Per-Message TTL

TTL 设置可以具体到每一条 message 本身,只要在通过 basic.publish 命令
发送 message 时设置 expiration 字段。消息过期,不会马上从队列中抹去,
因为每条消息是否过期时在即将投递到消费者之前判定的。

这种消息过期存在一定的问题,比如连续发送2条消息,第一条设置过期时间1分钟,第二条设置时间30秒,最终
消费的时候,第二条会被第一条阻塞,最终和第一条一起消费。

死信队列

DLX,全称 Dead-letter-Exchange,当消息在一个队列中变成死信(dead message)之后,它能被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。

通过上述的死信队列,我们可以利用这一特性,实现延迟队列的操作。

优先级队列

优先队列,具有高优先级的队列具有高的优先权,优先权高的消息具备优先被消费的特权。可以通过设置x-max-priority参数来实现。默认最低为0,醉倒为队列设置的最大优先级。

注意:如果消费者的消费速度大于生产者的速度,且Broker中没有消息堆积的情况下,对发送的消息设置优先级也没有实际意义。

RabbitMQ 进阶

消息的分发

当 RabbitMQ 队列有多个消费者时,队列收到的消息将以轮询的分发方式发送给消费者。每条消息知会发给订阅列表中的一个消费者。但是当某个消费者任务繁忙时,来不及消费消息的时候,RabbitMQ 还是会把消息分发给这个消费者,而别的消费者有很空闲,这样就会造成整体应用吞吐量的下降。

消费端提供channel.basicQos(int prefetchCount)方法,该方法允许限制信道上的消费者所能保持的最大为确认消息的数量。比如该数字设置为5,如果达到上限,那么 RabbitMQ 就不会向这个消费者发送任何消息,等消费者确认某条消息之后,RabbitMQ将对应的技术减一,之后消费者可以继续接收消息了。

注意:Basic.Qos的使用对拉模式的消费方法无效。

博主 wechat
钟意作者
客官,赏一杯coffee嘛~~~~
0%