Celery 執行器

注意

從 Airflow 2.7.0 開始,您需要安裝 celery 提供者套件才能使用此執行器。這可以透過安裝 apache-airflow-providers-celery>=3.3.0 或使用 celery 額外功能安裝 Airflow 來完成:pip install 'apache-airflow[celery]'

CeleryExecutor 是擴展 Worker 數量的方法之一。為了使此功能正常運作,您需要設定 Celery 後端(RabbitMQRedisRedis Sentinel …),安裝必要的依賴項(例如 librabbitmqredis …)並變更您的 airflow.cfg,將 executor 參數指向 CeleryExecutor 並提供相關的 Celery 設定。

有關設定 Celery Broker 的更多資訊,請參閱詳盡的 Celery 文件主題

Celery 執行器的組態參數可以在 Celery 提供者的 組態參考 中找到。

以下是 Worker 的一些必要條件

  • 需要安裝 airflow,並且 CLI 需要在路徑中

  • Airflow 組態設定在叢集中應保持一致

  • 在 Worker 上執行的 Operator 需要在該上下文中滿足其依賴項。例如,如果您使用 HiveOperator,則需要在該機器上安裝 Hive CLI;如果您使用 MySqlOperator,則需要在 PYTHONPATH 中以某種方式提供所需的 Python 函式庫

  • Worker 需要存取其 DAGS_FOLDER,並且您需要自行同步檔案系統。常見的設定是將您的 DAGS_FOLDER 儲存在 Git 儲存庫中,並使用 Chef、Puppet、Ansible 或您用來組態環境中機器的任何工具在機器之間同步它。如果您的所有機器都有一個通用的掛載點,則在那裡共享您的管線檔案也應該可以運作

若要啟動 Worker,您需要設定 Airflow 並啟動 Worker 子命令

airflow celery worker

您的 Worker 應該在任務被觸發時立即開始接收任務。若要停止在機器上執行的 Worker,您可以使用

airflow celery stop

它將嘗試透過將 SIGTERM 訊號傳送至 Celery 主程序(如 Celery 文件 建議)來優雅地停止 Worker。

請注意,您也可以執行 Celery Flower(一個基於 Celery 建構的 Web UI)來監控您的 Worker。您可以使用快捷命令來啟動 Flower Web 伺服器

airflow celery flower

請注意,您必須在系統上已安裝 flower Python 函式庫。建議的方式是安裝 airflow celery bundle。

pip install 'apache-airflow[celery]'

一些注意事項

  • 請確保使用資料庫支援的結果後端

  • 請確保在 [celery_broker_transport_options] 中設定超過最長執行任務 ETA 的可見性逾時

  • 如果您使用 Redis Sentinel 作為 Broker 且 Redis 伺服器受到密碼保護,請確保在 [celery_broker_transport_options] 區段中指定 Redis 伺服器的密碼

  • 請確保在 [worker_umask] 中設定 umask,以設定 Worker 新建立檔案的權限。

  • 任務會消耗資源。請確保您的 Worker 有足夠的資源來執行 worker_concurrency 個任務

  • 佇列名稱限制為 256 個字元,但每個 Broker 後端可能都有自己的限制

有關 Python 和 Airflow 如何管理模組的詳細資訊,請參閱 模組管理

架構

digraph A{ rankdir="TB" node[shape="rectangle", style="rounded"] subgraph cluster { label="Cluster"; {rank = same; dag; database} {rank = same; workers; scheduler; web} workers[label="Workers"] scheduler[label="Scheduler"] web[label="Web server"] database[label="Database"] dag[label="DAG files"] subgraph cluster_queue { label="Celery"; {rank = same; queue_broker; queue_result_backend} queue_broker[label="Queue broker"] queue_result_backend[label="Result backend"] } web->workers[label="1"] web->dag[label="2"] web->database[label="3"] workers->dag[label="4"] workers->database[label="5"] workers->queue_result_backend[label="6"] workers->queue_broker[label="7"] scheduler->dag[label="8"] scheduler->database[label="9"] scheduler->queue_result_backend[label="10"] scheduler->queue_broker[label="11"] } }

Airflow 由幾個組件組成

  • Worker - 執行分配的任務

  • Scheduler - 負責將必要的任務新增到佇列

  • Web 伺服器 - HTTP 伺服器提供對 DAG/任務狀態資訊的存取

  • 資料庫 - 包含有關任務、DAG、變數、連線等的狀態資訊。

  • Celery - 佇列機制

請注意,Celery 的佇列由兩個組件組成

  • Broker - 儲存要執行的命令

  • 結果後端 - 儲存已完成命令的狀態

組件在許多地方相互通訊

  • [1] Web 伺服器 –> Worker - 提取任務執行日誌

  • [2] Web 伺服器 –> DAG 檔案 - 顯示 DAG 結構

  • [3] Web 伺服器 –> 資料庫 - 提取任務的狀態

  • [4] Worker –> DAG 檔案 - 顯示 DAG 結構並執行任務

  • [5] Worker –> 資料庫 - 取得並儲存有關連線組態、變數和 XCOM 的資訊。

  • [6] Worker –> Celery 的結果後端 - 儲存任務的狀態

  • [7] Worker –> Celery 的 Broker - 儲存要執行的命令

  • [8] Scheduler –> DAG 檔案 - 顯示 DAG 結構並執行任務

  • [9] Scheduler –> 資料庫 - 儲存 DAG 執行和相關任務

  • [10] Scheduler –> Celery 的結果後端 - 取得有關已完成任務狀態的資訊

  • [11] Scheduler –> Celery 的 Broker - 放置要執行的命令

任務執行流程

_images/run_task_on_celery_executor.png

循序圖 - 任務執行流程

最初,有兩個程序正在執行

  • SchedulerProcess - 處理任務並使用 CeleryExecutor 執行

  • WorkerProcess - 觀察佇列,等待新任務出現

  • WorkerChildProcess - 等待新任務

還有兩個資料庫可用

  • QueueBroker

  • ResultBackend

在此過程中,會建立兩個程序

  • LocalTaskJobProcess - 其邏輯由 LocalTaskJob 描述。它正在監控 RawTaskProcess。新程序使用 TaskRunner 啟動。

  • RawTaskProcess - 它是一個包含使用者程式碼的程序,例如 execute()

[1] SchedulerProcess 處理任務,當它找到需要完成的任務時,會將其傳送到 QueueBroker
[2] SchedulerProcess 也開始定期查詢 ResultBackend 以取得任務的狀態。
[3] QueueBroker 在意識到任務時,會將有關任務的資訊傳送給一個 WorkerProcess。
[4] WorkerProcess 將單個任務分配給一個 WorkerChildProcess
[5] WorkerChildProcess 執行適當的任務處理函式 - execute_command()。它建立一個新程序 - LocalTaskJobProcess
[6] LocalTaskJobProcess 邏輯由 LocalTaskJob 類別描述。它使用 TaskRunner 啟動新程序。
[7][8] 程序 RawTaskProcessLocalTaskJobProcess 在完成其工作後停止。
[10][12] WorkerChildProcess 將任務結束和後續任務的可用性通知主程序 - WorkerProcess
[11] WorkerProcess 將狀態資訊儲存在 ResultBackend 中。
[13] 當 SchedulerProcess 再次向 ResultBackend 詢問狀態時,它將取得有關任務狀態的資訊。

佇列

使用 CeleryExecutor 時,可以指定任務要傳送到的 Celery 佇列。queue 是 BaseOperator 的屬性,因此任何任務都可以分配給任何佇列。環境的預設佇列在 airflow.cfgoperators -> default_queue 中定義。這定義了未指定時任務分配到的佇列,以及 Airflow Worker 啟動時監聽的佇列。

Worker 可以監聽一個或多個任務佇列。當 Worker 啟動時(使用命令 airflow celery worker),可以提供一組逗號分隔的佇列名稱(不含空格)(例如 airflow celery worker -q spark,quark)。然後,此 Worker 將僅接收連接到指定佇列的任務。

如果您需要專門的 Worker,從資源的角度來看(例如,對於非常輕量級的任務,一個 Worker 可以處理數千個任務而沒有問題),或者從環境的角度來看(您希望 Worker 從 Spark 叢集本身內部執行,因為它需要非常特定的環境和安全性權限),這可能會很有用。

此條目是否有幫助?