Pools¶
當過多的程序同時衝擊某些系統時,這些系統可能會不堪負荷。Airflow pools 可用於限制任意任務集上的執行並行性。pools 清單在 UI 中管理 (Menu -> Admin -> Pools
),方法是給 pools 一個名稱並為其分配一定數量的 worker 插槽。在那裡,您還可以決定 pool 是否應將延遲任務納入其佔用插槽的計算中。
然後,可以透過在使用任務時使用 pool
參數,將任務與現有的 pools 之一建立關聯
aggregate_db_message_job = BashOperator(
task_id="aggregate_db_message_job",
execution_timeout=timedelta(hours=3),
pool="ep_data_pipeline_db_msg_agg",
bash_command=aggregate_db_message_job_cmd,
dag=dag,
)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)
當插槽填滿時,任務將照常排程。任務佔用的插槽數量可以透過 pool_slots
進行配置(請參閱以下章節)。一旦達到容量,可執行的任務就會排隊,並且它們的狀態將在 UI 中顯示為這樣。當插槽空出時,排隊的任務會根據任務及其後代的優先權重開始執行。
請注意,如果任務未被賦予 pool,它們將被分配到預設的 pool default_pool
,該 pool 初始化為 128 個插槽,並且可以透過 UI 或 CLI 進行修改(但無法移除)。
使用多個 pool 插槽¶
Airflow 任務預設情況下每個將佔用一個 pool 插槽,但如果需要,可以使用 pool_slots
參數將其配置為佔用更多插槽。當屬於同一個 pool 的多個任務不具有相同的「計算權重」時,這特別有用。
例如,考慮一個具有 2 個插槽的 pool,Pool(pool='maintenance', slots=2)
,以及以下任務
BashOperator(
task_id="heavy_task",
bash_command="bash backup_data.sh",
pool_slots=2,
pool="maintenance",
)
BashOperator(
task_id="light_task1",
bash_command="bash check_files.sh",
pool_slots=1,
pool="maintenance",
)
BashOperator(
task_id="light_task2",
bash_command="bash remove_files.sh",
pool_slots=1,
pool="maintenance",
)
由於重型任務配置為使用 2 個 pool 插槽,因此它在運行時會耗盡 pool。因此,任何輕型任務都必須排隊並等待重型任務完成才能執行。在這裡,在資源使用方面,重型任務相當於兩個輕型任務同時運行。
此實作可以防止系統資源不堪負荷,在這種情況下(在本範例中),當重型任務和輕型任務同時運行時可能會發生這種情況。另一方面,兩個輕型任務可以同時運行,因為它們每個只佔用一個 pool 插槽,而重型任務將不得不等待兩個 pool 插槽可用才能執行。
警告
Pools 和 SubDAGs 的互動方式與您最初預期的不同。SubDAGs 將不會遵守您在頂層為它們設定的任何 pool;pools 必須直接在 SubDAG內部的任務上設定。