優先權重

priority_weight 定義了執行器佇列中的優先順序。預設的 priority_weight1,並且可以提升為任何整數。此外,每個任務都有一個真正的 priority_weight,該權重是根據其 weight_rule 計算得出的,而 weight_rule 定義了用於任務有效總優先權重的加權方法。

以下是加權方法。預設情況下,Airflow 的加權方法為 downstream

下游 (downstream)

任務的有效權重是所有下游後代的總和。因此,當使用正權重值時,上游任務將具有更高的權重,並且將被更積極地排程。當您有多個 DAG 執行個體,並希望在每個 DAG 繼續處理下游任務之前,讓所有執行的上游任務都完成時,這非常有用。

上游 (upstream)

有效權重是所有上游祖先的總和。這與下游任務具有較高權重,並且在使用正權重值時將被更積極地排程的情況相反。當您有多個 DAG 執行個體,並且希望在開始其他 DAG 執行的上游任務之前完成每個 DAG 時,這非常有用。

絕對 (absolute)

有效權重是指定的確切 priority_weight,沒有額外的加權。當您確切知道每個任務應具有的優先權重時,您可能需要這樣做。此外,當設定為 absolute 時,還有一個額外的好處,即對於非常大的 DAG,可以顯著加快任務建立過程。

priority_weight 參數可以與佇列池結合使用。

注意

由於大多數資料庫引擎都使用 32 位元整數,因此任何計算或定義的 priority_weight 的最大值為 2,147,483,647,最小值為 -2,147,483,648。

自訂權重規則

2.9.0 版本新增。

您可以透過擴展 PriorityWeightStrategy 類別並在插件中註冊它來實作您自己的自訂加權方法。

airflow/example_dags/plugins/decreasing_priority_weight_strategy.py[原始碼]

class DecreasingPriorityStrategy(PriorityWeightStrategy):
    """A priority weight strategy that decreases the priority weight with each attempt of the DAG task."""

    def get_weight(self, ti: TaskInstance):
        return max(3 - ti.try_number + 1, 1)


class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin):
    name = "decreasing_priority_weight_strategy_plugin"
    priority_weight_strategies = [DecreasingPriorityStrategy]


然後要使用它,您可以建立自訂類別的實例,並在任務的 weight_rule 參數中提供它,或者提供自訂類別的路徑

from custom_weight_rule_module import CustomPriorityWeightStrategy

# provide the class instance
task1 = BashOperator(task_id="task", bash_command="echo 1", weight_rule=CustomPriorityWeightStrategy())

# or provide the path of the class
task1 = BashOperator(
    task_id="task",
    bash_command="echo 1",
    weight_rule="custom_weight_rule_module.CustomPriorityWeightStrategy",
)

這是一個實驗性功能

此條目是否有幫助?