器→工具, 开源项目

任务编排与调度工具Airflow

钱魏Way · · 52 次浏览

Airflow简介

Apache Airflow 是一个用于编排和调度复杂工作流的开源平台,广泛应用于数据工程和数据科学领域。它提供了一种灵活的方式来定义、调度和监控数据管道。

Airflow 是一个编排、调度和监控 workflow 的平台,由 Airbnb 公司 2014年10月 开源,2019年1月从 Apache 基金会毕业,成为新的 Apache 顶级项目。Airflow 将 workflow 编排为 tasks 组成的 DAGs,调度器在一组 workers 上按照指定的依赖关系执行 tasks。同时,Airflow 提供了丰富的命令行工具和简单易用的用户界面以便用户查看和操作,并且 Airflow 提供了监控和报警系统。Airflow 使用 DAG (有向无环图) 来定义工作流,配置作业依赖关系非常方便,从管理方便和使用简单角度来讲,Airflow 远超过其他的任务调度工具。

  • 编排:任务间的依赖关系
  • 调度:时间调度 和 任务调度;通过控制 Broker 不同 Queue,来分发任务执行
  • 监控:DAGs 和 Task 的状态

核心概念

  • DAG(有向无环图):DAG 是 Airflow 中的核心概念,表示一组有依赖关系的任务。每个 DAG 是一个 Python 脚本,定义了任务及其执行顺序。
  • 任务(Task):任务是 DAG 中的一个节点,表示一个具体的操作。Airflow 使用 Operator 来定义任务的类型,例如 BashOperator、PythonOperator、EmailOperator 等。
  • Operator(操作符):Operator 是 Airflow 中的构建块,用于定义任务的具体行为。Airflow 提供了多种内置 Operator 来执行不同类型的任务。
  • 执行器(Executor):执行器决定了任务的执行方式。Airflow 支持多种执行器,包括本地执行器、Celery 执行器和 Kubernetes 执行器,分别适用于不同的并行执行需求。
  • 调度器(Scheduler):调度器负责按照定义的计划(如时间间隔、特定时间点)触发 DAG 的执行。
  • Web UI:Airflow 提供了一个直观的 Web 用户界面,用于监控和管理 DAG 的执行状态。用户可以查看 DAG 的拓扑结构、任务状态、日志等。

特点和优势

  • 灵活性:由于 DAG 是用 Python 编写的,用户可以利用 Python 的强大功能和库来定义复杂的工作流逻辑。
  • 可扩展性:Airflow 支持分布式调度和执行,适合大规模任务的并行处理。通过不同的执行器,Airflow 可以在多台机器上运行任务。
  • 可视化:Web UI 提供了对工作流的可视化管理,用户可以直观地查看任务依赖关系、执行历史和性能指标。
  • 丰富的社区和插件:Airflow 拥有活跃的社区和丰富的插件生态,支持与各种外部系统和服务的集成,如 AWS、GCP、Hadoop、Spark 等。
  • 动态 DAG 生成:Airflow 允许动态生成 DAG,使得在运行时可以根据外部条件动态调整任务。

使用场景

  • ETL 管道:通过 Airflow 编排数据提取、转换和加载(ETL)过程,确保数据在不同系统之间的流动。
  • 数据科学工作流:调度和管理机器学习模型的训练、测试和部署任务。
  • 定时任务:执行定时的批处理任务,如数据备份、报告生成等。
  • 依赖任务管理:管理复杂的任务依赖关系,确保任务按照预定的顺序执行。

部署与架构

  • 组件:Airflow 的架构由几个关键组件组成:Web Server、Scheduler、Worker 和 Metadata Database。Web Server 提供用户界面,Scheduler 负责调度任务,Worker 执行任务,Metadata Database 存储 DAG 和任务的状态信息。
  • 部署方式:Airflow 可以在单机上运行以进行开发和测试,也可以通过 Celery 或 Kubernetes 执行器进行分布式部署以处理大规模工作流。
  • 可扩展性:通过添加更多的 Worker 节点,Airflow 可以水平扩展以支持更多并发任务。

安全性

  • 身份认证:Airflow 支持多种身份认证机制,如 LDAP、OAuth 和密码认证,以控制用户访问。
  • 权限管理:通过基于角色的访问控制(RBAC),可以对用户和角色分配不同的权限级别。
  • 日志和审计:Airflow 记录详细的任务执行日志和用户操作日志,便于监控和审计。

Apache Airflow 的灵活性和强大功能使其成为数据工程师和数据科学家构建和管理复杂工作流的首选工具之一。通过其模块化和可扩展的架构,Airflow 可以适应各种规模和复杂度的任务调度需求。

Airflow架构详解

  • scheduler,它处理触发计划的工作流,并将任务提交给executor运行。
  • executor,处理正在运行的任务。在默认的 Airflow 安装中,它运行在scheduler中,但大多数适合生产的executor实际上将任务执行推送给workers。
  • webserver,它提供了一个方便的用户界面来检查、触发和调试 DAG 和任务的行为。
  • DAG Directory,由scheduler和executor(以及executor所有的worker)读取
  • Metadata Database,供scheduler, executor和webserver用来存储状态。

Scheduler

基础概念

  • JOB:最上层的工作。分为 SchedulerJob、BackfillJob 和 LocalTaskJob。SchedulerJob 由 Scheduler 创建,BackfillJob 由 Backfill 创建,LocalTaskJob 由前面两种 Job 创建。
  • DAG:有向无环图,用来表示工作流。
  • DAG Run:工作流实例,表示某个工作流的一次运行(状态)。
  • Task:任务,工作流的基本组成部分。
  • TaskInstance:任务实例,表示某个任务的一次运行(状态)。

heartbeat 

在airflow中,heartbeat是相当重要的一个概念,heartbeat不能简单的理解成“心跳”,heartbeat是airflow中各个组件协同、通信、交互、工作的一种方式,在shceduler中,有下面几种重要的heatbeat

heartbeat 作用
scheduler -> executor 触发运行的task,并更新task状态
scheduler -> DagFileProcessorAgent 检测DagFileProcessorManager的存活状态,并处理manager返回的消息
scheduler -> scheduler 检查scheduler是否存在问题,更新schedulerjob数据库的信息

功能逻辑

调度器实际上就是一个 airflow.jobs.SchedulerJob 实例 job 持续运行 run 方法。job.run() 在开始时将自身的信息加入到 job 表中,并维护状态和心跳,预期能够正常结束,将结束时间也更新到表中。 但是实际上往往因为异常中断,导致结束时间为空。不管是如何进行的退出,SchedulerJob 退出时会关闭所有子进程。

这里简单介绍下 Scheduler 的功能逻辑:

  • 遍历 dags 路径下的所有 dag 文件, 启动一定数量的进程(进程池),并且给每个进程指派一个 dag 文件。 每个 DagFileProcessor 解析分配给它的dag文件,并根据解析结果在 DB中创建 DagRuns 和 TaskInstance。
  • 在 scheduler_loop 中,检查与活动 DagRun 关联的 TaskInstance 的状态,解析 TaskInstance 之间的任何依赖,标识需要被执行的 TaskInstance,然后将它们添加至 executor 队列,将新排列的 TaskInstance 状态更新为QUEUED状态。
  • 每个可用的 executor 从队列中取一个 TaskInstance,然后开始执行它,将此 TaskInstance 的数据库记录更新为SCHEDULED。
  • 当一个 TaskInstance 完成运行,关联的 executor 就会报告到队列并更新数据库中的 TaskInstance 的状态(例如“完成”、“失败”等)。
  • 一旦所有的dag处理完毕后,就会进行下一轮循环处理。这里还有一个细节就是上一轮的某个dag的处理时间可能很长,导致到下一轮处理的时候这个dag还没有处理完成。 Airflow 的处理逻辑是在这一轮不为这个dag创建进程,这样就不会阻塞进程去处理其余dag

启动过程

scheduler在启动时,通过airflow/__main__.py入口方法传入参数”scheduler”启动,启动时会创建Job对象,Job的概念在上面有描述,开始执行job.run()方法后进入核心逻辑,核心步骤如下

  • 打印关键日志:”Starting the scheduler”,表示开始启动
  • 启动DagFileProcessorAgent
    • 扫描文件,导入模块
    • dag定义文件中的搜集dag
    • 创建dagrun
    • 创建任务实例
  • 进入核心的_run_scheduler_loop方法,循环调用,主要功能如下
    • .通过 DagFileProcessorAgent 获取 DAG 解析结果
    • 查找和排队可执行任务
      • 更改数据库中的TaskInstance状态
      • 把任务放在executor中排队
    • 心跳检测executor
      • 异步执行executor中排队的任务
      • 同步运行任务的状态
    • 执行_do_scheduling,dag级别的调度的决策,TaskInstance状态管理,task同步到executor等功能,后面会详解
    • 开启心跳检查
      • Executor心跳
      • DagFileProcessorAgent心跳
      • Scheduler心跳

Executor

Airflow本身是一个综合平台,它兼容多种组件,所以在使用的时候有多种方案可以选择。比如最关键的执行器就有四种选择:

  • SequentialExecutor:单进程顺序执行任务,默认执行器,通常只用于测试
  • LocalExecutor:多进程本地执行任务
  • CeleryExecutor:分布式调度,生产常用
  • DaskExecutor :动态任务调度,主要用于数据分析

在当前项目使用CeleryExecutor作为执行器。

celery是一个分布式调度框架,其本身无队列功能,需要使用第三方组件,比如redis或者rabbitmq

CeleryExecutor

  • Worker- 执行分配的任务
  • Scheduler——负责将必要的任务添加到队列中
  • WebServer- HTTP 服务器提供对 DAG/任务状态信息的访问
  • Database- 包含有关任务、DAG、变量、连接等状态的信息。
  • Celery- 队列机制

Celery 的队列由两个部分组成:

  • Broker- 存储要执行的命令
  • Result backend- 存储已完成命令的状态

上图组件在很多地方相互通信

  • [1]Web server –> Workers – 拉取任务执行日志
  • [2]Web server –> DAG files – 解析展示DAG结构
  • [3]Web server –> Database – 拉取任务状态
  • [4]Workers –> DAG files – 解析DAG结构并执行task
  • [5]Workers –> Database – 获取和存储有关连接配置、变量
  • [6]Workers –> Celery’s result backend – 存储task运行状态
  • [7]Workers –> Celery’s broker – 存储要执行的命令
  • [8]Scheduler –> DAG files – 解析DAG结构并执行task
  • [9]Scheduler –> Database – 存储 dagrun和相关task
  • [10]Scheduler –> Celery’s result backend – 获取已经执行完成的task信息
  • [11]Scheduler –> Celery’s broker – put要执行的命令

任务执行流程

初始状态有两个进程运行:

  • SchedulerProcess – 处理tasks并使用 CeleryExecutor 运行
  • WorkerProcess – 观察队列等待新的tasks出现

两个抽象的数据库,使用celery:

  • QueueBroker
  • ResultBackend

在运行task过程中,创建了两个进程:

  • LocalTaskJobProcess – 它的逻辑由 LocalTask​​Job 描述,同时监控RawTaskProcess的状态,新的进程将由TaskRunner启动.
  • RawTaskProcess – 它用来处理用户实际的代码,比如用户执行的一行shell
[1] SchedulerProcess处理任务,当它发现需要完成的任务时,将其发送到QueueBroker。
[2] SchedulerProcess也会周期性的查询ResultBackend以获取任务的状态。
[3] QueueBroker当它意识到有新任务产生时,会将有关它的信息发送到 WorkerProcess 
[4] WorkerProcess将单个任务分配给一个WorkerChildProcess。
[5] WorkerChildProcess执行适当的任务处理功能 - 关键方法:execute_command()。并创建个新的进程 - LocalTaskJobProcess。
[6] LocalTaskJobProcess 逻辑由LocalTaskJob类描述。它使用 TaskRunner 启动的新进程。
[7] RowTaskProcess 执行用户实际的代码,这一步是最底层真正执行task的操作
[8][9] Process RawTaskProcess和LocalTaskJobProcess在完成task后停止。
[10] WorkerChildProcess通知主进程-WorkerProcess任务结束以及后续任务的可用性。
[11] WorkerProcess将状态信息保存在ResultBackend 中。
[12] 当SchedulerProcess再次询问ResultBackend状态时,将会获取到任务状态的信息。

Scheduler源码分析

_run_scheduler_loop

调度的核心逻辑在airflow.jobs.scheduler_job.SchedulerJob._run_scheduler_loop(),从方法名可以知道是一个循环,基础逻辑是从数据库找出需要调度的dag,创建dagrun,创建TaskInstance,管理生命周期状态等等功能。我们一步步深入研究它内部的最主要逻辑

def _run_scheduler_loop(self) -> None:
        """
        The actual scheduler loop. The main steps in the loop are:
            #. Harvest DAG parsing results through DagFileProcessorAgent
            #. Find and queue executable tasks
                #. Change task instance state in DB
                #. Queue tasks in executor
            #. Heartbeat executor
                #. Execute queued tasks in executor asynchronously
                #. Sync on the states of running tasks

        Following is a graphic representation of these steps.

        .. image:: ../docs/apache-airflow/img/scheduler_loop.jpg

        :rtype: None
        """
        if not self.processor_agent:
            raise ValueError("Processor agent is not started.")
        is_unit_test: bool = conf.getboolean('core', 'unit_test_mode')
        # 1.创建timer,scheduler自身的定时器,与用户代码无关
        timers = EventScheduler()
 
        # Check on start up, then every configured interval
        self.adopt_or_reset_orphaned_tasks()
 
        timers.call_regular_interval(
            conf.getfloat('scheduler', 'orphaned_tasks_check_interval', fallback=300.0),
            self.adopt_or_reset_orphaned_tasks,
        )
 
        timers.call_regular_interval(
            conf.getfloat('scheduler', 'pool_metrics_interval', fallback=5.0),
            self._emit_pool_metrics,
        )
 
        timers.call_regular_interval(
            conf.getfloat('scheduler', 'clean_tis_without_dagrun_interval', fallback=15.0),
            self._clean_tis_without_dagrun,
        )
 
        for loop_count in itertools.count(start=1):
            with Stats.timer() as timer:
 
                if self.using_sqlite:
                    self.processor_agent.run_single_parsing_loop()
                    # For the sqlite case w/ 1 thread, wait until the processor
                    # is finished to avoid concurrent access to the DB.
                    self.log.debug("Waiting for processors to finish since we're using sqlite")
                    self.processor_agent.wait_until_finished()
 
                with create_session() as session:
                    # 2.执行调度dag相关的关键步骤,返回queued状态的taskinstance个数
                    num_queued_tis = self._do_scheduling(session)
                    # 3.executor通过heartbeat实际触发执行command的步骤
                    self.executor.heartbeat()
                    session.expunge_all()
                    num_finished_events = self._process_executor_events(session=session)
                # 4.检测DagFileProcessorManager的存活状态,并处理manager返回的消息
                 self.processor_agent.heartbeat()
 
                # Heartbeat the scheduler periodically
                # 5.通过heartbeat更新job的条目,检查时间戳判断scheduler是否有问题
                 self.heartbeat(only_if_necessary=True)
 
                # Run any pending timed events
                next_event = timers.run(blocking=False)
                self.log.debug("Next timed event is in %f", next_event)
 
            self.log.debug("Ran scheduling loop in %.2f seconds", timer.duration)
 
            if not is_unit_test and not num_queued_tis and not num_finished_events:
                # If the scheduler is doing things, don't sleep. This means when there is work to do, the
                # scheduler will run "as quick as possible", but when it's stopped, it can sleep, dropping CPU
                # usage when "idle"
                time.sleep(min(self._processor_poll_interval, next_event))
 
            if loop_count >= self.num_runs > 0:
                self.log.info(
                    "Exiting scheduler loop as requested number of runs (%d - got to %d) has been reached",
                    self.num_runs,
                    loop_count,
                )
                break
            if self.processor_agent.done:
                self.log.info(
                    "Exiting scheduler loop as requested DAG parse count (%d) has been reached after %d"
                    " scheduler loops",
                    self.num_times_parse_dags,
                    loop_count,
                )
                break
  • 创建timer,scheduler自身的定时器,与用户代码无关
  • 执行调度dag相关的关键步骤,返回queued状态的taskinstance个数
  • executor通过heartbeat实际触发执行command的步骤
  • 检测DagFileProcessorManager的存活状态,并处理manager返回的消息
  • 通过heartbeat更新scheduler在数据库的信息,检查时间戳判断scheduler是否有问题

_do_scheduling

上面代码中,_do_scheduling()是_run_scheduler_loop()的核心调用方法,(对于Dag,DagRun,Task,TaskInstance等概念已在前面说明,需了解清楚)。所有数据库的操作,都由session对象维护。描述中以airflow自带到demo example_bash_operator 举例说明,DAG图如下

核心逻辑功能如下:

  • 为需要被调度的Dag创建DagRun,表示当前Dag需要被执行一次
  • 更新数据库中Dag的状态,把queued状态的Dag改成running状态

  • 检查DAG的正确性,包含db是否存在、是否序列化,以决定当前DagRun是否可以被执行
  • 修改数据库数据,把Task的状态更改为Scheduled状态

  • 更新数据库后发送回调给DagFileProcessor,以确保context运行时是最新的
  • 检查Executor中slot是否有剩余可用,计算公式为:slots_available = parallelism – running – queued_tasks,如果不可用,直接退出,甚至不会有重试,会打印log:”Executor full, skipping critical section“
  • 第6步中检查发现Executor中slot有剩余可用时,会把数据库中标记为scheduled状态的task标记为queued状态,其中涉及3个细节
  • 在没有超过max_active_runs或pool的限制的情况下会根据TaskInstance的优先级决定执行顺序
    • 根据池限制、dag 并发性、执行程序状态和优先级查找准备好执行的TaskInstance
    • log中会打印”xx tasks up for execution“,也打印了所有TaskInstance到名称,此时真正物理到task还未被真正执行
    • 紧接着会经过一系列的资源检查,如pool、dag并发数、task并发数,确保TaskInstance能被调度,检查成功后,也会更新metrics如:starving_tasks、scheduler.tasks.starving、scheduler.tasks.running、scheduler.tasks.executable
  • 更改TaskInstance的状态,把数据库里面scheduled状态更改为queued,返回要执行的TaskInstance个数
  • 将TaskInstance放入Executor中
  • 其余task级别的逻辑和状态维护均在executor中维护

状态转移追踪

任务状态

DAG级别状态:

状态 释义
QUEUED 已排队
SUCCESS 成功
RUNNING 运行中
FAILED 失败

TASK级别状态:

状态 释义
NONE
REMOVED 已移除
RUNNING 运行中
SCHEDULED 已调度
QUEUED 已排队
SHUTDOWN 关闭
UP_FOR_RETRY 等待重试
UP_FOR_RESCHEDULE 等待被重新调度
UPSTREAM_FAILED 上游失败
SKIPPED 已跳过
SENSING

状态转移

以airlow自带的demo example_bash_operator为例,描述dag的生命周期,状态类别可参考上面列表,状态分为DagRun级别和TaskInstance级别

查询DagRun和TaskInstance的状态有两种方法:

  • 查询airflow的元数据库,分别对应dag_run和task_instance表
  • 查询airflow的webui,可以清晰的看到DagRun和TaskInstance的当前状态

step1

手动运行dag或定时触发

上一次状态 当前状态 状态关键日志 查询状态sql(airflow元数据库)
DagRun QUEUED select * from dag_run where dag_id=”example_bash_operator” and state=”queued”
TaskInstance NONE select * from task_instance where dag_id=”example_bash_operator” state=null

step2

上一次状态 当前状态 状态关键日志 查询状态sql(airflow元数据库)
DagRun QUEUED RUNNING select * from dag_run where dag_id=”example_bash_operator” and state=”running”
TaskInstance NONE SCHEDULED select * from task_instance where dag_id=”example_bash_operator” and state=”scheduled”

step3

上一次状态 当前状态 状态关键日志 查询状态sql(airflow元数据库)
DagRun RUNNING RUNNING select * from dag_run where dag_id=”example_bash_operator” and state=”running”
TaskInstance SCHEDULED QUEUED {scheduler_job.py:517} INFO – Sending TaskInstanceKey(dag_id=’example_bash_operator’, task_id=’runme_0′, execution_date=datetime.datetime(2021, 9, 13, 12, 42, 38, 886807, tzinfo=Timezone(‘UTC’)), try_number=1) to executor with priority 3 and queue default select * from task_instance where dag_id=”example_bash_operator” and state=”queued”

step4

上一次状态 当前状态 状态关键日志 查询状态sql(airflow元数据库)
DagRun RUNNING RUNNING select * from dag_run where dag_id=”example_bash_operator” and state=”running”
TaskInstance QUEUED RUNNING [2021-09-13 23:21:28,217] {standard_task_runner.py:52} INFO – Started process 5894 to run task

[2021-09-13 23:23:32,590] {standard_task_runner.py:76} INFO – Running: [‘airflow’, ‘tasks’……

[2021-09-13 23:23:33,772] {standard_task_runner.py:77} INFO – Job 448: Subtask runme_0

select * from task_instance where dag_id=”example_bash_operator” and state=”running”

step5

上一次状态 当前状态 状态关键日志 查询状态sql(airflow元数据库)
DagRun RUNNING SUCCESS [2021-09-14 09:25:45,976] {dagrun.py:434} INFO – Marking run <DagRun example_bash_operator @ 2021-09-14 01:25:39.909845+00:00: manual__2021-09-14T01:25:39.909845+00:00, externally triggered: True> successful select * from dag_run where dag_id=”example_bash_operator” and state=”success”
TaskInstance RUNNING SUCCESS
  1. [2021-09-14 09:25:43,387] {subprocess.py:82} INFO – Command exited with return code 0
  2. [2021-09-14 09:25:43,413] {taskinstance.py:1218} INFO – Marking task as SUCCESS. dag_id=example_bash_operator, task_id=runme_0……
select * from task_instance where dag_id=”example_bash_operator” and state=”success”

日志记录和监控架构

参考链接:

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注