叢集政策

如果您想要在叢集層級檢查或變更 DAG 或任務,叢集政策可讓您做到。它們有三個主要目的

  • 檢查 DAG/任務是否符合特定標準

  • 在 DAG/任務上設定預設參數

  • 執行自訂路由邏輯

叢集政策主要有三種類型

  • dag_policy:接受名為 dagDAG 參數。在從 DagBag DagBag 載入 DAG 時執行。

  • task_policy:接受名為 taskBaseOperator 參數。當從 DagBag 剖析任務時建立任務時,會執行此政策。這表示可以在任務政策中變更整個任務定義。它與 DagRun 中執行的特定任務無關。task_policy 定義的政策會套用至未來將執行的所有任務實例。

  • task_instance_mutation_hook:接受名為 task_instanceTaskInstance 參數。task_instance_mutation_hook 不是套用至任務,而是套用至與特定 DagRun 相關的任務實例。它在「worker」中執行,而不是在 DAG 檔案處理器中,就在任務實例執行之前。該政策僅適用於目前執行的該任務執行個體。

DAG 和任務叢集政策可以引發 AirflowClusterPolicyViolation 例外,以指示傳遞給它們的 DAG/任務不合規,不應載入。

當需要有意略過該 DAG 時,它們也可以引發 AirflowClusterPolicySkipDag 例外。與 AirflowClusterPolicyViolation 不同,此例外不會顯示在 Airflow Web UI 上(在內部,它不會記錄在元資料庫的 import_error 表格中。)

叢集政策設定的任何額外屬性都優先於 DAG 檔案中定義的屬性;例如,如果您在 DAG 檔案中的任務上設定 sla,然後您的叢集政策也設定了 sla,則叢集政策的值將優先。

如何定義政策函數

有兩種配置叢集政策的方法

  1. 在 Python 搜尋路徑中的某處建立一個 airflow_local_settings.py 檔案($AIRFLOW_HOME 下的 config/ 資料夾是一個很好的「預設」位置),然後將可呼叫物件新增到與上述一個或多個叢集政策名稱(例如 dag_policy)相符的檔案中。

有關如何配置本機設定的詳細資訊,請參閱 配置本機設定

  1. 透過在自訂模組中使用 setuptools entrypoint,使用 Pluggy 介面。

    版本 2.6 新增。

    此方法更進階,適用於已經熟悉 Python 套件的人員。

    首先在模組中建立您的政策函數

    from airflow.policies import hookimpl
    
    
    @hookimpl
    def task_policy(task) -> None:
        # Mutate task in place
        # ...
        print(f"Hello from {__file__}")
    

    然後將 entrypoint 新增到您的專案規格中。例如,使用 pyproject.tomlsetuptools

    [build-system]
    requires = ["setuptools", "wheel"]
    build-backend = "setuptools.build_meta"
    
    [project]
    name = "my-airflow-plugin"
    version = "0.0.1"
    # ...
    
    dependencies = ["apache-airflow>=2.6"]
    [project.entry-points.'airflow.policy']
    _ = 'my_airflow_plugin.policies'
    

    entrypoint 群組必須是 airflow.policy,並且名稱會被忽略。值應為使用 @hookimpl 標記裝飾的模組(或類別)。

    完成後,並將您的發行版安裝到您的 Airflow 環境中,政策函數將會被各種 Airflow 组件呼叫。(確切的呼叫順序未定義,因此如果您有多個外掛程式,請不要依賴任何特定的呼叫順序)。

需要注意的一個重要事項(對於定義政策函數的任何方法)是,引數名稱必須與下面文件中記錄的完全一致。

可用的政策函數

airflow.policies.task_policy(task)[source]

允許在 DagBag 中載入任務後變更任務。

它允許管理員重新佈線某些任務的參數。或者,您可以引發 AirflowClusterPolicyViolation 例外以停止執行 DAG。

以下是一些如何使用此功能的範例

  • 您可以為使用 SparkOperator 的任務強制執行特定的佇列(例如 spark 佇列),以確保這些任務連接到正確的 worker

  • 您可以強制執行任務逾時政策,確保沒有任務執行時間超過 48 小時

參數

task (airflow.models.baseoperator.BaseOperator) – 要變更的任務

airflow.policies.dag_policy(dag)[source]

允許在 DagBag 中載入 DAG 後變更 DAG。

它允許管理員重新佈線某些 DAG 的參數。或者,您可以引發 AirflowClusterPolicyViolation 例外以停止執行 DAG。

以下是一些如何使用此功能的範例

  • 您可以為 DAG 強制執行預設使用者

  • 檢查每個 DAG 是否都已配置標籤

參數

dag (airflow.models.dag.DAG) – 要變更的 DAG

airflow.policies.task_instance_mutation_hook(task_instance)[source]

允許在 Airflow 排程器將任務實例排入佇列之前變更任務實例。

例如,這可以用於在重試期間修改任務實例。

參數

task_instance (airflow.models.taskinstance.TaskInstance) – 要變更的任務實例

airflow.policies.pod_mutation_hook(pod)[source]

在排程前變更 Pod。

此設定允許在 kubernetes.client.models.V1Pod 物件傳遞到 Kubernetes client 進行排程之前變更它們。

例如,這可以用於將 sidecar 或 init container 新增到 KubernetesExecutor 或 KubernetesPodOperator 啟動的每個 worker Pod。

airflow.policies.get_airflow_context_vars(context)[source]

將 Airflow 上下文變數注入到預設 Airflow 上下文變數中。

此設定允許取得 Airflow 上下文變數,這些變數是鍵值對。然後它們會被注入到預設的 Airflow 上下文變數中,這些變數最終在執行任務時作為環境變數提供,dag_id、task_id、execution_date、dag_run_id、try_number 是保留鍵。

參數

context – 感興趣的 task_instance 的上下文。

範例

DAG 政策

此政策檢查每個 DAG 是否至少定義一個標籤

def dag_policy(dag: DAG):
    """Ensure that DAG has at least one tag and skip the DAG with `only_for_beta` tag."""
    if not dag.tags:
        raise AirflowClusterPolicyViolation(
            f"DAG {dag.dag_id} has no tags. At least one tag required. File path: {dag.fileloc}"
        )

    if "only_for_beta" in dag.tags:
        raise AirflowClusterPolicySkipDag(
            f"DAG {dag.dag_id} is not loaded on the production cluster, due to `only_for_beta` tag."
        )


注意

為了避免匯入循環,如果您在叢集政策的類型註釋中使用 DAG,請務必從 airflow.models 而不是從 airflow 匯入。

注意

DAG 政策在 DAG 完全載入後套用,因此覆寫 default_args 參數無效。如果您想要覆寫預設運算子設定,請改用任務政策。

任務政策

以下是在每個任務上強制執行最大逾時政策的範例

class TimedOperator(BaseOperator, ABC):
    timeout: timedelta


def task_policy(task: TimedOperator):
    if task.task_type == "HivePartitionSensor":
        task.queue = "sensor_queue"
    if task.timeout > timedelta(hours=48):
        task.timeout = timedelta(hours=48)


您也可以實作保護措施來防止常見錯誤,而不是作為技術安全控制。例如,不要在沒有 Airflow 擁有者的情況下執行任務

def task_must_have_owners(task: BaseOperator):
    if task.owner and not isinstance(task.owner, str):
        raise AirflowClusterPolicyViolation(f"""owner should be a string. Current value: {task.owner!r}""")

    if not task.owner or task.owner.lower() == conf.get("operators", "default_owner"):
        raise AirflowClusterPolicyViolation(
            f"""Task must have non-None non-default owner. Current value: {task.owner}"""
        )


如果您有多個檢查要套用,最佳實務是在單獨的 Python 模組中策劃這些規則,並讓單一政策/任務變更 hook 執行多個這些自訂檢查,並彙總各種錯誤訊息,以便可以在 UI 中報告單一 AirflowClusterPolicyViolation(以及資料庫中的匯入錯誤表格)。

例如,您的 airflow_local_settings.py 可能遵循以下模式

TASK_RULES: list[Callable[[BaseOperator], None]] = [
    task_must_have_owners,
]


def _check_task_rules(current_task: BaseOperator):
    """Check task rules for given task."""
    notices = []
    for rule in TASK_RULES:
        try:
            rule(current_task)
        except AirflowClusterPolicyViolation as ex:
            notices.append(str(ex))
    if notices:
        notices_list = " * " + "\n * ".join(notices)
        raise AirflowClusterPolicyViolation(
            f"DAG policy violation (DAG ID: {current_task.dag_id}, Path: {current_task.dag.fileloc}):\n"
            f"Notices:\n"
            f"{notices_list}"
        )


def example_task_policy(task: BaseOperator):
    """Ensure Tasks have non-default owners."""
    _check_task_rules(task)


有關如何配置本機設定的詳細資訊,請參閱 配置本機設定

任務實例變更

以下範例說明如何將第二次(或更多次)重試的任務重新路由到不同的佇列

def task_instance_mutation_hook(task_instance: TaskInstance):
    if task_instance.try_number >= 1:
        task_instance.queue = "retry_queue"


請注意,由於優先權權重是使用權重規則動態決定的,因此您無法在變更 hook 內變更任務實例的 priority_weight

這個條目有幫助嗎?