器→工具, 工具软件, 术→技巧, 研发

分布式消息队列Pulsar

钱魏Way · · 148 次浏览
!文章内容如有错误或排版问题,请提交反馈,非常感谢!

Pulsar简介

Apache Pulsar是一个开源的分布式消息系统和流处理平台,设计用于高性能、可扩展和持久化的消息传递。它最初由 Yahoo开发,后来在2016年捐赠给 Apache软件基金会,并成为顶级项目。Pulsar以其多租户支持、分层架构和灵活的消息模型而闻名。

核心特性

  • 分层架构
    • 服务层(Pulsar Broker):处理客户端的生产和消费请求,管理元数据和协调分布式任务。
    • 存储层(Apache BookKeeper):负责消息的持久化存储。通过将数据分段存储在多个 BookKeeper服务器上,实现高可用性和水平扩展。
  • 多租户支持
    • Pulsar原生支持多租户、命名空间和主题的管理。这使得在一个集群中可以安全地隔离不同应用和用户的资源。
  • 消息模型
    • 主题(Topic):基本的消息通道,支持持久化和非持久化模式。
    • 订阅(Subscription):支持多种订阅模式,包括独占、共享和失败转移,灵活处理消费模式。
  • 高可用性和水平扩展
    • Pulsar的架构允许它通过添加更多的 Broker和 BookKeeper服务器来水平扩展。
    • 通过复制和分段存储,提供数据的高可用性和可靠性。
  • 多协议支持
    • 除了原生的 Pulsar协议,Pulsar还支持 Kafka协议(通过 Pulsar的 Kafka on Pulsar模块)和其他协议(如 AMQP、MQTT)。
  • 延迟消息和定时消息
    • Pulsar支持延迟消息投递和定时消息投递,提供了灵活的消息调度功能。
  • 流处理
    • Pulsar Functions提供轻量级的流处理框架,可以直接在 Pulsar集群中处理消息流。
    • 支持与 Apache Flink、Apache Storm等流处理框架集成。

优势

  • 多租户和命名空间:适合企业环境中不同团队和应用共享同一集群。
  • 自动负载均衡:通过自动分区和负载均衡减少运维开销。
  • 低延迟和高吞吐量:在高并发场景下提供高性能。
  • 灵活的部署选项:支持本地部署、云部署以及 Kubernetes集成。

用例

  • 实时分析:用于实时数据流处理和分析,如点击流、传感器数据和金融交易。
  • 日志聚合:集中收集和处理分布式系统中的日志数据。
  • 事件流处理:支持复杂事件处理和实时监控。
  • 消息队列:作为传统消息队列系统的替代方案,支持微服务通信和异步处理。

生态系统和工具

  • Pulsar Functions:轻量级的函数计算框架,用于处理和转换消息流。
  • Pulsar SQL:通过 SQL查询对 Pulsar中的数据进行分析。
  • Pulsar Manager:提供 Web界面进行集群管理和监控。
  • Pulsar IO:用于与外部系统集成的连接器框架。

Pulsar的应用

  • 作为普通的 Pub-Sub模型的消息队列使用
  • 支持 Function(Stream),整合到 Stream平台

相关概念和术语

构建 Pulsar的目的是为了支持多租户(multi-tenant)应用场景。Pulsar的多租户机制包含了两种资源:资产(property)和命名空间(namespace)。资产代表系统里的租户。假设有一个 Pulsar集群用于支持多个应用程序(就像 Yahoo那样),集群里的每个资产可以代表一个组织的团队、一个核心的功能或一个产品线。一个资产可以包含多个命名空间,一个命名空间可以包含任意个主题。

命名空间是 Pulsar最基本的管理单元。在命名空间层面,我们可以设置权限、调整复制选项、管理跨集群的数据复制、控制消息的过期时间或执行其他关键任务。命名空间里的主题会继承命名空间的配置,所以我们可以一次性对同一个命名空间内的所有主题进行配置。命名空间可以分为两种:

  • 本地(local)——本地命名空间只在集群内可见。
  • 全局(global)——命名空间对多个集群可见,可以是同一个数据中心内的集群,也可以是跨地域数据中心的集群。该功能取决于是否启用了集群复制功能。

虽然本地命名空间和全局命名空间的作用域不同,但它们都可以在不同的团队或不同的组织内共享。如果应用程序获得了命名空间的写入权限,就可以往该命名空间内的所有主题写入数据。如果写入的主题不存在,就会创建该主题。

每个命名空间可以包含一到多个主题,每个主题可以有多个订阅者,每个订阅者可以接收所有发布到该主题的消息。为了给应用程序提供更大的灵活性,Pulsar提供了三种订阅类型,它们可以共存在同一个主题上:

  • 独享(exclusive)订阅——同时只能有一个消费者。
  • 共享(shared)订阅——可以由多个消费者订阅,每个消费者接收其中的一部分消息。
  • 失效备援(failover)订阅——允许多个消费者连接到同一个主题上,但只有一个消费者能够接收消息。只有在当前消费者发生失效时,其他消费者才开始接收消息。

Pulsar的订阅机制解耦了消息的生产者和消费者,在不增加复杂性和开发工作量的情况下为应用程序提供了更大的弹性。

数据分区

写入主题的数据可能只有几个 MB,也有可能是几个 TB。所以,在某些情况下主题的吞吐量很低,有时候又很高,完全取决于消费者的数量。那么碰到有些主题吞吐量很高而有些又很低的情况该怎么处理?为了解决这个问题,Pulsar 将一个主题的数据分布到多台机器上,也就是所谓的分区。

在处理海量数据时,为了保证高吞吐量,分区是一种很常见的手段。默认情况下,Pulsar 的主题是不进行分区的,但通过命令行工具或 API 可以很容易地创建分区主题,并指定分区的数量。

在创建好分区主题之后,Pulsar 可以自动对数据进行分区,不会影响到生产者和消费者。也就是说,一个应用程序向一个主题写入数据,对主题分区之后,不需要修改应用程序的代码。分区只是一个运维操作,应用程序不需要关心分区是如何进行的。

主题的分区操作由一个叫作 broker 的进程来处理,Pulsar 集群里的每个节点都会运行自己的 broker。

主题分区不会影响到应用程序,除此之外,Pulsar 还提供了几种消息路由策略,帮助我们更好地跨分区、跨消费者分布数据。

  • 单个分区——生产者随机挑选一个分区,并将数据写入该分区。该策略与非分区主题提供的保证是一样的,不过如果有多个生产者向同一个主题写入数据,该策略就会很有用。
  • 轮询(round robin)分区——生产者通过轮询的方式将数据平均地分布到各个分区上。比如,第一个消息写入第一个分区,第二个消息写入第二个分区,并以此类推。
  • 哈希(hash)分区——每个消息会带上一个键,要写入哪个分区取决于它所带的键。这种分区方式可以保证次序。
  • 自定义分区——生产者使用自定义函数生成分区对应的数值,然后根据这个数值将消息写入对应的分区。

持久性

Pulsar broker 在收到消息并进行确认之后,就必须确保消息在任何情况下都不会丢失。与其他消息系统不同的是,Pulsar 使用 Apache BookKeeper 来保证持久性。BookKeeper 提供了低延迟的持久化存储。Pulsar 在收到消息之后,将消息发送给多个 BookKeeper 节点(具体由复制系数来定),节点将数据写入预写式日志(write ahead log),同时在内存里也保存一份。节点在对消息进行确认之前,强制将日志写入到持久化的存储上,因此即使出现电力故障,数据也不会丢失。因为 Pulsar broker 将数据发给了多个节点,所以只会在大多数节点(quorum)确认写入成功之后它才会将确认消息发给生产者。Pulsar 就是通过这种方式来保证即使在出现了硬件故障、网络故障或其他故障的情况下仍然能够保证数据不丢失。

Pulsar 的架构

Pulsar 采用“存储和服务分离”的两层架构(这是 Pulsar 区别于其他 MQ 系统最重要的一点,也是所谓的“下一代消息系统”的核心):

  • Broker:提供发布和订阅的服务(Pulsar 的组件)
  • Bookie:提供存储能力(BookKeeper 的存储组件)

优势是 Broker 成为了 stateless 的组件,可以水平扩容。高可靠,一致性等通过 BookKeeper 去保证。

上图是 Pulsar Cluster 的架构:

  • 采用 ZooKeeper 存储元数据,集群配置,作为 coordination
    • local zk 负责 Pulsar Cluster 内部的配置等
    • global zk 则用于 Pulsar Cluster 之间的数据复制等
  • 采用 Bookie 作为存储设备(大多数 MQ 系统都采用本地磁盘或者 DB 作为存储设备)
  • Broker 负责负载均衡和消息的读取、写入等
  • Global replicators 负责集群间的数据复制

Apache Pulsar 和其他消息系统最根本的不同是采用分层架构。Apache Pulsar 集群由两层组成:无状态服务层,由一组接收和传递消息的 Broker 组成;以及一个有状态持久层,由一组名为 bookies 的 Apache BookKeeper 存储节点组成,可持久化地存储消息。

在 Pulsar 客户端中提供生产者和消费者(Producer & Consumer)接口,应用程序使用 Pulsar 客户端连接到 Broker 来发布和消费消息。Pulsar 客户端不直接与存储层 Apache BookKeeper 交互。客户端也没有直接的 Zookeeper 访问权限。这种隔离,为 Pulsar 实现安全的多租户统一身份验证模型提供了基础。

Apache Pulsar 为客户端提供多种语言的支持,包括 Java,C++,Python,Go 和 Websockets。Apache Pulsar 还提供了一组兼容 Kafka 的 API,用户可以通过简单地更新依赖关系并将客户端指向 Pulsar 集群来迁移现有的 Kafka 应用程序,这样现有的 Kafka 应用程序可以立即与 Apache Pulsar 一起使用,无需更改任何代码。

BookKeeper 层:持久化存储层

Apache BookKeeper 是 Apache Pulsar 的持久化存储层。Apache Pulsar 中的每个主题分区本质上都是存储在 Apache BookKeeper 中的分布式日志。每个分布式日志又被分为 Segment 分段。每个 Segment 分段作为 Apache BookKeeper 中的一个 Ledger,均匀分布并存储在 BookKeeper 群集中的多个 Bookie(Apache BookKeeper 的存储节点)中。

Segment 的创建时机包括以下几种:基于配置的 Segment 大小;基于配置的滚动时间;或者当 Segment 的所有者被切换。通过 Segment 分段的方式,主题分区中的消息可以均匀和平衡地分布在群集中的所有 Bookie 中。这意味着主题分区的大小不仅受一个节点容量的限制;相反,它可以扩展到整个 BookKeeper 集群的总容量。下面的图说明了一个分为 x 个 Segment 段的主题分区。每个 Segment 段存储 3 个副本。所有 Segment 都分布并存储在 4 个 Bookie 中。

Segment 为中心的存储

存储服务的分层的架构和以 Segment 为中心的存储是 Apache Pulsar(使用 Apache BookKeeper)的两个关键设计理念。这两个基础为 Pulsar 提供了许多重要的好处:

  • 无限制的主题分区存储
  • 即时扩展,无需数据迁移
    • 无缝 Broker 故障恢复
    • 无缝集群扩展
    • 无缝的存储(Bookie)故障恢复
  • 独立的可扩展性

无限制的主题分区存储

由于主题分区被分割成 Segment 并在 Apache BookKeeper 中以分布式方式存储,因此主题分区的容量不受任何单一节点容量的限制。相反,主题分区可以扩展到整个 BookKeeper 集群的总容量,只需添加 Bookie 节点即可扩展集群容量。这是 Apache Pulsar 支持存储无限大小的流数据,并能够以高效,分布式方式处理数据的关键。使用 Apache BookKeeper 的分布式日志存储,对于统一消息服务和存储至关重要。

即时扩展,无需数据迁移

由于消息服务和消息存储分为两层,因此将主题分区从一个 Broker 移动到另一个 Broker 几乎可以瞬时内完成,而无需任何数据重新平衡(将数据从一个节点重新复制到另一个节点)。这一特性对于高可用的许多方面至关重要,例如集群扩展;对 Broker 和 Bookie 失败的快速应对。我将使用例子在下文更详细地进行解释。

无缝 Broker 故障恢复

下图说明了 Pulsar 如何处理 Broker 失败的示例。在例子中 Broker 2 因某种原因(例如停电)而断开。Pulsar 检测到 Broker 2 已关闭,并立即将 Topic1-Part2 的所有权从 Broker 2 转移到 Broker 3。在 Pulsar 中数据存储和数据服务分离,所以当代理 3 接管 Topic1-Part2 的所有权时,它不需要复制 Partiton 的数据。如果有新数据到来,它立即附加并存储为 Topic1-Part2 中的 Segment x+1。Segment x+1 被分发并存储在 Bookie 1, 2 和 4 上。因为它不需要重新复制数据,所以所有权转移立即发生而不会牺牲主题分区的可用性。

无缝集群容量扩展

下图说明了 Pulsar 如何处理集群的容量扩展。当 Broker 2 将消息写入 Topic1-Part2 的 Segment X 时,将 Bookie X 和 Bookie Y 添加到集群中。Broker 2 立即发现新加入的 Bookies X 和 Y。然后 Broker 将尝试将 Segment X+1 和 X+2 的消息存储到新添加的 Bookie 中。新增加的 Bookie 立刻被使用起来,流量立即增加,而不会重新复制任何数据。除了机架感知和区域感知策略之外,Apache BookKeeper 还提供资源感知的放置策略,以确保流量在群集中的所有存储节点之间保持平衡。

无缝的存储(Bookie)故障恢复

下图说明了 Pulsar(通过 Apache BookKeeper)如何处理 bookie 的磁盘故障。这里有一个磁盘故障导致存储在 bookie 2 上的 Segment 4 被破坏。Apache BookKeeper 后台会检测到这个错误并进行复制修复。

Apache BookKeeper 中的副本修复是 Segment(甚至是 Entry)级别的多对多快速修复,这比重新复制整个主题分区要精细,只会复制必须的数据。这意味着 Apache BookKeeper 可以从 bookie 3 和 bookie 4 读取 Segment 4 中的消息,并在 bookie 1 处修复 Segment 4。所有的副本修复都在后台进行,对 Broker 和应用透明。

即使有 Bookie 节点出错的情况发生时,通过添加新的可用的 Bookie 来替换失败的 Bookie,所有 Broker 都可以继续接受写入,而不会牺牲主题分区的可用性。

独立的可扩展性

由于消息服务层和持久存储层是分开的,因此 Apache Pulsar 可以独立地扩展存储层和服务层。这种独立的扩展,更具成本效益:

当您需要支持更多的消费者或生产者时,您可以简单地添加更多的 Broker。主题分区将立即在 Brokers 中做平衡迁移,一些主题分区的所有权立即转移到新的 Broker。

当您需要更多存储空间来将消息保存更长时间时,您只需添加更多 Bookie。通过智能资源感知和数据放置,流量将自动切换到新的 Bookie 中。Apache Pulsar 中不会涉及到不必要的数据搬迁,不会将旧数据从现有存储节点重新复制到新存储节点。

GEO-REPLICATOIN

多个 Broker 节点组成一个 Pulsar Cluster;多个 Pulsar Cluster 组成一个 Pulsar Instance。

Pulsar 通过 GEO-REPLICATION 支持一个 Instance 内在不同的地域发送和消费消息。

上图中,Producer P1、P2、P3 在不同的 Cluster 发送给 Topic T1 的消息,会在 Cluster 之间进行复制,Consumer C1、C2 可以在自己所在的 Cluster 消费到所有的消息。

当消息被写入 Pulsar 时,首先消息被持久化在 local cluster,之后异步的发送到其他 cluster。在没有链接问题的情况下,通常复制的 latency 相近于网络的 RTT。

Pulsar 与 Kafka 的对比

Apache Pulsar 和 Apache Kafka 是两种流行的分布式消息系统和流处理平台。它们都用于处理实时数据流,但在架构、功能和使用场景上有一些显著的区别。

以下是对 Pulsar 和 Kafka 的详细对比:

架构

  • Apache Kafka
    • 单层架构:Kafka 使用一个单层的分布式日志系统,集成了消息存储和消费的功能。
    • Broker-centric:数据存储在 Kafka broker 上,消费和生产通过 broker 进行。
    • 分区和副本:通过分区和副本实现高可用性和可扩展性。
  • Apache Pulsar
    • 分层架构:Pulsar 使用分层架构,分为存储层(BookKeeper)和服务层(Pulsar broker)。
    • Segmented Storage:Pulsar 使用 Apache BookKeeper 来管理消息的持久化存储,支持水平扩展和高可用性。
    • 多租户支持:原生支持多租户和命名空间管理。

性能和扩展性

  • Apache Kafka
    • 高吞吐量:Kafka 以其高吞吐量和低延迟著称,非常适合大规模数据流处理。
    • 分区限制:Kafka 的性能可能受到分区数量的限制,因为每个分区需要单独的文件句柄和线程。
  • Apache Pulsar
    • 水平扩展:通过 BookKeeper 的分段存储,Pulsar 可以轻松扩展到数千个主题和分区。
    • 低延迟:在高并发场景下,Pulsar 能够保持低延迟。

功能特性

  • Apache Kafka
    • 生态系统丰富:拥有强大的生态系统,包括 Kafka Streams 和 ksqlDB,用于流处理。
    • 强大的社区支持:Kafka 社区活跃,有大量的工具和第三方支持。
  • Apache Pulsar
    • 多租户和命名空间:支持多租户和命名空间管理,适合复杂的企业环境。
    • 原生多主题:支持多主题订阅和复杂的消息路由。
    • 即发即弃模式:支持消息的即发即弃(非持久化)和持久化存储。

易用性和管理

  • Apache Kafka
      成熟性:作为一个成熟的项目,Kafka的安装和使用文档丰富,易于上手。
    • 管理工具:提供多种管理和监控工具,但需要对分区和副本进行细致的配置和管理。
  • Apache Pulsar
    • 自动分区管理:Pulsar可以自动管理分区和负载均衡,减少了运维负担。
    • UI管理工具:提供Pulsar Manager和Pulsar Dashboard进行可视化管理。

使用场景

  • Apache Kafka:适用于需要高吞吐量和低延迟的场景,如日志聚合、事件流处理和实时分析。
  • Apache Pulsar:适合需要多租户支持、复杂消息路由和水平扩展的企业级应用场景。

Pulsar应用实例

实时数仓

首先看一个典型的、简化的实时数仓场景:给定业务库中全量商品的订单表,统计截止到当前的各个商品的订单总量。

这里面有两点需要注意:

  • 订单表中有订单状态,在统计订单量的时候需要过滤掉无效订单。
  • 订单状态随时可能发生变化。

上面两点给实时数仓的开发带来了很大的复杂性。源头的业务库中的数据可变,在实时流处理的时候需要考虑到这种变化,并对实时计算结果进行调整。

输入输出样例如下图所示:

上图左边是业务库中订单粒度的原始表,我们期望聚合成右边的以商品为粒度的商品总订单数的统计表。

另外,为了不影响线上业务,不允许直接查询线上业务库得到结果,需要以业务库为数据源建立数据仓库来支持数据分析需求。

当然,有很多成熟的方案可以解决这个问题。例如经典的Lambda架构,其核心思想是分为离线和实时两条链路:离线链路计算历史数据,实时链路计算当日数据。最后把历史数据和当日数据merge起来。如下图所示:

Lambda架构是比较成熟的方案,但也存在一些问题,如下:

  • 同时维护离线、实时链路,链路复杂,资源消耗大,维护成本高。
  • 对于部分订单状态发生变化的情况,难以很好处理。例如历史订单在当日(今日)发生了失效,状态从有效变为了无效,这时处理起来会有一些复杂性,需要考虑对离线历史数据的实时调整。
  • 离线计算和实时计算结果需要merge,需要精确把握时间点,离线和实时的计算结果的时间范围需要做到不重、不漏。
  • 对于需要从多个源表获取数据,且多个源表的字段值有可能发生变化的情况,则更为复杂。这里限于篇幅,不展开讲了。感兴趣的读者可以构造一些情况来推演一下相关的处理逻辑,会发现里面确实会有许多复杂的情况,涉及到流join、数据的消费顺序等。可以梳理一下其中遇到的问题。

除了Lambda架构,还有另一个方案基于upsert离线表(如Hudi表)的计算。其核心思想是在Hudi表中近实时同步业务库中的数据(通过消费binlog数据),在Hudi表(相当于一个订单粒度的近实时表)的基础上,每隔一段时间(如15分钟)按照离线链路聚合数据的方式全量计算一次聚合结果,并将生成的结果同步到OLAP引擎中供查询。聚合计算的源头Hudi表是近实时更新的,聚合计算过程是近实时触发的,因此OLAP引擎中的结果表的时效性也是近实时的。这个方案的数据处理链路如下图所示:

这个方案的一个好处是,复用离线数据开发的逻辑到Hudi表的近实时全量计算逻辑中,以较低的成本来实现近实时的统计分析,但也会有一些问题,列举如下:

  • 需要较高频率的离线全量计算,消耗计算资源。
  • 对离线存储资源仍有消耗。
  • 不是纯实时(秒级)更新,而是一个近实时的过程。

针对以上实时数仓的场景,Pulsar具备解决方案。具体来说,线上业务库的订单表输出binlog到Pulsar消息队列中。这个消息队列有全量的数据,其中冷数据可以offload到对象存储中。接下来可以使用Pulsar SQL每15分钟针对Pulsar中的全量数据计算一次聚合结果,并将计算结果写入OLAP引擎中供查询。这个方案类似于上面提到的Hudi方案,不同之处在于利用了Pulsar SQL,相当于可以直接去查询消息队列中存储的数据。

整个计算链路如下图所示:

好处是:

  • 可以利用Pulsar的分级存储特性,将冷数据写入对象存储。
  • Pulsar消息队列的存储,既可以作为中间数据的存储,也可以作为离线ODS层数据的存储,节省存储资源,链路简化。Pulsar的分级存储和Pulsar SQL等特性使得直接在消息队列存储中做计算成为可能,进而简化数据处理链路。

通过上面的讨论,我们看到了在火山引擎EMR中,可以将其中的一些大数据组件和Pulsar结合起来使用,解决实时数仓开发中的一些问题。

批流一体

埋点日志数据存在实时处理和离线处理的需求:

  • 离线链路:用于天级报表、离线训练数据等场景。
  • 实时链路:用于实时分析、推荐等场景。

一个经典方案,类似于上文提到的Lambda架构,需要维护离线和实时两套数据链路,如下图所示:

这样的方案在实施上比较成熟,但是占用资源较多,维护成本较高。

而基于Pulsar也可以有一类方案,聚焦在实时链路。埋点日志数据上报到Pulsar中,用实时任务去写下游的DWD和DWS层(到Pulsar中)。整个Pulsar的实时链路也支持数据offload到对象存储。数据也可以直接写到OLAP层。如果有离线数据计算的需求,可以用Pulsar SQL直接对接Pulsar中存储的数据。整个数据链路如下图所示:

  • 基于Pulsar的分级存储和Pulsar SQL等特性,可以直接把Pulsar中的数据作为离线链路的ODS层。
  • Pulsar的下游可以直接对接实时处理逻辑。
    • 若基于Pulsar中的原始日志数据,建立实时数仓,实时计算ODS层数据生成DWD层数据到Pulsar topic中,则Pulsar topic中的DWD层数据可以同时直接用于后续的离线计算和实时处理。
    • DWS层同理。

参考链接:

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注