叢集政策¶
如果您想要在叢集層級檢查或變更 DAG 或任務,叢集政策可讓您做到。它們有三個主要目的
檢查 DAG/任務是否符合特定標準
在 DAG/任務上設定預設參數
執行自訂路由邏輯
叢集政策主要有三種類型
task_policy
:接受名為task
的BaseOperator
參數。當從 DagBag 剖析任務時建立任務時,會執行此政策。這表示可以在任務政策中變更整個任務定義。它與 DagRun 中執行的特定任務無關。task_policy
定義的政策會套用至未來將執行的所有任務實例。task_instance_mutation_hook
:接受名為task_instance
的TaskInstance
參數。task_instance_mutation_hook
不是套用至任務,而是套用至與特定 DagRun 相關的任務實例。它在「worker」中執行,而不是在 DAG 檔案處理器中,就在任務實例執行之前。該政策僅適用於目前執行的該任務執行個體。
DAG 和任務叢集政策可以引發 AirflowClusterPolicyViolation
例外,以指示傳遞給它們的 DAG/任務不合規,不應載入。
當需要有意略過該 DAG 時,它們也可以引發 AirflowClusterPolicySkipDag
例外。與 AirflowClusterPolicyViolation
不同,此例外不會顯示在 Airflow Web UI 上(在內部,它不會記錄在元資料庫的 import_error
表格中。)
叢集政策設定的任何額外屬性都優先於 DAG 檔案中定義的屬性;例如,如果您在 DAG 檔案中的任務上設定 sla
,然後您的叢集政策也設定了 sla
,則叢集政策的值將優先。
如何定義政策函數¶
有兩種配置叢集政策的方法
在 Python 搜尋路徑中的某處建立一個
airflow_local_settings.py
檔案($AIRFLOW_HOME 下的config/
資料夾是一個很好的「預設」位置),然後將可呼叫物件新增到與上述一個或多個叢集政策名稱(例如dag_policy
)相符的檔案中。
有關如何配置本機設定的詳細資訊,請參閱 配置本機設定。
透過在自訂模組中使用 setuptools entrypoint,使用 Pluggy 介面。
版本 2.6 新增。
此方法更進階,適用於已經熟悉 Python 套件的人員。
首先在模組中建立您的政策函數
from airflow.policies import hookimpl @hookimpl def task_policy(task) -> None: # Mutate task in place # ... print(f"Hello from {__file__}")
然後將 entrypoint 新增到您的專案規格中。例如,使用
pyproject.toml
和setuptools
[build-system] requires = ["setuptools", "wheel"] build-backend = "setuptools.build_meta" [project] name = "my-airflow-plugin" version = "0.0.1" # ... dependencies = ["apache-airflow>=2.6"] [project.entry-points.'airflow.policy'] _ = 'my_airflow_plugin.policies'
entrypoint 群組必須是
airflow.policy
,並且名稱會被忽略。值應為使用@hookimpl
標記裝飾的模組(或類別)。完成後,並將您的發行版安裝到您的 Airflow 環境中,政策函數將會被各種 Airflow 组件呼叫。(確切的呼叫順序未定義,因此如果您有多個外掛程式,請不要依賴任何特定的呼叫順序)。
需要注意的一個重要事項(對於定義政策函數的任何方法)是,引數名稱必須與下面文件中記錄的完全一致。
可用的政策函數¶
- airflow.policies.task_policy(task)[source]¶
允許在 DagBag 中載入任務後變更任務。
它允許管理員重新佈線某些任務的參數。或者,您可以引發
AirflowClusterPolicyViolation
例外以停止執行 DAG。以下是一些如何使用此功能的範例
您可以為使用
SparkOperator
的任務強制執行特定的佇列(例如spark
佇列),以確保這些任務連接到正確的 worker您可以強制執行任務逾時政策,確保沒有任務執行時間超過 48 小時
- 參數
task (airflow.models.baseoperator.BaseOperator) – 要變更的任務
- airflow.policies.dag_policy(dag)[source]¶
允許在 DagBag 中載入 DAG 後變更 DAG。
它允許管理員重新佈線某些 DAG 的參數。或者,您可以引發
AirflowClusterPolicyViolation
例外以停止執行 DAG。以下是一些如何使用此功能的範例
您可以為 DAG 強制執行預設使用者
檢查每個 DAG 是否都已配置標籤
- 參數
dag (airflow.models.dag.DAG) – 要變更的 DAG
- airflow.policies.task_instance_mutation_hook(task_instance)[source]¶
允許在 Airflow 排程器將任務實例排入佇列之前變更任務實例。
例如,這可以用於在重試期間修改任務實例。
- 參數
task_instance (airflow.models.taskinstance.TaskInstance) – 要變更的任務實例
範例¶
DAG 政策¶
此政策檢查每個 DAG 是否至少定義一個標籤
def dag_policy(dag: DAG):
"""Ensure that DAG has at least one tag and skip the DAG with `only_for_beta` tag."""
if not dag.tags:
raise AirflowClusterPolicyViolation(
f"DAG {dag.dag_id} has no tags. At least one tag required. File path: {dag.fileloc}"
)
if "only_for_beta" in dag.tags:
raise AirflowClusterPolicySkipDag(
f"DAG {dag.dag_id} is not loaded on the production cluster, due to `only_for_beta` tag."
)
注意
為了避免匯入循環,如果您在叢集政策的類型註釋中使用 DAG
,請務必從 airflow.models
而不是從 airflow
匯入。
注意
DAG 政策在 DAG 完全載入後套用,因此覆寫 default_args
參數無效。如果您想要覆寫預設運算子設定,請改用任務政策。
任務政策¶
以下是在每個任務上強制執行最大逾時政策的範例
class TimedOperator(BaseOperator, ABC):
timeout: timedelta
def task_policy(task: TimedOperator):
if task.task_type == "HivePartitionSensor":
task.queue = "sensor_queue"
if task.timeout > timedelta(hours=48):
task.timeout = timedelta(hours=48)
您也可以實作保護措施來防止常見錯誤,而不是作為技術安全控制。例如,不要在沒有 Airflow 擁有者的情況下執行任務
def task_must_have_owners(task: BaseOperator):
if task.owner and not isinstance(task.owner, str):
raise AirflowClusterPolicyViolation(f"""owner should be a string. Current value: {task.owner!r}""")
if not task.owner or task.owner.lower() == conf.get("operators", "default_owner"):
raise AirflowClusterPolicyViolation(
f"""Task must have non-None non-default owner. Current value: {task.owner}"""
)
如果您有多個檢查要套用,最佳實務是在單獨的 Python 模組中策劃這些規則,並讓單一政策/任務變更 hook 執行多個這些自訂檢查,並彙總各種錯誤訊息,以便可以在 UI 中報告單一 AirflowClusterPolicyViolation
(以及資料庫中的匯入錯誤表格)。
例如,您的 airflow_local_settings.py
可能遵循以下模式
TASK_RULES: list[Callable[[BaseOperator], None]] = [
task_must_have_owners,
]
def _check_task_rules(current_task: BaseOperator):
"""Check task rules for given task."""
notices = []
for rule in TASK_RULES:
try:
rule(current_task)
except AirflowClusterPolicyViolation as ex:
notices.append(str(ex))
if notices:
notices_list = " * " + "\n * ".join(notices)
raise AirflowClusterPolicyViolation(
f"DAG policy violation (DAG ID: {current_task.dag_id}, Path: {current_task.dag.fileloc}):\n"
f"Notices:\n"
f"{notices_list}"
)
def example_task_policy(task: BaseOperator):
"""Ensure Tasks have non-default owners."""
_check_task_rules(task)
有關如何配置本機設定的詳細資訊,請參閱 配置本機設定。
任務實例變更¶
以下範例說明如何將第二次(或更多次)重試的任務重新路由到不同的佇列
def task_instance_mutation_hook(task_instance: TaskInstance):
if task_instance.try_number >= 1:
task_instance.queue = "retry_queue"
請注意,由於優先權權重是使用權重規則動態決定的,因此您無法在變更 hook 內變更任務實例的 priority_weight
。