Celery 執行器¶
注意
從 Airflow 2.7.0 開始,您需要安裝 celery
提供者套件才能使用此執行器。這可以透過安裝 apache-airflow-providers-celery>=3.3.0
或使用 celery
額外功能安裝 Airflow 來完成:pip install 'apache-airflow[celery]'
。
CeleryExecutor
是擴展 Worker 數量的方法之一。為了使此功能正常運作,您需要設定 Celery 後端(RabbitMQ、Redis、Redis Sentinel …),安裝必要的依賴項(例如 librabbitmq
、redis
…)並變更您的 airflow.cfg
,將 executor 參數指向 CeleryExecutor
並提供相關的 Celery 設定。
有關設定 Celery Broker 的更多資訊,請參閱詳盡的 Celery 文件主題。
Celery 執行器的組態參數可以在 Celery 提供者的 組態參考 中找到。
以下是 Worker 的一些必要條件
需要安裝
airflow
,並且 CLI 需要在路徑中Airflow 組態設定在叢集中應保持一致
在 Worker 上執行的 Operator 需要在該上下文中滿足其依賴項。例如,如果您使用
HiveOperator
,則需要在該機器上安裝 Hive CLI;如果您使用MySqlOperator
,則需要在PYTHONPATH
中以某種方式提供所需的 Python 函式庫Worker 需要存取其
DAGS_FOLDER
,並且您需要自行同步檔案系統。常見的設定是將您的DAGS_FOLDER
儲存在 Git 儲存庫中,並使用 Chef、Puppet、Ansible 或您用來組態環境中機器的任何工具在機器之間同步它。如果您的所有機器都有一個通用的掛載點,則在那裡共享您的管線檔案也應該可以運作
若要啟動 Worker,您需要設定 Airflow 並啟動 Worker 子命令
airflow celery worker
您的 Worker 應該在任務被觸發時立即開始接收任務。若要停止在機器上執行的 Worker,您可以使用
airflow celery stop
它將嘗試透過將 SIGTERM
訊號傳送至 Celery 主程序(如 Celery 文件 建議)來優雅地停止 Worker。
請注意,您也可以執行 Celery Flower(一個基於 Celery 建構的 Web UI)來監控您的 Worker。您可以使用快捷命令來啟動 Flower Web 伺服器
airflow celery flower
請注意,您必須在系統上已安裝 flower
Python 函式庫。建議的方式是安裝 airflow celery bundle。
pip install 'apache-airflow[celery]'
一些注意事項
請確保使用資料庫支援的結果後端
請確保在
[celery_broker_transport_options]
中設定超過最長執行任務 ETA 的可見性逾時如果您使用 Redis Sentinel 作為 Broker 且 Redis 伺服器受到密碼保護,請確保在
[celery_broker_transport_options]
區段中指定 Redis 伺服器的密碼請確保在
[worker_umask]
中設定 umask,以設定 Worker 新建立檔案的權限。任務會消耗資源。請確保您的 Worker 有足夠的資源來執行
worker_concurrency
個任務佇列名稱限制為 256 個字元,但每個 Broker 後端可能都有自己的限制
有關 Python 和 Airflow 如何管理模組的詳細資訊,請參閱 模組管理。
架構¶
Airflow 由幾個組件組成
Worker - 執行分配的任務
Scheduler - 負責將必要的任務新增到佇列
Web 伺服器 - HTTP 伺服器提供對 DAG/任務狀態資訊的存取
資料庫 - 包含有關任務、DAG、變數、連線等的狀態資訊。
Celery - 佇列機制
請注意,Celery 的佇列由兩個組件組成
Broker - 儲存要執行的命令
結果後端 - 儲存已完成命令的狀態
組件在許多地方相互通訊
[1] Web 伺服器 –> Worker - 提取任務執行日誌
[2] Web 伺服器 –> DAG 檔案 - 顯示 DAG 結構
[3] Web 伺服器 –> 資料庫 - 提取任務的狀態
[4] Worker –> DAG 檔案 - 顯示 DAG 結構並執行任務
[5] Worker –> 資料庫 - 取得並儲存有關連線組態、變數和 XCOM 的資訊。
[6] Worker –> Celery 的結果後端 - 儲存任務的狀態
[7] Worker –> Celery 的 Broker - 儲存要執行的命令
[8] Scheduler –> DAG 檔案 - 顯示 DAG 結構並執行任務
[9] Scheduler –> 資料庫 - 儲存 DAG 執行和相關任務
[10] Scheduler –> Celery 的結果後端 - 取得有關已完成任務狀態的資訊
[11] Scheduler –> Celery 的 Broker - 放置要執行的命令
任務執行流程¶

循序圖 - 任務執行流程¶
最初,有兩個程序正在執行
SchedulerProcess - 處理任務並使用 CeleryExecutor 執行
WorkerProcess - 觀察佇列,等待新任務出現
WorkerChildProcess - 等待新任務
還有兩個資料庫可用
QueueBroker
ResultBackend
在此過程中,會建立兩個程序
LocalTaskJobProcess - 其邏輯由 LocalTaskJob 描述。它正在監控 RawTaskProcess。新程序使用 TaskRunner 啟動。
RawTaskProcess - 它是一個包含使用者程式碼的程序,例如
execute()
。
execute_command()
。它建立一個新程序 - LocalTaskJobProcess。LocalTaskJob
類別描述。它使用 TaskRunner 啟動新程序。佇列¶
使用 CeleryExecutor 時,可以指定任務要傳送到的 Celery 佇列。queue
是 BaseOperator 的屬性,因此任何任務都可以分配給任何佇列。環境的預設佇列在 airflow.cfg
的 operators -> default_queue
中定義。這定義了未指定時任務分配到的佇列,以及 Airflow Worker 啟動時監聽的佇列。
Worker 可以監聽一個或多個任務佇列。當 Worker 啟動時(使用命令 airflow celery worker
),可以提供一組逗號分隔的佇列名稱(不含空格)(例如 airflow celery worker -q spark,quark
)。然後,此 Worker 將僅接收連接到指定佇列的任務。
如果您需要專門的 Worker,從資源的角度來看(例如,對於非常輕量級的任務,一個 Worker 可以處理數千個任務而沒有問題),或者從環境的角度來看(您希望 Worker 從 Spark 叢集本身內部執行,因為它需要非常特定的環境和安全性權限),這可能會很有用。