資料感知排程

版本 2.4 新增。

快速開始

除了基於時間排程 DAG 之外,您還可以根據任務何時更新資料集來排程 DAG 執行。

from airflow.datasets import Dataset

with DAG(...):
    MyOperator(
        # this task updates example.csv
        outlets=[Dataset("s3://dataset-bucket/example.csv")],
        ...,
    )


with DAG(
    # this DAG should be run when example.csv is updated (by dag1)
    schedule=[Dataset("s3://dataset-bucket/example.csv")],
    ...,
):
    ...
../_images/dataset-scheduled-dags.png

什麼是「資料集」?

Airflow 資料集是資料的邏輯分組。上游生產者任務可以更新資料集,而資料集更新有助於排程下游消費者 DAG。

統一資源識別符 (URI) 定義資料集

from airflow.datasets import Dataset

example_dataset = Dataset("s3://dataset-bucket/example.csv")

Airflow 不對 URI 所代表的資料內容或位置做任何假設,並將 URI 視為字串。這表示 Airflow 會將任何正規表示式(例如 input_\d+.csv)或檔案 glob 模式(例如 input_2022*.csv)視為嘗試從一個宣告建立多個資料集,並且它們將無法運作。

您必須使用有效的 URI 建立資料集。Airflow 核心和提供者定義了各種您可以使用的 URI 方案,例如 file (核心)、postgres (由 Postgres 提供者提供) 和 s3 (由 Amazon 提供者提供)。第三方提供者和外掛程式也可能提供他們自己的方案。這些預先定義的方案具有各自的語意,預期應遵循這些語意。

什麼是有效的 URI?

從技術上講,URI 必須符合 RFC 3986 中的有效字元集,基本上是 ASCII 字母數字字元,加上 %-_.~。若要識別無法以 URI 安全字元表示的資源,請使用 百分比編碼 對資源名稱進行編碼。

URI 也區分大小寫,因此 s3://example/datasets3://Example/Dataset 會被視為不同。請注意,URI 的主機部分也區分大小寫,這與 RFC 3986 不同。

請勿使用 airflow 方案,該方案保留給 Airflow 的內部使用。

Airflow 始終偏好在方案中使用小寫,並且 URI 的主機部分需要區分大小寫,才能正確區分資源。

# invalid datasets:
reserved = Dataset("airflow://example_dataset")
not_ascii = Dataset("èxample_datašet")

如果您想要使用不包含其他語意約束的方案來定義資料集,請使用帶有前綴 x- 的方案。Airflow 會跳過對具有這些方案的 URI 的任何語意驗證。

# valid dataset, treated as a plain string
my_ds = Dataset("x-my-thing://foobarbaz")

識別符不必是絕對的;它可以是無方案的相對 URI,甚至只是一個簡單的路徑或字串

# valid datasets:
schemeless = Dataset("//example/dataset")
csv_file = Dataset("example_dataset")

非絕對識別符被視為純字串,不對 Airflow 帶有任何語意意義。

關於資料集的額外資訊

如果需要,您可以在資料集中包含一個額外的字典

example_dataset = Dataset(
    "s3://dataset/example.csv",
    extra={"team": "trainees"},
)

這可以用於為資料集提供自訂描述,例如誰擁有目標檔案的所有權,或該檔案的用途。額外資訊不會影響資料集的身份。這表示 DAG 將由具有相同 URI 的資料集觸發,即使額外字典不同

with DAG(
    dag_id="consumer",
    schedule=[Dataset("s3://dataset/example.csv", extra={"different": "extras"})],
):
    ...

with DAG(dag_id="producer", ...):
    MyOperator(
        # triggers "consumer" with the given extra!
        outlets=[Dataset("s3://dataset/example.csv", extra={"team": "trainees"})],
        ...,
    )

注意

安全注意事項:資料集 URI 和額外欄位未加密,它們以明文形式儲存在 Airflow 的中繼資料資料庫中。請勿在資料集 URI 或額外鍵值中儲存任何敏感值,尤其是憑證!

如何在您的 DAG 中使用資料集

您可以使用資料集在您的 DAG 中指定資料依賴關係。以下範例顯示在 producer DAG 中的 producer 任務成功完成後,Airflow 如何排程 consumer DAG。只有在任務成功完成時,Airflow 才會將資料集標記為 updated。如果任務失敗或被跳過,則不會發生更新,並且 Airflow 不會排程 consumer DAG。

example_dataset = Dataset("s3://dataset/example.csv")

with DAG(dag_id="producer", ...):
    BashOperator(task_id="producer", outlets=[example_dataset], ...)

with DAG(dag_id="consumer", schedule=[example_dataset], ...):
    ...

您可以在 資料集檢視 中找到資料集與 DAG 之間關係的列表

多個資料集

由於 schedule 參數是一個列表,DAG 可以需要多個資料集。在 DAG 使用的所有資料集自上次 DAG 執行以來至少更新一次後,Airflow 才會排程 DAG

with DAG(
    dag_id="multiple_datasets_example",
    schedule=[
        example_dataset_1,
        example_dataset_2,
        example_dataset_3,
    ],
    ...,
):
    ...

如果一個資料集在所有使用的資料集更新之前更新多次,下游 DAG 仍然只執行一次,如此圖所示

graph dataset_event_timeline { graph [layout=neato] { node [margin=0 fontcolor=blue width=0.1 shape=point label=""] e1 [pos="1,2.5!"] e2 [pos="2,2.5!"] e3 [pos="2.5,2!"] e4 [pos="4,2.5!"] e5 [pos="5,2!"] e6 [pos="6,2.5!"] e7 [pos="7,1.5!"] r7 [pos="7,1!" shape=star width=0.25 height=0.25 fixedsize=shape] e8 [pos="8,2!"] e9 [pos="9,1.5!"] e10 [pos="10,2!"] e11 [pos="11,1.5!"] e12 [pos="12,2!"] e13 [pos="13,2.5!"] r13 [pos="13,1!" shape=star width=0.25 height=0.25 fixedsize=shape] } { node [shape=none label="" width=0] end_ds1 [pos="14,2.5!"] end_ds2 [pos="14,2!"] end_ds3 [pos="14,1.5!"] } { node [shape=none margin=0.25 fontname="roboto,sans-serif"] example_dataset_1 [ pos="-0.5,2.5!"] example_dataset_2 [ pos="-0.5,2!"] example_dataset_3 [ pos="-0.5,1.5!"] dag_runs [label="DagRuns created" pos="-0.5,1!"] } edge [color=lightgrey] example_dataset_1 -- e1 -- e2 -- e4 -- e6 -- e13 -- end_ds1 example_dataset_2 -- e3 -- e5 -- e8 -- e10 -- e12 -- end_ds2 example_dataset_3 -- e7 -- e9 -- e11 -- end_ds3 }

將額外資訊附加到發射資料集事件

版本 2.10.0 新增。

具有資料集出口的任務可以選擇在發射資料集事件之前附加額外資訊。這與 關於資料集的額外資訊 不同。資料集上的額外資訊靜態地描述了資料集 URI 指向的實體;相反,資料集事件上的額外資訊應用於註解觸發資料變更,例如資料庫中有多少行被更新更改,或其涵蓋的日期範圍。

將額外資訊附加到資料集事件的最簡單方法是從任務中 yield 一個 Metadata 物件

from airflow.datasets import Dataset
from airflow.datasets.metadata import Metadata

example_s3_dataset = Dataset("s3://dataset/example.csv")


@task(outlets=[example_s3_dataset])
def write_to_s3():
    df = ...  # Get a Pandas DataFrame to write.
    # Write df to dataset...
    yield Metadata(example_s3_dataset, {"row_count": len(df)})

Airflow 自動收集所有產生的中繼資料,並使用相應中繼資料物件的額外資訊填充資料集事件。

這也可以在傳統運算子中完成。最好的方法是子類化運算子並覆寫 execute。或者,也可以在任務的 pre_executepost_execute hook 中新增額外資訊。但是,如果您選擇使用 hook,請記住,當任務重試時,它們不會重新執行,並且可能導致額外資訊與某些情境中的實際資料不符。

實現相同目的的另一種方法是直接在任務的執行環境中存取 outlet_events

@task(outlets=[example_s3_dataset])
def write_to_s3(*, outlet_events):
    outlet_events[example_s3_dataset].extra = {"row_count": len(df)}

這裡幾乎沒有魔法—Airflow 只是將產生的值寫入完全相同的存取器。這也適用於傳統運算子,包括 executepre_executepost_execute

從先前發射的資料集事件中獲取資訊

版本 2.10.0 新增。

在任務的 outlets 中定義的資料集事件(如前一節所述)可以由在其 inlets 中宣告相同資料集的任務讀取。資料集事件條目包含 extra(詳情請參閱前一節)、timestamp(指示事件何時從任務發射)和 source_task_instance(將事件連結回其來源)。

可以使用執行環境中的 inlet_events 存取器讀取入口資料集事件。繼續前一節中的 write_to_s3 任務

@task(inlets=[example_s3_dataset])
def post_process_s3_file(*, inlet_events):
    events = inlet_events[example_s3_dataset]
    last_row_count = events[-1].extra["row_count"]

inlet_events 映射中的每個值都是一個類似序列的物件,它按 timestamp 對給定資料集的過去事件進行排序,從最早到最新。它支援 Python 大部分列表介面,因此您可以使用 [-1] 存取最後一個事件,[-2:] 存取最後兩個事件等等。存取器是惰性的,只有當您存取其中的項目時才會命中資料庫。

從觸發資料集事件中獲取資訊

觸發的 DAG 可以使用 triggering_dataset_events 範本或參數從觸發它的資料集中獲取資訊。請參閱 範本參考 了解更多資訊。

範例

example_snowflake_dataset = Dataset("snowflake://my_db/my_schema/my_table")

with DAG(dag_id="load_snowflake_data", schedule="@hourly", ...):
    SQLExecuteQueryOperator(
        task_id="load", conn_id="snowflake_default", outlets=[example_snowflake_dataset], ...
    )

with DAG(dag_id="query_snowflake_data", schedule=[example_snowflake_dataset], ...):
    SQLExecuteQueryOperator(
        task_id="query",
        conn_id="snowflake_default",
        sql="""
          SELECT *
          FROM my_db.my_schema.my_table
          WHERE "updated_at" >= '{{ (triggering_dataset_events.values() | first | first).source_dag_run.data_interval_start }}'
          AND "updated_at" < '{{ (triggering_dataset_events.values() | first | first).source_dag_run.data_interval_end }}';
        """,
    )

    @task
    def print_triggering_dataset_events(triggering_dataset_events=None):
        for dataset, dataset_list in triggering_dataset_events.items():
            print(dataset, dataset_list)
            print(dataset_list[0].source_dag_run.dag_id)

    print_triggering_dataset_events()

請注意,此範例使用 (.values() | first | first) 來獲取給定 DAG 的一個資料集中的第一個,以及該資料集的一個 DatasetEvent 中的第一個。如果您有多個資料集,且可能有多個 DatasetEvent,則實作可能會非常複雜。

透過 REST API 操作佇列的資料集事件

版本 2.9 新增。

在此範例中,當任務更新資料集「dataset-1」和「dataset-2」時,DAG waiting_for_dataset_1_and_2 將被觸發。一旦「dataset-1」更新,Airflow 就會建立一個記錄。這確保 Airflow 知道在「dataset-2」更新時觸發 DAG。我們將此類記錄稱為佇列的資料集事件。

with DAG(
    dag_id="waiting_for_dataset_1_and_2",
    schedule=[Dataset("dataset-1"), Dataset("dataset-2")],
    ...,
):
    ...

引入 queuedEvent API 端點來操作此類記錄。

  • 獲取 DAG 的佇列資料集事件:/datasets/queuedEvent/{uri}

  • 獲取 DAG 的佇列資料集事件:/dags/{dag_id}/datasets/queuedEvent

  • 刪除 DAG 的佇列資料集事件:/datasets/queuedEvent/{uri}

  • 刪除 DAG 的佇列資料集事件:/dags/{dag_id}/datasets/queuedEvent

  • 獲取資料集的佇列資料集事件:/dags/{dag_id}/datasets/queuedEvent/{uri}

  • 刪除資料集的佇列資料集事件:DELETE /dags/{dag_id}/datasets/queuedEvent/{uri}

有關如何使用 REST API 以及這些端點所需的參數,請參閱 Airflow API

使用條件表達式的高階資料集排程

Apache Airflow 包含使用具有資料集的條件表達式的高階排程功能。此功能允許您根據資料集更新為 DAG 執行定義複雜的依賴關係,使用邏輯運算子來更精確地控制工作流程觸發器。

資料集的邏輯運算子

Airflow 支援兩個邏輯運算子來組合資料集條件

  • AND (``&``):指定只有在所有指定的資料集都已更新後才應觸發 DAG。

  • OR (``|``):指定當任何指定的資料集更新時應觸發 DAG。

這些運算子使您能夠配置您的 Airflow 工作流程以使用更複雜的資料集更新條件,使其更具動態性和彈性。

使用範例

基於多個資料集更新的排程

若要排程僅在兩個特定資料集都已更新時才執行的 DAG,請使用 AND 運算子 (&)

dag1_dataset = Dataset("s3://dag1/output_1.txt")
dag2_dataset = Dataset("s3://dag2/output_1.txt")

with DAG(
    # Consume dataset 1 and 2 with dataset expressions
    schedule=(dag1_dataset & dag2_dataset),
    ...,
):
    ...

基於任何資料集更新的排程

若要在兩個資料集中的任一個更新時觸發 DAG 執行,請套用 OR 運算子 (|)

with DAG(
    # Consume dataset 1 or 2 with dataset expressions
    schedule=(dag1_dataset | dag2_dataset),
    ...,
):
    ...

複雜的條件邏輯

對於需要更複雜條件的情境,例如在一個資料集更新時或在另外兩個資料集都更新時觸發 DAG,請組合 OR 和 AND 運算子

dag3_dataset = Dataset("s3://dag3/output_3.txt")

with DAG(
    # Consume dataset 1 or both 2 and 3 with dataset expressions
    schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
    ...,
):
    ...

透過 DatasetAlias 動態資料事件發射和資料集建立

資料集別名可用於發射與別名關聯的資料集事件。下游可以依賴已解析的資料集。此功能允許您根據資料集更新為 DAG 執行定義複雜的依賴關係。

如何使用 DatasetAlias

DatasetAlias 只有一個單一引數 name,用於唯一識別資料集。任務必須首先宣告別名作為出口,並使用 outlet_events 或 yield Metadata 來向其新增事件。

以下範例針對 S3 URI f"s3://bucket/my-task" 建立資料集事件,並帶有可選的額外資訊 extra。如果資料集不存在,Airflow 將動態建立它並記錄警告訊息。

透過 outlet_events 在任務執行期間發射資料集事件

from airflow.datasets import DatasetAlias


@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task_with_outlet_events(*, outlet_events):
    outlet_events[DatasetAlias("my-task-outputs")].add(Dataset("s3://bucket/my-task"), extra={"k": "v"})

透過 yield Metadata 在任務執行期間發射資料集事件

from airflow.datasets.metadata import Metadata


@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task_with_metadata():
    s3_dataset = Dataset("s3://bucket/my-task")
    yield Metadata(s3_dataset, extra={"k": "v"}, alias="my-task-outputs")

即使資料集多次新增到別名,或新增到多個別名,也只會為新增的資料集發射一個資料集事件。但是,如果傳遞了不同的 extra 值,則可以發射多個資料集事件。在以下範例中,將發射兩個資料集事件。

from airflow.datasets import DatasetAlias


@task(
    outlets=[
        DatasetAlias("my-task-outputs-1"),
        DatasetAlias("my-task-outputs-2"),
        DatasetAlias("my-task-outputs-3"),
    ]
)
def my_task_with_outlet_events(*, outlet_events):
    outlet_events[DatasetAlias("my-task-outputs-1")].add(Dataset("s3://bucket/my-task"), extra={"k": "v"})
    # This line won't emit an additional dataset event as the dataset and extra are the same as the previous line.
    outlet_events[DatasetAlias("my-task-outputs-2")].add(Dataset("s3://bucket/my-task"), extra={"k": "v"})
    # This line will emit an additional dataset event as the extra is different.
    outlet_events[DatasetAlias("my-task-outputs-3")].add(Dataset("s3://bucket/my-task"), extra={"k2": "v2"})

基於資料集別名的排程

由於新增到別名的資料集事件只是簡單的資料集事件,因此依賴於實際資料集的下游 DAG 可以正常讀取其資料集事件,而無需考慮關聯的別名。下游 DAG 也可以依賴資料集別名。編寫語法是透過名稱引用 DatasetAlias,並揀選關聯的資料集事件以進行排程。請注意,只有當別名解析為 Dataset("s3://bucket/my-task") 時,才會由具有 outlets=DatasetAlias("xxx") 的任務觸發 DAG。每當具有出口 DatasetAlias("out") 的任務在執行時至少與一個資料集關聯時,DAG 就會執行,而與資料集的身份無關。如果特定給定任務執行的別名未關聯任何資料集,則不會觸發下游 DAG。這也表示我們可以進行條件式資料集觸發。

資料集別名在 DAG 解析期間解析為資料集。因此,如果將「min_file_process_interval」組態設定為高值,則資料集別名可能無法解析。若要解決此問題,您可以觸發 DAG 解析。

with DAG(dag_id="dataset-producer"):

    @task(outlets=[Dataset("example-alias")])
    def produce_dataset_events():
        pass


with DAG(dag_id="dataset-alias-producer"):

    @task(outlets=[DatasetAlias("example-alias")])
    def produce_dataset_events(*, outlet_events):
        outlet_events[DatasetAlias("example-alias")].add(Dataset("s3://bucket/my-task"))


with DAG(dag_id="dataset-consumer", schedule=Dataset("s3://bucket/my-task")):
    ...

with DAG(dag_id="dataset-alias-consumer", schedule=DatasetAlias("example-alias")):
    ...

在提供的範例中,一旦執行 DAG dataset-alias-producer,資料集別名 DatasetAlias("example-alias") 將解析為 Dataset("s3://bucket/my-task")。但是,DAG dataset-alias-consumer 將必須等待下一次 DAG 重新解析才能更新其排程。為了解決這個問題,當資料集別名 DatasetAlias("example-alias") 解析為這些 DAG 之前未依賴的資料集時,Airflow 將重新解析依賴於該別名的 DAG。因此,在執行 DAG dataset-alias-producer 後,將觸發「dataset-consumer」和「dataset-alias-consumer」DAG。

透過已解析的資料集別名從先前發射的資料集事件中獲取資訊

從先前發射的資料集事件中獲取資訊 中所述,入口資料集事件可以使用執行環境中的 inlet_events 存取器讀取,您也可以使用資料集別名來存取由它們觸發的資料集事件。

with DAG(dag_id="dataset-alias-producer"):

    @task(outlets=[DatasetAlias("example-alias")])
    def produce_dataset_events(*, outlet_events):
        outlet_events[DatasetAlias("example-alias")].add(
            Dataset("s3://bucket/my-task"), extra={"row_count": 1}
        )


with DAG(dag_id="dataset-alias-consumer", schedule=None):

    @task(inlets=[DatasetAlias("example-alias")])
    def consume_dataset_alias_events(*, inlet_events):
        events = inlet_events[DatasetAlias("example-alias")]
        last_row_count = events[-1].extra["row_count"]

結合資料集和基於時間的排程

DatasetTimetable 整合

您可以使用 DatasetOrTimeSchedule 基於資料集事件和基於時間的排程來排程 DAG。這允許您在 DAG 需要由資料更新觸發並根據固定時間表定期執行時建立工作流程。

有關 DatasetOrTimeSchedule 的更多詳細資訊,請參閱 DatasetOrTimeSchedule 中的相應章節。

這個條目有幫助嗎?