消息中间件
消息中间件一般有两种传递模式:点对点模式和发布订阅模式。
点对点模式是基于队列的,生产者将消息发送到队列,消费者从队列中接收消息。
发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为topic,topic可以认为是消息传递的中介,它只是一个逻辑概念,消息真正是怎样进行存储的还是要看具体MQ的实现。
这两种模式其实就是单播和广播的区别,而且当订阅者只有一个时,后者和前者在功能上是完全兼容的。因此主流的RocketMQ、Kafka都是直接基于发布订阅模式实现的。RabbitMQ和Kafka等有些区别,但是大体意思都差不多。
AMQP基础概念
Exchange、RoutingKey、Binding
Exchange
在RabbitMQ中,生产者将消息发送到Exchange,而不是队列之中。消息是由Exchange路由到一个或多个队列之中,如果路由不到,或返回给生产者、或直接丢弃。
RoutingKey
生产者将消息发送给Exchange时,一般会指定一个RoutingKey,Exchange会根据这个值选择一些路由规则。
Binding
RabbitMQ通过Bingding将Exchange和队列关联起来,在绑定的时候一般会指定一个BindingKey。
Exchange类型
Exchange有4种类型对应4种不同的路由策略。
fanout
针对队列的广播,它会忽略BindingKey,将所有发送到该Exchange的消息路由到所有与该Exchange绑定的队列中。
direct
它会将消息路由到那些RoutingKey和BindingKey完全一样的队列中。
topic
与direct类似,只不过不要求RoutingKey和BindingKey完全一致,可以模糊匹配。
headers
根据消息内容中的headers属性进行匹配,很少用。
队列
RabbitMQ中,队列用于存储消息。多个消费者可以订阅同一个队列,但是消息是以轮询的方式交给多个消费者处理的。RabbitMQ不支持队列层面的广播消费,如果需要则需要进行二次开发,不建议这么做。广播需要借助Exchange来实现。
RabbitMQ特性
过期时间(TTL)
RabbitMQ可以对消息和队列设置TTL。
给队列设置了TTL后,队列内所有消息会拥有相同的过期时间。如果消息和队列都设置了TTL,那么以两者间最小的为准。消息一旦超过了TTL,那么就会变为死信(Dead Message)。
队列设置TTL的方式是创建队列时指定x-message-ttl参数,单位毫秒。
消息设置TTL的方法是给消息增加expiration属性,单位毫秒。如果为单个消息设置TTL并且此消息过期了,不会立即从队列中删除,而是在出队时进行判断。
死信队列
当消息在一个队列中变成死信之后,它能被重新被发送到另一个exchange,DLX(Dead-Letter-Exchange)中。绑定了DLX的队列就称为死信队列。
消息变死信的情况:
- 消息被拒绝,并且requeue参数为false;
- 消息过期
- 队列达到最大长度
DLX与一般的exchange没有区别,可以在任何队列上通过x-dead-letter-exchange参数指定。
持久化
RabbitMQ持久化分三个部分:exchange持久化、队列持久化和消息的持久化。
exchange持久化只需要在exchange声明时将durable设置为true,这样能保证exchange的元数据不会丢失。队列持久化只需要在队列声明时将durable设置为true,同样的,队列持久化保证的是队列的元数据不丢失,但是并不保证消息不丢失。因此要确保消息不丢失,需要设置消息的投递模式为持久化。消息的持久化建立在队列持久化的前提下,单独设置消息持久化无意义。消息持久化是通过写入磁盘实现的,因此会拖慢RabbitMQ的性能。
但是这三部分的持久化并不能100%保证数据不丢失,依然需要镜像队列和事务机制的帮助。
生产者确认
针对生产者无法知道消息是否真正到达RabbitMQ的exchange这种情况,RabbitMQ提供了两种解决方案。
事务机制
RabbitMQ是有事务机制的。使用channel.txSelect方法开启事务,channel.commit方法提交事务,channel.txRollback方法回滚事务。但是事务机制非常耗费性能,因此RabbitMQ提供了改进方案。
发送方确认(publisher confirm)
生产者将信道设置为confirm模式,所有在该信道上面发布的消息都会被指派一个唯一ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID)。如果消息和队列是持久化的,那么确认消息会在消息写入磁盘后发出。RabbitMQ回传给生产者的确认消息中的deliveryTag包含了确认消息的序号,此外RabbitMQ也可以设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息都已经得到了处理。这简直就是TCP协议可靠性传输的翻版~
消费端要点
消息分发
因为RabbitMQ的队列是不支持广播的,队列收到消息后会以轮询的方式分发消息,但是每个消费者的消费能力可能不一样,轮询的方式就会造成整个系统的吞吐量下降,RabbitMQ提供了channel.basicQos方法来限制信道上的消费者所能保持的最大未确认消息的数量。这种机制可以类比TCP/IP中的“滑动窗口”。
消息顺序性
RabbitMQ无法保证在任何情况下,消息者消费消息的顺序与生产者发送消息的顺序一致。
消息传输保障
一般消息中间件的消息传输保障分为三个层次:
- 最多一次
- 最少一次
- 恰好一次
RabbitMQ可以通过生产者确认、消息持久化、消费者ack等机制保证最多一次和最少一次,但是没有去重机制保证恰好一次。去重处理一般是在业务客户端实现。
RabbitMQ集群
当失去一个RabbitMQ节点时,客户端能够重新连接到集群中的任何其他节点并继续生产或消费。当集群中一个RabbitMQ节点崩溃时,该节点上的所有队列中的消息也会丢失。RabbitMQ集群中的所有节点都会备份所有的元数据信息。基于存储空间和性能的考虑,在RabbitMQ集群中创建队列,集群只会在单个节点而不是在所有节点上创建队列的进程并包含完整的队列信息(元数据、状态、内容)。
多主机多节点
RabbitMQ集群对延迟非常敏感,应当只在本地局域网内使用。在广域网中不应该使用集群,而应该使用Federation或者Shovel来代替。
集群中每个节点都有类型,要么是内存结点,要么是磁盘节点。单节点的集群中必然只有磁盘类型的节点。
在集群中创建队列、exchange或者绑定关系的时候,这些操作直到所有集群节点都成功提交元数据变更后才会返回。磁盘操作能提供可靠性,但是无法提供出色的性能,因此RabbitMQ只要求集群中至少有一个磁盘节点。如果集群中唯一的磁盘节点崩溃,集群仍然可以保持运行,但是直到该节点恢复到集群前,无法更改任何东西。
除非使用的是RabbitMQ的RPC功能,否则创建队列、exchange或者绑定关系等操作的频率很低。因此,为了确保集群信息的可靠性,建议全部使用磁盘节点。