器→工具, 工具软件, 术→技巧, 研发

开源消息队列RabbitMQ

钱魏Way · · 3 次浏览

RabbitMQ简介

RabbitMQ 是一个开源的消息代理软件,最初由 LShift 公司开发,后来由 Pivotal Software(现为 VMware 的一部分)维护。它基于 AMQP(Advanced Message Queuing Protocol)协议构建,但也支持其他协议,如 MQTT 和 STOMP。RabbitMQ 以其易用性、灵活性和可靠性在全球范围内广泛应用,尤其是在需要复杂路由和消息传递模式的系统中。

核心特性

  • 协议支持
    • RabbitMQ 原生支持 AMQP 0-9-1 协议,这是一个功能强大的消息传递协议。
    • 还支持其他协议,包括 MQTT、STOMP 和 HTTP,使其能够与多种应用和系统集成。
  • 灵活的消息路由
    • 提供多种交换机类型(如 direct、topic、fanout、headers),支持复杂的消息路由。
    • 通过绑定键和路由键,可以实现灵活的消息分发策略。
  • 高可用性和持久性
    • 支持消息和队列的持久化,确保在代理重启后数据不会丢失。
    • 通过镜像队列实现高可用性,确保消息在多个节点上复制。
  • 扩展性和集群支持
    • 支持通过集群扩展来提高系统的吞吐量和可靠性。
    • 提供分布式部署,支持在不同节点之间的消息传递。
  • 易于管理和监控
    • 提供强大的管理工具和 Web 控制台,支持实时监控和管理队列、交换机和连接。
    • 支持插件机制,可以通过插件扩展功能,如管理插件、监控插件等。
  • 多语言客户端支持
    • 提供多种编程语言的客户端库,包括 Java、Python、Ruby、C#、JavaScript 等,方便与不同语言的应用集成。
  • 插件系统
    • RabbitMQ 的插件系统允许用户添加自定义功能或扩展现有功能,如消息追踪、不同协议支持等。

应用场景

  • 任务队列:用于异步处理后台任务,如图像处理、数据分析等。
  • 微服务通信:在微服务架构中,RabbitMQ 用于服务之间的异步通信和事件驱动的交互。
  • 实时消息传递:用于需要低延迟和高吞吐量的实时应用,如在线聊天、实时监控等。
  • 事件通知系统:在复杂的事件驱动系统中,用于事件的分发和处理。

优势

  • 简单易用:易于安装和配置,提供直观的管理界面。
  • 灵活性:支持复杂的消息路由和多种协议,适应不同的应用需求。
  • 可靠性:通过持久化和镜像队列实现高可用性和数据可靠性。
  • 社区支持:拥有活跃的社区和丰富的文档资源,支持持续更新和改进。

AMQP协议简介

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一种开放的应用层协议,为消息导向的中间件提供了统一的消息传递机制。AMQP 旨在解决不同消息系统之间的互操作性问题,提供一个标准化的消息协议,支持跨平台、跨语言的消息传递。

核心概念

  • Broker(消息代理):消息代理负责接收、存储和转发消息。它是 AMQP 网络中的核心组件。
  • Message(消息):消息是通过 AMQP 网络传递的数据单元。每个消息包括一个可选的头和消息体。
  • Producer(生产者):生产者是创建并发送消息到消息代理的应用程序。
  • Consumer(消费者):消费者是从消息代理接收和处理消息的应用程序。
  • Queue(队列):队列是消息的存储位置。生产者将消息发送到队列,消费者从队列中接收消息。
  • Exchange(交换机):交换机负责接收来自生产者的消息,并根据路由键将消息分发到一个或多个队列。
  • Binding(绑定):绑定定义了交换机和队列之间的关系,包括如何将消息路由到队列。

协议层次结构

AMQP 协议由多个层次组成,每一层次负责不同的功能:

  • 框架层:定义了网络连接的建立、维护和关闭的基本规则。
  • 传输层:负责数据的传输,处理消息的帧结构。
  • 消息层:处理消息的路由、队列、发布和订阅等功能。
  • 事务层:提供事务支持,确保消息传递的一致性和完整性。

AMQP中的类

Connection类

AMQP是一个长连接协议。Connection被设计为长期使用的,可以携带多个cannel。Connection的生命周期是:

  • 客户端打开到服务端的TCP/IP连接,发送protocol头。这是唯一的,客户端发送的数据不能被解析为方法的。
  • 服务端返回其协议版本、属性(比如支持的安全机制列表)。the Start method
  • 客户端选择安全机制Start-Ok
  • 服务端开始认证过程, 它使用SASL的质询-响应模型(challenge-response model). 它向客户端发送一个质询Secure
  • 客户端向服务端发送一个认证响应Secure-Ok 。比如,如果使用 plain 认证机制,则响应会包含登录名和密码
  • 客户端重复质询Secure或转到协商步骤,发送一系列参数,如最大帧大小Tune
  • 客户端接受,或者调低这些参数Tune-Ok
  • 客户端正式打开连接,并选择一个VhostOpen
  • 服务端确认VHost有效Open-Ok
  • 客户端可以按照预期使用连接
  • 当一个节点打算结束连接Close
  • 另一个节点需要结束握手Close-Ok
  • 服务端和客户端关闭Socket连接。

如果在发送或者收到 Open 或者 Open-Ok 之前,某一个节点发现了一个错误,则必须直接关闭Socket,且不发送任何数据。

Channel类

AMQP是一个多通道协议。Channel提供了一种方式,在比较重的TCP/IP连接上建立多个轻量级的连接。这会让协议更加的防火墙友好,因为端口使用是可预知的。它也意味着流量调整和其他QoS特性也很容易支持。

Channels相互是独立的,可以同步执行不同的功能。可用带宽会在当前活动之间共享.

这里期望也鼓励多线程客户端程序应该使用 每个线程一个channel 的模型。不过,一个客户端在一个或多个AMQP服务端上打开多个连接也是可以的。

Channel的生命周期为:

  • 客户端打开一个新通道Open
  • 服务端确认新通道准备就绪Open-Ok
  • 客户端和服务端按预期来使用通道.
  • 一个节点关闭了通道Close
  • 另一个节点对通道关闭进行握手Close-Ok

Exchange类

Exchange类能够让应用操作服务端的交换器。这个类能够让程序自己设置路由,而不是通过某些配置。注意:大部分程序并不需要这个级别的复杂度,过去的中间件也不只支持这个语义。

Exchange的生命周期为:

  • 客户端让服务端确保该exchange存在Declare 。客户端可以细化为:“如果交换器不存在则进行创建” 或 “如果交换器不存在,警告我,不需要创建”
  • 客户端向Exchange发消息
  • 客户端也可以选择删掉ExchangeDelete

Queue类

该类用于让程序管理服务端上的消息队列。几乎所有的消费者应用都是基本步骤,至少要验证使用的消息队列是否存在。

一个持久化消息队列的生命周期非常简单

  • 客户端断言这个消息队列存在Declare(设置 passive 参数)
  • 服务端确认消息队列存在Declare-Ok
  • 客户端消息队列中读消息

一个临时消息队列的生命周期会更有趣些:

  • 客户端创建消息队列Declare(不提供队列名称,服务器会分配一个名称)。服务端确认 Declare-Ok
  • 客户端在消息队列上启动一个消费者
  • 客户端取消消费,可以是显示取消,也可以是通过关闭通道或者连接连接隐式取消的
  • 当最后一个消费者从消息队列中消失的时候,在过了礼貌性超时后,服务端会删除消息队列

AMQP实现了Topic订阅的分发模型。这可以让订阅在合作的订阅者间进行负载均衡。涉及到额外的绑定阶段的生命周期:

  • 客户端创建一个队列Declare,服务端确认Declare-Ok
  • 客户端绑定消息队列到一个topic exchange上Bind,服务端确认Bind-Ok
  • 客户端像之前一样使用消息队列。

Basic类

Basic实现本规范中描述的消息功能。支持如下语义:

  • 从客户端→服务端发消息。异步Publish
  • 开始或者停止消费Consume,Cancel
  • 从服务端到客户端发消息。异步Deliver,Return
  • 确认消息Ack,Reject
  • 同步的从消息队列中读取消息Get

事务类

AMQP支持两种类型的事务:

  • 自动事务。每个发布的消息和应答都处理为独立事务.
  • 服务端本地事务:服务器会缓存发布的消息和应答,并会根据需要由client来提交它们.

Transaction 类(“tx”) 使应用程序可访问第二种类型,即服务器事务。这个类的语义是:

  • 应用程序要求服务端事务,在需要的每个channel里Select
  • 应用程序做一些工作Publish, Ack
  • 应用程序提交或回滚工作Commit,Roll-back
  • 应用程序正常工作,循环往复。

事务包含发布消息和ack,不包含分发。所以,回滚并不能重入队列或者重新分发任何消息。客户端有权在事务中确认这些消息。

交换机类型

AMQP 支持多种交换机类型,每种类型实现不同的消息路由策略:

  • Direct Exchange:根据消息的路由键进行精确匹配,将消息路由到与路由键完全匹配的队列。
  • Fanout Exchange:将消息广播到所有绑定的队列,不考虑路由键。
  • Topic Exchange:支持基于模式匹配的路由,路由键可以使用通配符(如* 和 #)进行匹配。
  • Headers Exchange:基于消息头的属性进行路由,忽略路由键。

Direct交换器

直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。

  • 将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)
  • 当一个携带着路由键为R的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为R的队列。

Fanout交换器

扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。所以扇型交换机常用来处理消息的广播路由(broadcast routing)。

  • 一个消息队列没有使用任何参数绑定交换器
  • 生产者向交换器发了一条消息
  • 这个消息无条件的发送到该消息队列

Topic 交换器

主题交换机(topic exchanges)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。

如果从用法上面翻译可能叫通配符交换机会更加贴切。这种交换机是使用通配符去匹配,路由到对应的队列。通配符有两种:”*” 、 “#”。匹配规则如下:

  • 消息路由键和绑定路由键都是一个由“.”分隔的字符串,“.”分隔开来的每一段称为一个单词,如:bb.cc。
  • *:只能匹配一个单词。比如*可以匹配到”a.b”、”a.c”,但是匹配不了”a.b.c”。
  • #:可以匹配零个或多个单词。比如”rabbit.#”既可以匹配到”rabbit.a.b”、”rabbit.a”,也可以匹配到”rabbit.a.b.c”。

主题交换机经常用来实现各种分发/订阅模式及其变种。当一个问题涉及到那些想要有针对性的选择需要接收消息的 多消费者/多应用(multiple consumers/applications) 的时候,主题交换机都可以被列入考虑范围。

Headers 交换器

有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。

  • 消息队列使用Header的参数表来绑定。不适用RoutingKey
  • 生产者向交换器发送消息,Header中包含了指定的键值对
  • 如果匹配,则传给消息队列。

头交换机可以视为直连交换机的另一种表现形式。头交换机能够像直连交换机一样工作,不同之处在于头交换机的路由规则是建立在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。

我们可以绑定一个队列到头交换机上,并给他们之间的绑定使用多个用于匹配的头(header)。我们可以通过”x-match”参数来设置部分匹配还是全部匹配,当”x-match”设置为“any”时,消息头的任意一个值被匹配就可以满足条件,而当”x-match”设置为“all”的时候,就需要消息头的所有值都匹配成功。

比如:

format=json,type=log,x-match=all
format=line,type=log,x-match=any

如果 x-match 为all,则必须都匹配才行。如果x-match为any,则有任意一个header匹配即可。

AMQP的传输架构

AMQP是一个二进制协议。有不同类型的帧frame 构成。帧会携带协议的方法以及其他信息。所有的帧都有相同的基本结构,即:帧头,payload,帧尾。payload格式取决于帧的类型。

我们假设使用的是面向流的可靠网络层(比如TCP/IP)。单个Socket连接上可以有多个独立的控制线程,也就是通道Channel 。不同的通道共享一个连接,每个通道上的帧都是按严格的顺序排列,这样可以用一个状态机来解析协议。

传输层(wire-level)的格式被设计为扩展性强、且足够通用,可以用于任何更高层的协议(不仅仅是AMQP)。我们假设AMQP是会被扩展、优化的。

主要涉及这几个部分:数据类型、协议协商、分帧方式、帧细节、方法帧、内容帧、心跳帧、错误处理、通道与连接的关闭

数据类型

AMQP的数据类型用于方法帧中,他们有

  • 整数(1-8个字节),表示大小,数量,范围等。全都是无符号整数
  • Bits。用于表示为开/关值,会被封包为字节。
  • 短字符串。用于存放短的文本属性。最多255字节,解析时不用担心缓冲区溢出。
  • 长字符串:用于存放二进制数据块
  • 字段表(Field Table),用于存放键值对

协议协商

客户端连接时,和服务端协商可接受的配置。当两个节点达成一致后,连接才能继续使用。通过协商,可以让我们断言假设和前提条件。主要协商这几方面的信息

  • 实现的协议和版本。服务端可能会在同一端口提供多种协议的支持
  • 加密参数和验证
  • 最大帧尺寸、Channel的数量、某些操作的限制。

如果协商达成一致,双方会根据协商预分配缓冲区避免死锁。传入的帧如果满足协商条件,则认为其实安全的。如果超过了,那么另一方必须断开连接。

分帧方式

TCP/IP是流协议。没有内置的分帧机制。现有的协议一般有这几种方式进行分帧:

  • 每个连接只发送一个帧。简单,但是慢。
  • 在流中加入分隔符来分帧。简单,但是解析较慢(因为需要不断的读取,去寻找分隔符)
  • 计算帧的尺寸,并在每个帧之前发送尺寸。简单且快速。也是AMQP的选择

帧细节

帧头包括:帧类型、通道、尺寸。

帧尾包含错误检测信息。

处理一个帧的步骤:

  • 读帧头,检查帧类型和Channel
  • 根据帧类型,读取payload并处理
  • 读帧尾校验

在实现时,性能很重要的时候,我们会使用 read-ahead buffering 或者 gathering reads 去避免读帧时进行三次系统调用。

方法帧

处理方式:

  • 读取方法帧的payload
  • 解包为结构
  • 检查方法在当前上下文中是否允许
  • 检查参数是否有效
  • 执行方法。

方法帧是由AMQP数据字段组成。编码代码直接从协议规范中生成,速度非常快。

内容帧

内容是端到端直接发送的应用数据。内容由一系列属性和二进制数据组成。其中一系列的属性组成了 ”内容帧的帧头“。而二进制数据,可以是任意大小,它可能被拆分成多个块发送,每个块是一个 content-body帧

一些方法(比如 Basic.Publish,Basic.Deliver)是会携带内容的。一个内容帧的帧头如下结构:

这里把 content-body 作为单独的帧,这样就可以支持Zero-copy技术,这部分内容就不需要被编码。把内容属性放到自己的帧里,这样收件人就可以选择性的丢弃不想处理的内容。

通道与连接的关闭

对于客户端,只要发送了 Open 就认为连接和通道是打开的。对于服务端则是 Open-Ok 。如果一个节点想要关闭通道和连接必须要进行握手。

如果突然或者意外关闭,没办法立刻被检测到,可能会导致丢失返回值。所以需要在关闭之前进行握手。在一个节点发送 Close 后,另一个节点必须发送 Close-Ok 来回复。然后双方可以关闭通道或者连接。如果节点忽略了 Close 操作,当双方同时发送 Close 时,可能会导致死锁。

RabbitMQ概述

消息模型

RabbitMQ 支持多种消息模型,通过不同的交换机类型和队列配置来实现灵活的消息路由和传递机制。

以下是 RabbitMQ 支持的主要消息模型:

简单队列(Simple Queue)

  • 描述:这是最基本的消息模型,生产者将消息发送到队列,消费者从队列中接收消息。每条消息只会被一个消费者处理。
  • 适用场景:适用于简单的任务队列和点对点通信。

一个Producer发送消息到Queue中,一个Consumer从Queue读取消息并打印。

工作队列(Work Queue)

  • 描述:工作队列模型用于分发任务到多个消费者(即工作者)。它有助于提高任务处理的并发性和效率。
  • 特性:
    • 消息持久化:确保在 RabbitMQ 重启后任务不会丢失。
    • 公平调度:通过关闭自动确认,确保每个消费者在处理完任务并确认后才接收新任务。
  • 适用场景:适用于需要负载均衡的后台任务处理。

将消息分配给多个Consumer进行处理,可以避免在执行资源密集性任务时同步处理导致阻塞等待的问题,从而在一定程度上提升并行能力,通常称该类Consumer为Work,多个Work在后台接收分配到的任务并处理。

发布/订阅(Publish/Subscribe)

  • 描述:在这个模型中,消息被发送到一个交换机,然后广播到所有绑定的队列。每个绑定的队列可以有一个或多个消费者。
  • 特性:使用 Fanout Exchange:消息被广播到所有绑定的队列。
  • 适用场景:适用于需要将消息广播到多个消费者的场景,如日志广播、通知系统。

前两个场景都是把一个消息传递给一个Consumer/Worker,而这里的Publish/Subscribe需要把消息传递给多个Consumer。这里Producer不会将消息直接发送到队列,事实上,Producer也并不知道消息会传递给任何的Queue,而是将消息发送到一个Exchange上,Exchange的作用在于收到Producer的消息并推送给绑定的Queue,这样Exchange就将消息传递给绑定的Queues及其以对应的Consumer了,这里使用的Exchange是fanout,其实就是广播功能。Exchange通常分为四种:

  • fanout:该类型路由规则非常简单,会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,相当于广播功能
  • direct:该类型路由规则会将消息路由到binding key与routing key完全匹配的Queue中,在场景四中会用到
  • topic:与direct类型相似,只是规则没有那么严格,可以模糊匹配和多条件匹配,在场景五中会进一步解释
  • headers:该类型不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配

比如一个打印Log的系统,当所有接收Log的Consumer接收到消息后都可以打印Log。

路由(Routing)

  • 描述:路由模型允许根据路由键将消息发送到不同的队列。
  • 特性:使用 Direct Exchange:消息根据路由键发送到与之匹配的队列。
  • 适用场景:适用于需要将消息定向发送到特定消费者的场景,如按服务类型分发任务。

在上一场景中,只是一个简单的Log系统,相当于广播功能,更进一步,可以针对不同的日志发送到不同的Consumer进行不同的处理,比如有的写文件,有的打印控制台等,那么就需要定义Routing路由了。为了使用Routing功能,可以将Exchange定义为direct类型,需要设置routing_key绑定到Queue上,然后就可以发送消息了。

主题(Topics)

  • 描述:主题模型允许使用通配符进行复杂的路由。
  • 特性:使用 Topic Exchange:支持基于模式匹配的路由,路由键可以使用*(匹配一个词)和 #(匹配零个或多个词)作为通配符。
  • 适用场景:适用于需要复杂路由的场景,如多层级的日志记录、动态订阅。

在以上场景中,使用direct类型的Exchange可以实现对不同消息的路由,但只是支持单一的条件,为了支持多种条件,比如不仅针对Log的级别,还要针对其来源进行分发消息,这时候可以使用Topic来实现了,其中的逻辑和direct的类似,只是routing_key支持多个用点.分隔的值用于匹配路由信息,其中路由Key可以使用*替代一个词,#匹配0个或多个词。

RPC(Remote procedure call)

在前面的场景二中,能够实现使用Work Queues分发处理运算任务,但如果需要将任务发送到远程服务器上执行处理,然后等待返回运算结果呢?那就需要RPC远程过程回调了。这里描述的场景将和之前的完全不一样,需要构建一个Client和RPC Server,Client作为远程调用的发起者携带一些如reply_to,correlation_id等的特定信息发送请求到rpc_queue上,RPC Server接收到请求后执行处理函数并将运算结果返回给Callback Queue,Client就可以接收到对应correlationId的返回结果了。

存储机制

RabbitMQ 的存储层负责管理消息的持久化和队列的存储,以确保在系统重启或故障时不会丢失重要数据。RabbitMQ 使用了一种称为 “Message Store” 的机制来管理其存储需求。

存储机制

  • 持久化与非持久化消息
    • 持久化消息:在消息发布时,生产者可以将消息标记为持久化,这意味着消息会被写入磁盘,即使 RabbitMQ 服务器重启,消息也不会丢失。
    • 非持久化消息:这些消息仅存储在内存中,如果 RabbitMQ 重启,这些消息将会丢失。
  • 队列的持久化
    • 队列本身也可以被声明为持久化的。持久化队列的元数据会被存储在磁盘上,以确保即使在服务器重启后,队列依然存在。
    • 需要注意的是,持久化队列并不保证队列中的消息也是持久化的,消息的持久化需要单独指定。

存储结构

  • Message Store(消息存储)
    • RabbitMQ 的消息存储机制负责将持久化消息写入磁盘。它使用了一种高效的文件系统结构,通常包括一组持久化日志文件。
    • 消息存储是事务性的,这意味着消息的写入和读取操作是原子性的,能够确保数据的一致性。
  • Index Files(索引文件)
    • 为了快速访问消息,RabbitMQ 使用索引文件来记录消息在持久化日志文件中的位置。
    • 索引文件通常存储在内存中,以加速消息的查找和检索。
  • Queue Journal(队列日志)
    • RabbitMQ 使用队列日志来记录队列的状态变化和消息的生命周期事件,如消息的发布、传递和确认。
    • 队列日志有助于在系统故障后恢复队列的状态。

存储优化

  • 写优化:RabbitMQ 使用批量写入技术将多条消息一起写入磁盘,以减少磁盘 I/O 操作并提高写入性能。
  • 内存缓存:RabbitMQ 会在内存中缓存最近使用的消息,以减少磁盘读取,提高消息的处理速度。
  • 垃圾回收:为了管理磁盘空间,RabbitMQ 会定期进行垃圾回收,清除已经被确认处理的消息日志文件。

配置和管理

  • 持久化配置
    • 在声明队列和消息时,可以通过参数指定持久化选项。例如,在声明队列时使用durable=True 参数,在发布消息时使用 delivery_mode=2。
  • 磁盘使用监控
    • RabbitMQ 提供磁盘使用的监控功能,可以通过管理插件查看磁盘的使用情况。
    • 可以配置磁盘使用的阈值,以便在磁盘空间不足时触发警告或限制消息发布。
  • 备份与恢复
    • 为了确保数据安全,建议定期备份 RabbitMQ 的数据文件。
    • 在需要恢复时,可以通过恢复备份文件重建 RabbitMQ 的状态。

队列类型

RabbitMQ 提供了多种类型的队列,以满足不同的消息传递需求。每种队列类型都有其独特的特性和适用场景。

标准队列(Standard Queue)

  • 描述:这是 RabbitMQ 中最常用的队列类型,提供了基本的 FIFO(先进先出)消息传递功能。
  • 特性
    • 支持消息持久化。
    • 支持消息确认(acknowledgment)。
    • 支持消息优先级。
  • 适用场景:适用于大多数常规消息传递场景,如任务队列、工作队列等。

优先级队列(Priority Queue)

  • 描述:优先级队列允许为消息设置不同的优先级,从而决定消息的处理顺序。
  • 特性
    • 消息可以根据其优先级进行排序和传递。
    • 可以在声明队列时指定最大优先级。
  • 适用场景:适用于需要根据重要性或紧急程度处理消息的场景。

延迟队列(Delayed Queue)

  • 描述:延迟队列允许消息在指定的时间延迟后再进行投递。
  • 特性
    • 消息可以在发布时指定延迟时间。
    • 需要使用插件(如 rabbitmq_delayed_message_exchange)来实现。
  • 适用场景:适用于需要定时任务或延迟消息处理的场景。

惰性队列(Lazy Queue)

  • 描述:惰性队列设计用于处理非常大的队列,尽量将消息存储在磁盘上而不是内存中。
  • 特性
    • 优化内存使用,适合处理大量消息。
    • 减少内存占用,尤其是在消息堆积时。
  • 适用场景:适用于需要处理大量消息且希望降低内存消耗的场景。

队列镜像(Mirrored Queue)

  • 描述:镜像队列是 RabbitMQ 的高可用性解决方案,队列的内容会被复制到集群中的其他节点。
  • 特性
    • 提高队列的可用性和容错能力。
    • 在主节点故障时,自动切换到镜像节点。
  • 适用场景:适用于需要高可用性和容错能力的场景。

延伸队列(Quorum Queue)

  • 描述:延伸队列是一种现代化的高可用队列类型,基于 Raft 共识算法。
  • 特性
    • 提供更好的分布式一致性和高可用性。
    • 自动管理节点之间的复制和故障转移。
  • 适用场景:适用于需要强一致性和高可用性的企业级应用。

RabbitMQ的使用

使用 RabbitMQ 进行消息传递涉及几个关键步骤,包括安装、配置、编写生产者和消费者代码,以及管理和监控。

安装 RabbitMQ

安装 Erlang

RabbitMQ 是用 Erlang 编写的,因此首先需要安装 Erlang。

# Ubuntu/Debian:
sudo apt-get update
sudo apt-get install erlang
# CentOS/RHEL:
sudo yum install erlang

安装 RabbitMQ

# Ubuntu/Debian:
sudo apt-get update
sudo apt-get install rabbitmq-server
# CentOS/RHEL:
sudo yum install rabbitmq-server

启动和启用 RabbitMQ

# 启动 RabbitMQ 服务并设置开机自启:
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server

配置 RabbitMQ

开启管理插件

RabbitMQ 提供了一个 Web 管理界面,可以通过以下命令启用:sudo rabbitmq-plugins enable rabbitmq_management

访问管理界面:http://<your-server-ip>:15672,默认用户名和密码是 guest/guest。

创建用户和权限

为了安全起见,建议创建一个新的用户并分配权限:

sudo rabbitmqctl add_user myuser mypassword
sudo rabbitmqctl set_user_tags myuser administrator
sudo rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"

编写生产者和消费者代码

安装客户端库

RabbitMQ 支持多种编程语言,这里以 Python 为例。

安装 pika 库:pip install pika

编写生产者代码

生产者负责发送消息到 RabbitMQ 交换机。

import pika

# 连接到 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明一个队列
channel.queue_declare(queue='hello')

# 发送消息
message = 'Hello, RabbitMQ!'
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)

print(f" [x] Sent '{message}'")

# 关闭连接
connection.close()

编写消费者代码

消费者负责从队列中接收和处理消息。

import pika

# 连接到 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明一个队列
channel.queue_declare(queue='hello')

# 定义回调函数
def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()}")

# 设置消费者
channel.basic_consume(queue='hello',
                      auto_ack=True,
                      on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

运行生产者和消费者

  • 运行生产者 保存生产者代码为py,然后运行:python producer.py
  • 运行消费者 保存消费者代码为 py,然后运行:python consumer.py

管理和监控

使用 Web 管理界面

通过浏览器访问 http://<your-server-ip>:15672,登录后可以看到以下信息:

  • Overview:概览页面,显示服务器状态和统计信息。
  • Queues:队列列表,显示每个队列的状态和消息数。
  • Exchanges:交换机列表,显示每个交换机的类型和绑定。
  • Connections:连接列表,显示当前的连接情况。
  • Channels:通道列表,显示每个通道的状态和活动。

使用命令行工具

RabbitMQ 提供了一些命令行工具,用于管理和监控。

  • 查看队列状态:sudo rabbitmqctl list_queues
  • 查看连接状态:sudo rabbitmqctl list_connections
  • 查看通道状态:sudo rabbitmqctl list_channels

高级功能

复杂路由

RabbitMQ 支持多种交换机类型,可以实现复杂的路由逻辑。

  • Direct Exchange:基于路由键进行精确匹配。
  • Fanout Exchange:广播消息到所有绑定的队列。
  • Topic Exchange:基于模式匹配进行路由。
  • Headers Exchange:基于消息头进行路由。

消息持久化

确保消息在 RabbitMQ 重启后仍然存在。

  • 声明持久化队列:queue_declare(queue=’task_queue’, durable=True)
  • 发送持久化消息:basic_publish(exchange=”,routing_key=’task_queue’,body=message,properties=pika.BasicProperties(delivery_mode=2, # make message persistent))

消费者确认

确保消费者成功处理消息后再从队列中删除。

  • 设置手动确认:basic_consume(queue=’task_queue’,auto_ack=False,on_message_callback=callback)
  • 在回调函数中确认消息:
def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

参考链接:

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注