器→工具, 工具软件, 术→技巧, 研发

分布式消息队列Pulsar

钱魏Way · · 6 次浏览

Pulsar简介

Apache Pulsar 是一个开源的分布式消息系统和流处理平台,设计用于高性能、可扩展和持久化的消息传递。它最初由 Yahoo 开发,后来在 2016 年捐赠给 Apache 软件基金会,并成为顶级项目。Pulsar 以其多租户支持、分层架构和灵活的消息模型而闻名。

核心特性

  • 分层架构
    • 服务层(Pulsar Broker):处理客户端的生产和消费请求,管理元数据和协调分布式任务。
    • 存储层(Apache BookKeeper):负责消息的持久化存储。通过将数据分段存储在多个 BookKeeper 服务器上,实现高可用性和水平扩展。
  • 多租户支持
    • Pulsar 原生支持多租户、命名空间和主题的管理。这使得在一个集群中可以安全地隔离不同应用和用户的资源。
  • 消息模型
    • 主题(Topic):基本的消息通道,支持持久化和非持久化模式。
    • 订阅(Subscription):支持多种订阅模式,包括独占、共享和失败转移,灵活处理消费模式。
  • 高可用性和水平扩展
    • Pulsar 的架构允许它通过添加更多的 Broker 和 BookKeeper 服务器来水平扩展。
    • 通过复制和分段存储,提供数据的高可用性和可靠性。
  • 多协议支持
    • 除了原生的 Pulsar 协议,Pulsar 还支持 Kafka 协议(通过 Pulsar 的 Kafka on Pulsar 模块)和其他协议(如 AMQP、MQTT)。
  • 延迟消息和定时消息
    • Pulsar 支持延迟消息投递和定时消息投递,提供了灵活的消息调度功能。
  • 流处理
    • Pulsar Functions 提供轻量级的流处理框架,可以直接在 Pulsar 集群中处理消息流。
    • 支持与 Apache Flink、Apache Storm 等流处理框架集成。

优势

  • 多租户和命名空间:适合企业环境中不同团队和应用共享同一集群。
  • 自动负载均衡:通过自动分区和负载均衡减少运维开销。
  • 低延迟和高吞吐量:在高并发场景下提供高性能。
  • 灵活的部署选项:支持本地部署、云部署以及 Kubernetes 集成。

用例

  • 实时分析:用于实时数据流处理和分析,如点击流、传感器数据和金融交易。
  • 日志聚合:集中收集和处理分布式系统中的日志数据。
  • 事件流处理:支持复杂事件处理和实时监控。
  • 消息队列:作为传统消息队列系统的替代方案,支持微服务通信和异步处理。

生态系统和工具

  • Pulsar Functions:轻量级的函数计算框架,用于处理和转换消息流。
  • Pulsar SQL:通过 SQL 查询对 Pulsar 中的数据进行分析。
  • Pulsar Manager:提供 Web 界面进行集群管理和监控。
  • Pulsar IO:用于与外部系统集成的连接器框架。

Pulsar的应用

  • 作为普通的Pub-Sub模型的消息队列使用
  • 支持Function(Stream),整合到Stream平台

相关概念和术语

构建 Pulsar 的目的是为了支持多租户(multi-tenant)应用场景。Pulsar 的多租户机制包含了两种资源:资产(property)和命名空间(namespace)。资产代表系统里的租户。假设有一个 Pulsar 集群用于支持多个应用程序(就像 Yahoo 那样),集群里的每个资产可以代表一个组织的团队、一个核心的功能或一个产品线。一个资产可以包含多个命名空间,一个命名空间可以包含任意个主题。

命名空间是 Pulsar 最基本的管理单元。在命名空间层面,我们可以设置权限、调整复制选项、管理跨集群的数据复制、控制消息的过期时间或执行其他关键任务。命名空间里的主题会继承命名空间的配置,所以我们可以一次性对同一个命名空间内的所有主题进行配置。命名空间可以分为两种:

  • 本地(local)——本地命名空间只在集群内可见。
  • 全局(global)——命名空间对多个集群可见,可以是同一个数据中心内的集群,也可以是跨地域数据中心的集群。该功能取决于是否启用了集群复制功能。

虽然本地命名空间和全局命名空间的作用域不同,但它们都可以在不同的团队或不同的组织内共享。如果应用程序获得了命名空间的写入权限,就可以往该命名空间内的所有主题写入数据。如果写入的主题不存在,就会创建该主题。

每个命名空间可以包含一到多个主题,每个主题可以有多个订阅者,每个订阅者可以接收所有发布到该主题的消息。为了给应用程序提供更大的灵活性,Pulsar 提供了三种订阅类型,它们可以共存在同一个主题上:

  • 独享(exclusive)订阅——同时只能有一个消费者。
  • 共享(shared)订阅——可以由多个消费者订阅,每个消费者接收其中的一部分消息。
  • 失效备援(failover)订阅——允许多个消费者连接到同一个主题上,但只有一个消费者能够接收消息。只有在当前消费者发生失效时,其他消费者才开始接收消息。

Pulsar 的订阅机制解耦了消息的生产者和消费者,在不增加复杂性和开发工作量的情况下为应用程序提供了更大的弹性。

数据分区

写入主题的数据可能只有几个 MB,也有可能是几个 TB。所以,在某些情况下主题的吞吐量很低,有时候又很高,完全取决于消费者的数量。那么碰到有些主题吞吐量很高而有些又很低的情况该怎么处理?为了解决这个问题,Pulsar 将一个主题的数据分布到多台机器上,也就是所谓的分区。

在处理海量数据时,为了保证高吞吐量,分区是一种很常见的手段。默认情况下,Pulsar 的主题是不进行分区的,但通过命令行工具或 API 可以很容易地创建分区主题,并指定分区的数量。

在创建好分区主题之后,Pulsar 可以自动对数据进行分区,不会影响到生产者和消费者。也就是说,一个应用程序向一个主题写入数据,对主题分区之后,不需要修改应用程序的代码。分区只是一个运维操作,应用程序不需要关心分区是如何进行的。

主题的分区操作由一个叫作 broker 的进程来处理,Pulsar 集群里的每个节点都会运行自己的 broker。

主题分区不会影响到应用程序,除此之外,Pulsar 还提供了几种消息路由策略,帮助我们更好地跨分区、跨消费者分布数据。

  • 单个分区——生产者随机挑选一个分区,并将数据写入该分区。该策略与非分区主题提供的保证是一样的,不过如果有多个生产者向同一个主题写入数据,该策略就会很有用。
  • 轮询(round robin)分区——生产者通过轮询的方式将数据平均地分布到各个分区上。比如,第一个消息写入第一个分区,第二个消息写入第二个分区,并以此类推。
  • 哈希(hash)分区——每个消息会带上一个键,要写入哪个分区取决于它所带的键。这种分区方式可以保证次序。
  • 自定义分区——生产者使用自定义函数生成分区对应的数值,然后根据这个数值将消息写入对应的分区。

持久性

Pulsar broker 在收到消息并进行确认之后,就必须确保消息在任何情况下都不会丢失。与其他消息系统不同的是,Pulsar 使用 Apache BookKeeper 来保证持久性。BookKeeper 提供了低延迟的持久化存储。Pulsar 在收到消息之后,将消息发送给多个 BookKeeper 节点(具体由复制系数来定),节点将数据写入预写式日志(write ahead log),同时在内存里也保存一份。节点在对消息进行确认之前,强制将日志写入到持久化的存储上,因此即使出现电力故障,数据也不会丢失。因为 Pulsar broker 将数据发给了多个节点,所以只会在大多数节点(quorum)确认写入成功之后它才会将确认消息发给生产者。Pulsar 就是通过这种方式来保证即使在出现了硬件故障、网络故障或其他故障的情况下仍然能够保证数据不丢失。

Pulsar的架构

Pulsar采用“存储和服务分离”的两层架构(这是Pulsar区别于其他MQ系统最重要的一点,也是所谓的“下一代消息系统”的核心):

  • Broker:提供发布和订阅的服务(Pulsar的组件)
  • Bookie:提供存储能力(BookKeeper的存储组件)

优势是Broker成为了stateless的组件,可以水平扩容。高可靠,一致性等通过BookKeeper去保证。

上图是Pulsar Cluster的架构:

  • 采用ZooKeeper存储元数据,集群配置,作为coordination
    • local zk负责Pulsar Cluster内部的配置等
    • global zk则用于Pulsar Cluster之间的数据复制等
  • 采用Bookie作为存储设备(大多数MQ系统都采用本地磁盘或者DB作为存储设备)
  • Broker负责负载均衡和消息的读取、写入等
  • Global replicators负责集群间的数据复制

Apache Pulsar和其他消息系统最根本的不同是采用分层架构。 Apache Pulsar集群由两层组成:无状态服务层,由一组接收和传递消息的Broker组成;以及一个有状态持久层,由一组名为bookies的Apache BookKeeper存储节点组成,可持久化地存储消息。

在Pulsar客户端中提供生产者和消费者(Producer & Consumer)接口,应用程序使用Pulsar客户端连接到Broker来发布和消费消息。Pulsar客户端不直接与存储层Apache BookKeeper交互。 客户端也没有直接的Zookeeper访问权限。这种隔离,为Pulsar实现安全的多租户统一身份验证模型提供了基础。

Apache Pulsar为客户端提供多种语言的支持,包括Java,C ++,Python,Go和Websockets。Apache Pulsar还提供了一组兼容Kafka的API,用户可以通过简单地更新依赖关系并将客户端指向Pulsar集群来迁移现有的Kafka应用程序,这样现有的Kafka应用程序可以立即与Apache Pulsar一起使用,无需更改任何代码。

BookKeeper层:持久化存储层

Apache BookKeeper是Apache Pulsar的持久化存储层。 Apache Pulsar中的每个主题分区本质上都是存储在Apache BookKeeper中的分布式日志。每个分布式日志又被分为Segment分段。 每个Segment分段作为Apache BookKeeper中的一个Ledger,均匀分布并存储在BookKeeper群集中的多个Bookie(Apache BookKeeper的存储节点)中。

Segment的创建时机包括以下几种:基于配置的Segment大小;基于配置的滚动时间;或者当Segment的所有者被切换。通过Segment分段的方式,主题分区中的消息可以均匀和平衡地分布在群集中的所有Bookie中。 这意味着主题分区的大小不仅受一个节点容量的限制; 相反,它可以扩展到整个BookKeeper集群的总容量。下面的图说明了一个分为x个Segment段的主题分区。 每个Segment段存储3个副本。 所有Segment都分布并存储在4个Bookie中。

Segment为中心的存储

存储服务的分层的架构 和 以Segment为中心的存储 是Apache Pulsar(使用Apache BookKeeper)的两个关键设计理念。 这两个基础为Pulsar提供了许多重要的好处:

  • 无限制的主题分区存储
  • 即时扩展,无需数据迁移
    • 无缝Broker故障恢复
    • 无缝集群扩展
    • 无缝的存储(Bookie)故障恢复
  • 独立的可扩展性

无限制的主题分区存储

由于主题分区被分割成Segment并在Apache BookKeeper中以分布式方式存储,因此主题分区的容量不受任何单一节点容量的限制。 相反,主题分区可以扩展到整个BookKeeper集群的总容量,只需添加Bookie节点即可扩展集群容量。 这是Apache Pulsar支持存储无限大小的流数据,并能够以高效,分布式方式处理数据的关键。 使用Apache BookKeeper的分布式日志存储,对于统一消息服务和存储至关重要。

即时扩展,无需数据迁移

由于消息服务和消息存储分为两层,因此将主题分区从一个Broker移动到另一个Broker几乎可以瞬时内完成,而无需任何数据重新平衡(将数据从一个节点重新复制到另一个节点)。 这一特性对于高可用的许多方面至关重要,例如集群扩展;对Broker和Bookie失败的快速应对。 我将使用例子在下文更详细地进行解释。

无缝Broker故障恢复

下图说明了Pulsar如何处理Broker失败的示例。 在例子中Broker 2因某种原因(例如停电)而断开。 Pulsar检测到Broker 2已关闭,并立即将Topic1-Part2的所有权从Broker 2转移到Broker 3。在Pulsar中数据存储和数据服务分离,所以当代理3接管Topic1-Part2的所有权时,它不需要复制Partiton的数据。 如果有新数据到来,它立即附加并存储为Topic1-Part2中的Segment x + 1。 Segment x + 1被分发并存储在Bookie1, 2和4上。因为它不需要重新复制数据,所以所有权转移立即发生而不会牺牲主题分区的可用性。

无缝集群容量扩展

下图说明了Pulsar如何处理集群的容量扩展。 当Broker 2将消息写入Topic1-Part2的Segment X时,将Bookie X和Bookie Y添加到集群中。 Broker 2立即发现新加入的Bookies X和Y。然后Broker将尝试将Segment X + 1和X + 2的消息存储到新添加的Bookie中。 新增加的Bookie立刻被使用起来,流量立即增加,而不会重新复制任何数据。 除了机架感知和区域感知策略之外,Apache BookKeeper还提供资源感知的放置策略,以确保流量在群集中的所有存储节点之间保持平衡。

无缝的存储(Bookie)故障恢复

下图说明了Pulsar(通过Apache BookKeeper)如何处理bookie的磁盘故障。 这里有一个磁盘故障导致存储在bookie 2上的Segment 4被破坏。Apache BookKeeper后台会检测到这个错误并进行复制修复。

Apache BookKeeper中的副本修复是Segment(甚至是Entry)级别的多对多快速修复,这比重新复制整个主题分区要精细,只会复制必须的数据。 这意味着Apache BookKeeper可以从bookie 3和bookie 4读取Segment 4中的消息,并在bookie 1处修复Segment 4。所有的副本修复都在后台进行,对Broker和应用透明。

即使有Bookie节点出错的情况发生时,通过添加新的可用的Bookie来替换失败的Bookie,所有Broker都可以继续接受写入,而不会牺牲主题分区的可用性。

独立的可扩展性

由于消息服务层和持久存储层是分开的,因此Apache Pulsar可以独立地扩展存储层和服务层。这种独立的扩展,更具成本效益:

当您需要支持更多的消费者或生产者时,您可以简单地添加更多的Broker。主题分区将立即在Brokers中做平衡迁移,一些主题分区的所有权立即转移到新的Broker。

当您需要更多存储空间来将消息保存更长时间时,您只需添加更多Bookie。通过智能资源感知和数据放置,流量将自动切换到新的Bookie中。 Apache Pulsar中不会涉及到不必要的数据搬迁,不会将旧数据从现有存储节点重新复制到新存储节点。

GEO-REPLICATOIN

多个Broker节点组成一个Pulsar Cluster;多个Pulsar Cluster组成一个Pulsar Instance。

Pulsar通过GEO-REPLICATION支持一个Instance内在不同的地域发送和消费消息。

上图中,Producer P1、P2、P3在不同的Cluster发送给Topic T1的消息,会在Cluster之间进行复制,Consumer C1、C2可以在自己所在的Cluster消费到所有的消息。

当消息被写入Pulsar时,首先消息被持久化在local cluster,之后异步的发送到其他cluster。在没有链接问题的情况下,通常复制的latency相近于网络的RTT。

Pulsar与Kafka的对比

Apache Pulsar 和 Apache Kafka 是两种流行的分布式消息系统和流处理平台。它们都用于处理实时数据流,但在架构、功能和使用场景上有一些显著的区别。

以下是对 Pulsar 和 Kafka 的详细对比:

架构

  • Apache Kafka
    • 单层架构:Kafka 使用一个单层的分布式日志系统,集成了消息存储和消费的功能。
    • Broker-centric:数据存储在 Kafka broker 上,消费和生产通过 broker 进行。
    • 分区和副本:通过分区和副本实现高可用性和可扩展性。
  • Apache Pulsar
    • 分层架构:Pulsar 使用分层架构,分为存储层(BookKeeper)和服务层(Pulsar broker)。
    • Segmented Storage:Pulsar 使用 Apache BookKeeper 来管理消息的持久化存储,支持水平扩展和高可用性。
    • 多租户支持:原生支持多租户和命名空间管理。

性能和扩展性

  • Apache Kafka
    • 高吞吐量:Kafka 以其高吞吐量和低延迟著称,非常适合大规模数据流处理。
    • 分区限制:Kafka 的性能可能受到分区数量的限制,因为每个分区需要单独的文件句柄和线程。
  • Apache Pulsar
    • 水平扩展:通过 BookKeeper 的分段存储,Pulsar 可以轻松扩展到数千个主题和分区。
    • 低延迟:在高并发场景下,Pulsar 能够保持低延迟。

功能特性

  • Apache Kafka
    • 生态系统丰富:拥有强大的生态系统,包括 Kafka Streams 和 ksqlDB,用于流处理。
    • 强大的社区支持:Kafka 社区活跃,有大量的工具和第三方支持。
  • Apache Pulsar
    • 多租户和命名空间:支持多租户和命名空间管理,适合复杂的企业环境。
    • 原生多主题:支持多主题订阅和复杂的消息路由。
    • 即发即弃模式:支持消息的即发即弃(非持久化)和持久化存储。

易用性和管理

  • Apache Kafka
    • 成熟性:作为一个成熟的项目,Kafka 的安装和使用文档丰富,易于上手。
    • 管理工具:提供多种管理和监控工具,但需要对分区和副本进行细致的配置和管理。
  • Apache Pulsar
    • 自动分区管理:Pulsar 可以自动管理分区和负载均衡,减少了运维负担。
    • UI 管理工具:提供 Pulsar Manager 和 Pulsar Dashboard 进行可视化管理。

使用场景

  • Apache Kafka:适用于需要高吞吐量和低延迟的场景,如日志聚合、事件流处理和实时分析。
  • Apache Pulsar:适合需要多租户支持、复杂消息路由和水平扩展的企业级应用场景。

Pulsar应用实例

实时数仓

首先看一个典型的、简化的实时数仓场景:给定业务库中全量商品的订单表,统计截止到当前的各个商品的订单总量。

这里面有两点需要注意:

  • 订单表中有订单状态,在统计订单量的时候需要过滤掉无效订单。
  • 订单状态随时可能发生变化。

上面两点给实时数仓的开发带来了很大的复杂性。源头的业务库中的数据可变,在实时流处理的时候需要考虑到这种变化,并对实时计算结果进行调整。

输入输出样例如下图所示:

上图左边是业务库中订单粒度的原始表,我们期望聚合成右边的以商品为粒度的商品总订单数的统计表。

另外,为了不影响线上业务,不允许直接查询线上业务库得到结果,需要以业务库为数据源建立数据仓库来支持数据分析需求。

当然,有很多成熟的方案可以解决这个问题。例如经典的 Lambda 架构,其核心思想是分为离线和实时两条链路:离线链路计算历史数据,实时链路计算当日数据。最后把历史数据和当日数据 merge 起来。如下图所示:

Lambda 架构是比较成熟的方案,但也存在一些问题,如下:

  • 同时维护离线、实时链路,链路复杂,资源消耗大,维护成本高。
  • 对于部分订单状态发生变化的情况,难以很好处理。例如历史订单在当日(今日)发生了失效,状态从有效变为了无效,这时处理起来会有一些复杂性,需要考虑对离线历史数据的实时调整。
  • 离线计算和实时计算结果需要 merge,需要精确把握时间点,离线和实时的计算结果的时间范围需要做到不重、不漏。
  • 对于需要从多个源表获取数据,且多个源表的字段值有可能发生变化的情况,则更为复杂。这里限于篇幅,不展开讲了。感兴趣的读者可以构造一些情况来推演一下相关的处理逻辑,会发现里面确实会有许多复杂的情况,涉及到流 join、数据的消费顺序等。可以梳理一下其中遇到的问题。

除了 Lambda 架构,还有另一个方案基于 upsert 离线表(如 Hudi 表)的计算。其核心思想是在 Hudi 表中近实时同步业务库中的数据(通过消费 binlog 数据),在 Hudi 表(相当于一个订单粒度的近实时表)的基础上,每隔一段时间(如 15 分钟)按照离线链路聚合数据的方式全量计算一次聚合结果,并将生成的结果同步到 OLAP 引擎中供查询。聚合计算的源头 Hudi 表是近实时更新的,聚合计算过程是近实时触发的,因此 OLAP 引擎中的结果表的时效性也是近实时的。这个方案的数据处理链路如下图所示:

这个方案的一个好处是,复用离线数据开发的逻辑到 Hudi 表的近实时全量计算逻辑中,以较低的成本来实现近实时的统计分析,但也会有一些问题,列举如下:

  • 需要较高频率的离线全量计算,消耗计算资源。
  • 对离线存储资源仍有消耗。
  • 不是纯实时(秒级)更新,而是一个近实时的过程。

针对以上实时数仓的场景, Pulsar 具备解决方案。具体来说,线上业务库的订单表输出 binlog 到 Pulsar 消息队列中。这个消息队列有全量的数据,其中冷数据可以 offload 到对象存储中。接下来可以使用 Pulsar SQL 每 15 分钟针对 Pulsar 中的全量数据计算一次聚合结果,并将计算结果写入 OLAP 引擎中供查询。这个方案类似于上面提到的 Hudi 方案,不同之处在于利用了 Pulsar SQL,相当于可以直接去查询消息队列中存储的数据。

整个计算链路如下图所示:

好处是:

  • 可以利用 Pulsar 的分级存储特性,将冷数据写入对象存储。
  • Pulsar 消息队列的存储,既可以作为中间数据的存储,也可以作为离线 ODS 层数据的存储,节省存储资源,链路简化。Pulsar 的分级存储和 Pulsar SQL 等特性使得直接在消息队列存储中做计算成为可能,进而简化数据处理链路。

通过上面的讨论,我们看到了在火山引擎 EMR 中,可以将其中的一些大数据组件和 Pulsar 结合起来使用,解决实时数仓开发中的一些问题。

批流一体

埋点日志数据存在实时处理和离线处理的需求:

  • 离线链路:用于天级报表、离线训练数据等场景。
  • 实时链路:用于实时分析、推荐等场景。

一个经典方案,类似于上文提到的 Lambda 架构,需要维护离线和实时两套数据链路,如下图所示:

这样的方案在实施上比较成熟,但是占用资源较多,维护成本较高。

而基于 Pulsar 也可以有一类方案,聚焦在实时链路。埋点日志数据上报到 Pulsar 中,用实时任务去写下游的 DWD 和 DWS 层(到 Pulsar 中)。整个 Pulsar 的实时链路也支持数据 offload 到对象存储。数据也可以直接写到 OLAP 层。如果有离线数据计算的需求,可以用 Pulsar SQL 直接对接 Pulsar 中存储的数据。整个数据链路如下图所示:

  • 基于 Pulsar 的分级存储和 Pulsar SQL 等特性,可以直接把 Pulsar 中的数据作为离线链路的 ODS 层。
  • Pulsar 的下游可以直接对接实时处理逻辑。
    • 若基于 Pulsar 中的原始日志数据,建立实时数仓,实时计算 ODS 层数据生成 DWD 层数据到 Pulsar topic 中,则 Pulsar topic 中的 DWD 层数据可以同时直接用于后续的离线计算和实时处理。
    • DWS 层同理。

参考链接:

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注