執行器

執行器是 任務實例 運行的機制。它們具有通用的 API 且是「可插拔的」,這表示您可以根據您的安裝需求更換執行器。

執行器由 executor 選項在 [core] 區段的 組態檔 中設定。

內建執行器以名稱引用,例如

[core]
executor = KubernetesExecutor

自訂或第三方執行器可以透過提供執行器 Python 類別的模組路徑來設定,例如

[core]
executor = my.custom.executor.module.ExecutorClass

注意

有關 Airflow 組態的更多資訊,請參閱 設定組態選項

如果您想檢查目前設定的執行器,您可以使用 airflow config get-value core executor 命令

$ airflow config get-value core executor
SequentialExecutor

執行器類型

執行器有兩種類型 - 本機 (在 scheduler 程序內) 運行任務的執行器,以及 遠端 (通常透過 workers 池) 運行任務的執行器。Airflow 預設配置了 SequentialExecutor,這是一個本機執行器,也是最簡單的執行選項。但是,SequentialExecutor 不適用於生產環境,因為它不允許並行任務運行,因此,某些 Airflow 功能 (例如運行感測器) 將無法正常工作。您應該改用 LocalExecutor 用於小型、單機生產安裝,或使用其中一個遠端執行器用於多機/雲端安裝。

本機執行器

Airflow 任務在本機的排程器程序中運行。

優點:非常易於使用、快速、非常低的延遲,以及極少設定要求。

缺點:功能有限,且與 Airflow 排程器共用資源。

範例:

遠端執行器

遠端執行器可以進一步分為兩類

佇列/批次執行器

Airflow 任務被發送到中央佇列,遠端 workers 從佇列中提取任務以執行。Workers 通常是持久性的,並且一次運行多個任務。

優點:更穩健,因為您將 workers 與排程器程序分離。Workers 可以是大型主機,可以處理大量任務 (通常是並行處理),這具有成本效益。延遲可以相對較低,因為可以配置 workers 始終運行,以便立即從佇列中獲取任務。

缺點:共用 workers 存在嘈雜鄰居問題,任務競爭共用主機上的資源,或競爭環境/系統的配置方式。如果您的工作負載不穩定,它們也可能很昂貴,您可能會有 workers 閒置、資源過度擴展,或者您必須管理它們的擴展和縮減。

範例:

容器化執行器

Airflow 任務在容器/Pod 內以特設方式執行。每個任務都隔離在其自己的容器化環境中,該環境在 Airflow 任務排隊時部署。

優點:每個 Airflow 任務都隔離到一個容器,因此沒有嘈雜鄰居問題。可以為特定任務自訂執行環境 (系統庫、二進制文件、依賴項、資源量等)。具有成本效益,因為 workers 僅在任務持續時間內存活。

缺點:啟動時存在延遲,因為容器或 Pod 需要在任務開始之前部署。如果您運行許多短/小任務,可能會很昂貴。沒有 workers 需要管理,但是您必須管理類似 Kubernetes 集群的東西。

範例:

注意

新的 Airflow 使用者可能會假設他們需要使用本機或遠端執行器之一運行單獨的執行器程序。這是錯誤的。執行器邏輯在排程器程序內部運行,並且將在本機或非本機運行任務,具體取決於所選的執行器。

同時使用多個執行器

警告

多個執行器配置目前是一項 alpha/實驗性功能,可能會在沒有警告的情況下進行更改。

從 2.10.0 版本開始,Airflow 現在可以與多執行器配置一起運作。每個執行器都有其自己的一組優缺點,通常它們是在延遲、隔離和計算效率以及其他屬性之間進行權衡 (有關執行器的比較,請參閱 此處)。運行多個執行器使您可以更好地利用所有可用執行器的優勢,並避免其缺點。換句話說,您可以為一組特定的任務使用特定的執行器,在這些任務中,其特定的優點和好處對於該用例最有意義。

組態設定

配置多個執行器使用與單個執行器用例相同的配置選項 (如 此處 所述),利用逗號分隔列表表示法來指定多個執行器。

注意

列表中的第一個執行器 (無論是單獨使用還是與其他執行器一起使用) 其行為都與 2.10.0 之前的版本相同。換句話說,這將是環境的預設執行器。任何未指定特定執行器的 Airflow 任務或 DAG 都將使用此環境級別的執行器。列表中的所有其他執行器都將被初始化並準備好在 Airflow 任務或 DAG 上指定時運行任務。如果您未在此配置列表中指定執行器,則無法使用它來運行任務。

一些有效的多個執行器配置範例

[core]
executor = 'LocalExecutor'
[core]
executor = 'LocalExecutor,CeleryExecutor'
[core]
executor = 'KubernetesExecutor,my.custom.module.ExecutorClass'

注意

目前不支援使用 _相同_ 執行器類別的兩個實例。

為了更輕鬆地在任務和 DAG 上指定執行器,執行器配置現在支援別名。然後,您可以使用此別名在您的 DAG 中引用執行器 (請參閱下文)。

[core]
executor = 'LocalExecutor,my.custom.module.ExecutorClass:ShortName'

注意

如果 DAG 指定任務使用未配置的執行器,則 DAG 將無法解析,並且警告對話框將顯示在 Airflow UI 中。請確保您希望使用的所有執行器都在運行 Airflow 組件 (排程器、workers 等) 的任何主機/容器上的 Airflow 配置中指定。

撰寫 DAGs 和任務

要為任務指定執行器,請使用 Airflow 運算子上的 executor 參數

BashOperator(
    task_id="hello_world",
    executor="LocalExecutor",
    bash_command="echo 'hello world!'",
)
@task(executor="LocalExecutor")
def hello_world():
    print("hello world!")

要為整個 DAG 指定執行器,請使用現有的 Airflow 預設參數機制。然後,DAG 中的所有任務都將使用指定的執行器 (除非被特定任務明確覆蓋)

def hello_world():
    print("hello world!")


def hello_world_again():
    print("hello world again!")


with DAG(
    dag_id="hello_worlds",
    default_args={"executor": "LocalExecutor"},  # Applies to all tasks in the DAG
) as dag:
    # All tasks will use the executor from default args automatically
    hw = hello_world()
    hw_again = hello_world_again()

注意

任務將它們配置為在其上運行的執行器儲存在 Airflow 資料庫中。變更在每次解析 DAG 後反映。

監控

當使用單個執行器時,Airflow 指標的行為將與 <2.9 時相同。但是,如果配置了多個執行器,則執行器指標 (executor.open_slotsexecutor.queued_slotsexecutor.running_tasks) 將針對配置的每個執行器發布,執行器名稱附加到指標名稱 (例如 executor.open_slots.<executor class name>)。

記錄的工作方式與單個執行器用例相同。

靜態編碼混合執行器

目前有兩個「靜態編碼」執行器,這些執行器是兩個不同執行器的混合體:LocalKubernetesExecutorCeleryKubernetesExecutor。它們的實作不是 Airflow 核心的原生或內在的。相反,這些混合執行器利用任務實例上的 queue 欄位來指示和持久化要在哪個子執行器上運行。這是對 queue 欄位的誤用,並且在使用這些混合執行器時,使其無法用於其預期目的。

諸如此類的執行器還需要手工製作新的「具體」類別,以建立執行器可能組合的每個排列。隨著創建更多執行器,這是站不住腳的,並且會導致更多的維護開銷。不應需要客製化的編碼工作來使用任何執行器組合。

因此,不再建議使用這些類型的執行器。

撰寫您自己的執行器

所有 Airflow 執行器都實作一個通用介面,以便它們是可插拔的,並且任何執行器都可以存取 Airflow 內的所有功能和整合。主要地,Airflow 排程器使用此介面與執行器互動,但其他組件 (例如記錄、CLI 和回填) 也是如此。公共介面是 BaseExecutor。您可以瀏覽程式碼以獲取最詳細和最新的介面,但下面概述了一些重要的重點。

注意

有關 Airflow 公共介面的更多資訊,請參閱 Airflow 的公共介面

您可能想要撰寫自訂執行器的一些原因包括

  • 不存在適合您特定用例的執行器,例如用於計算的特定工具或服務。

  • 您想要使用利用您首選雲端供應商的計算服務的執行器。

  • 您有一個私有的工具/服務用於任務執行,僅供您或您的組織使用。

重要的 BaseExecutor 方法

這些方法不需要覆蓋即可實作您自己的執行器,但了解它們很有用

  • heartbeat:Airflow 排程器 Job 迴圈將定期在執行器上調用 heartbeat。這是 Airflow 排程器和執行器之間互動的主要點之一。此方法更新一些指標、觸發新排隊的任務執行,並更新正在運行/已完成任務的狀態。

  • queue_command:Airflow Executor 將調用 BaseExecutor 的此方法,以提供要由執行器運行的任務。BaseExecutor 只是將 TaskInstances 新增到執行器內部的排隊任務內部列表中。

  • get_event_buffer:Airflow 排程器調用此方法以檢索執行器正在執行的 TaskInstances 的目前狀態。

  • has_task:排程器使用此 BaseExecutor 方法來確定執行器是否已具有特定的任務實例排隊或正在運行。

  • send_callback:將任何回調發送到執行器上配置的接收器。

必須實作的方法

至少必須覆蓋以下方法,才能使您的執行器受到 Airflow 支援

  • sync:Sync 將在執行器 heartbeat 期間定期調用。實作此方法以更新執行器知道的任務狀態。或者,嘗試執行已從排程器接收的排隊任務。

  • execute_async:異步執行命令。此上下文中的命令是用於運行 Airflow 任務的 Airflow CLI 命令。此方法在執行器 heartbeat 期間調用 (在幾個層之後),執行器 heartbeat 由排程器定期運行。實際上,此方法通常只是將任務排隊到要運行的任務的內部或外部佇列中 (例如 KubernetesExecutor)。但也可以直接執行任務 (例如 LocalExecutor)。這將取決於執行器。

可選的介面方法以實作

以下方法不是必須覆蓋才能擁有功能性 Airflow 執行器。但是,實作它們可以帶來一些強大的功能和穩定性

  • start:Airflow 排程器 (和回填) job 將在初始化執行器物件後調用此方法。執行器所需的任何其他設定都可以在此處完成。

  • end:Airflow 排程器 (和回填) job 將在關閉時調用此方法。完成運行 jobs 所需的任何同步清理都應在此處完成。

  • terminate:更強制地停止執行器,甚至殺死/停止正在進行中的任務,而不是同步等待完成。

  • cleanup_stuck_queued_tasks:如果任務在佇列狀態中卡住的時間超過 task_queued_timeout,則它們將由排程器收集並提供給執行器,以便有機會透過此方法處理它們 (執行任何優雅的清理/關閉),並返回 Task Instances 以顯示給使用者的警告訊息。

  • try_adopt_task_instances:已放棄的任務 (例如來自已死亡的排程器 job) 將提供給執行器,以透過此方法採用或以其他方式處理它們。任何無法採用的任務 (預設情況下,BaseExecutor 假設所有任務都無法採用) 應返回。

  • get_cli_commands:執行器可以透過實作此方法向使用者銷售 CLI 命令,有關更多詳細資訊,請參閱下面的 CLI 區段。

  • get_task_log:執行器可以透過實作此方法向 Airflow 任務日誌銷售日誌訊息,有關更多詳細資訊,請參閱下面的 記錄 區段。

相容性屬性

BaseExecutor 類別介面包含一組屬性,Airflow 核心程式碼使用這些屬性來檢查您的執行器與之相容的功能。在撰寫您自己的 Airflow 執行器時,請務必為您的用例正確設定這些屬性。每個屬性只是一個布林值,用於啟用/停用功能或指示執行器支援/不支援功能

  • supports_pickling:執行器是否支援在執行之前從資料庫讀取 pickled DAGs (而不是從檔案系統讀取 DAG 定義)。

  • supports_sentry:執行器是否支援 Sentry

  • is_local:執行器是遠端還是本機。請參閱上面的 執行器類型 區段。

  • is_single_threaded:執行器是否為單執行緒。這與支援哪些資料庫後端特別相關。單執行緒執行器可以使用任何後端運行,包括 SQLite。

  • is_production:執行器是否應出於生產目的而使用。當使用者使用非生產就緒的執行器時,會向使用者顯示 UI 訊息。

  • change_sensor_mode_to_reschedule:在 poke 模式下運行 Airflow 感測器可能會阻塞執行器的執行緒,在某些情況下也會阻塞 Airflow。

  • serve_logs:執行器是否支援提供日誌,請參閱 任務記錄

CLI (命令列介面)

執行器可以銷售 CLI 命令,這些命令將透過實作 get_cli_commands 方法包含在 airflow 命令列工具中。例如,CeleryExecutorKubernetesExecutor 等執行器利用此機制。這些命令可用於設定所需的 workers、初始化環境或設定其他配置。命令僅針對目前配置的執行器銷售。從執行器實作 CLI 命令銷售的虛擬程式碼範例可以在下面看到

@staticmethod
def get_cli_commands() -> list[GroupCommand]:
    sub_commands = [
        ActionCommand(
            name="command_name",
            help="Description of what this specific command does",
            func=lazy_load_command("path.to.python.function.for.command"),
            args=(),
        ),
    ]

    return [
        GroupCommand(
            name="my_cool_executor",
            help="Description of what this group of commands do",
            subcommands=sub_commands,
        ),
    ]

注意

目前,對於 Airflow 命令命名空間沒有嚴格的規則。開發人員有責任為其 CLI 命令使用足夠獨特的名稱,以免與其他 Airflow 執行器或組件造成衝突。

注意

在創建新的執行器或更新任何現有執行器時,請務必不要在模組級別導入或執行任何昂貴的操作/程式碼。執行器類別在多個位置導入,如果它們導入速度很慢,這將對您的 Airflow 環境的效能產生負面影響,尤其是對於 CLI 命令。

記錄

執行器可以銷售日誌訊息,這些訊息將透過實作 get_task_logs 方法包含在 Airflow 任務日誌中。如果執行環境在任務失敗的情況下具有額外上下文,這可能會有所幫助,這可能是由於執行環境本身而不是 Airflow 任務程式碼造成的。包含來自執行環境的設定/關閉記錄也可能有所幫助。KubernetesExecutor 利用此功能來包含來自運行特定 Airflow 任務的 Pod 的日誌,並將它們顯示在該 Airflow 任務的日誌中。從執行器實作任務日誌銷售的虛擬程式碼範例可以在下面看到

def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
    messages = []
    log = []
    try:
        res = helper_function_to_fetch_logs_from_execution_env(ti, try_number)
        for line in res:
            log.append(remove_escape_codes(line.decode()))
        if log:
            messages.append("Found logs from execution environment!")
    except Exception as e:  # No exception should cause task logs to fail
        messages.append(f"Failed to find logs from execution environment: {e}")
    return messages, ["\n".join(log)]

下一步

一旦您創建了一個新的執行器類別,實作了 BaseExecutor 介面,您就可以透過將 core.executor 配置值設定為您的執行器的模組路徑來配置 Airflow 以使用它

[core]
executor = my_company.executors.MyCustomExecutor

注意

有關 Airflow 組態的更多資訊,請參閱 設定組態選項,有關在 Airflow 中管理 Python 模組的更多資訊,請參閱 模組管理

此條目是否有幫助?