優先權重¶
priority_weight
定義了執行器佇列中的優先順序。預設的 priority_weight
為 1
,並且可以提升為任何整數。此外,每個任務都有一個真正的 priority_weight
,該權重是根據其 weight_rule
計算得出的,而 weight_rule
定義了用於任務有效總優先權重的加權方法。
以下是加權方法。預設情況下,Airflow 的加權方法為 downstream
。
下游 (downstream)
任務的有效權重是所有下游後代的總和。因此,當使用正權重值時,上游任務將具有更高的權重,並且將被更積極地排程。當您有多個 DAG 執行個體,並希望在每個 DAG 繼續處理下游任務之前,讓所有執行的上游任務都完成時,這非常有用。
上游 (upstream)
有效權重是所有上游祖先的總和。這與下游任務具有較高權重,並且在使用正權重值時將被更積極地排程的情況相反。當您有多個 DAG 執行個體,並且希望在開始其他 DAG 執行的上游任務之前完成每個 DAG 時,這非常有用。
絕對 (absolute)
有效權重是指定的確切 priority_weight
,沒有額外的加權。當您確切知道每個任務應具有的優先權重時,您可能需要這樣做。此外,當設定為 absolute
時,還有一個額外的好處,即對於非常大的 DAG,可以顯著加快任務建立過程。
priority_weight
參數可以與佇列池結合使用。
注意
由於大多數資料庫引擎都使用 32 位元整數,因此任何計算或定義的 priority_weight
的最大值為 2,147,483,647,最小值為 -2,147,483,648。
自訂權重規則¶
2.9.0 版本新增。
您可以透過擴展 PriorityWeightStrategy
類別並在插件中註冊它來實作您自己的自訂加權方法。
class DecreasingPriorityStrategy(PriorityWeightStrategy):
"""A priority weight strategy that decreases the priority weight with each attempt of the DAG task."""
def get_weight(self, ti: TaskInstance):
return max(3 - ti.try_number + 1, 1)
class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin):
name = "decreasing_priority_weight_strategy_plugin"
priority_weight_strategies = [DecreasingPriorityStrategy]
然後要使用它,您可以建立自訂類別的實例,並在任務的 weight_rule
參數中提供它,或者提供自訂類別的路徑
from custom_weight_rule_module import CustomPriorityWeightStrategy
# provide the class instance
task1 = BashOperator(task_id="task", bash_command="echo 1", weight_rule=CustomPriorityWeightStrategy())
# or provide the path of the class
task1 = BashOperator(
task_id="task",
bash_command="echo 1",
weight_rule="custom_weight_rule_module.CustomPriorityWeightStrategy",
)
這是一個實驗性功能。