排程表

對於具有基於時間的排程(相對於事件驅動)的 DAG,DAG 的內部「排程表」驅動排程。排程表也決定為 DAG 建立的每次運行的資料間隔和邏輯日期。

使用 cron 表達式或 timedelta 物件排程的 DAG 在內部會轉換為始終使用排程表。

如果 cron 表達式或 timedelta 足以滿足您的使用案例,您無需擔心編寫自訂排程表,因為 Airflow 具有處理這些情況的預設排程表。但對於更複雜的排程需求,您可以建立自己的排程表類別,並將其傳遞給 DAG 的 schedule 參數。

以下是一些自訂排程表實作有用的範例

  • 每天在不同時間發生的任務運行。例如,天文學家可能會發現,在黎明時運行任務以處理從前一晚期間收集的資料很有用。

  • 不遵循格里曆的排程。例如,為農曆中的每個月建立一次運行。這在概念上與日出案例類似,但時間尺度不同。

  • 滾動視窗或重疊的資料間隔。例如,您可能希望每天運行一次,但使每次運行涵蓋前七天的期間。可以使用 cron 表達式來破解這個問題,但自訂資料間隔提供了更自然的表示。

  • 資料間隔之間存在「間隙」而不是連續間隔,因為 cron 表達式和 timedelta 排程都表示連續間隔。請參閱資料間隔

Airflow 允許您在外掛程式中編寫自訂排程表,並由 DAG 使用。您可以在使用排程表自訂 DAG 排程操作指南中找到示範自訂排程表的範例。

注意

作為一般規則,始終在您的程式碼中盡可能晚地存取變數、連線或任何其他需要存取資料庫的內容。請參閱排程表以取得更多最佳實務。

內建排程表

Airflow 內建了幾個常見的排程表,以涵蓋最常見的使用案例。其他排程表可能在外掛程式中提供。

CronTriggerTimetable

一個接受 cron 表達式的排程表,並根據它觸發 DAG 運行。

from airflow.timetables.trigger import CronTriggerTimetable


@dag(schedule=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), ...)  # At 01:00 on Wednesday
def example_dag():
    pass

您也可以為排程表提供靜態資料間隔。可選的 interval 參數必須是 datetime.timedeltadateutil.relativedelta.relativedelta。當使用這些參數時,觸發的 DAG 運行的資料間隔跨越指定的持續時間,並在觸發時間結束

from datetime import timedelta

from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    # Runs every Friday at 18:00 to cover the work week (9:00 Monday to 18:00 Friday).
    schedule=CronTriggerTimetable(
        "0 18 * * 5",
        timezone="UTC",
        interval=timedelta(days=4, hours=9),
    ),
    ...,
)
def example_dag():
    pass

DeltaDataIntervalTimetable

一個使用時間增量排程資料間隔的排程表。您可以透過將 datetime.timedeltadateutil.relativedelta.relativedelta 提供給 DAG 的 schedule 參數來選擇它。

此排程表專注於資料間隔值,不一定將執行日期與任意界限(例如一天或一小時的開始)對齊。

@dag(schedule=datetime.timedelta(minutes=30))
def example_dag():
    pass

CronDataIntervalTimetable

一個接受 cron 表達式的排程表,根據每個 cron 觸發點之間的間隔建立資料間隔,並在每個資料間隔結束時觸發 DAG 運行。

透過將有效的 cron 表達式作為字串提供給 DAG 的 schedule 參數來選擇此排程表,如DAGs 文件中所述。

@dag(schedule="0 1 * * 3")  # At 01:00 on Wednesday.
def example_dag():
    pass

EventsTimetable

傳遞 datetime 列表,以便 DAG 在之後運行。這對於根據體育賽事、計劃的溝通活動和其他任意且不規則但可預測的排程進行計時非常有用。

事件列表必須是有限且大小合理的,因為每次解析 DAG 時都必須載入它。或者,使用 restrict_to_events 標誌來強制手動運行 DAG,該 DAG 使用最近或第一個事件的時間作為資料間隔。否則,手動運行會以等於手動運行開始時間的 data_interval_startdata_interval_end 開始。您也可以使用 description 參數命名事件集,這將顯示在 Airflow UI 中。

from airflow.timetables.events import EventsTimetable


@dag(
    schedule=EventsTimetable(
        event_dates=[
            pendulum.datetime(2022, 4, 5, 8, 27, tz="America/Chicago"),
            pendulum.datetime(2022, 4, 17, 8, 27, tz="America/Chicago"),
            pendulum.datetime(2022, 4, 22, 20, 50, tz="America/Chicago"),
        ],
        description="My Team's Baseball Games",
        restrict_to_events=False,
    ),
    ...,
)
def example_dag():
    pass

基於資料集事件並結合時間的排程

將條件資料集表達式與基於時間的排程結合使用,可增強排程彈性。

DatasetOrTimeSchedule 是一種特殊的排程表,允許根據基於時間的排程和資料集事件來排程 DAG。它還有助於建立排定的運行(根據傳統排程表)和資料集觸發的運行,這兩者是獨立運作的。

此功能在 DAG 需要在資料集更新時以及定期間隔運行的情況下特別有用。它確保工作流程對資料變更保持響應,並持續運行定期檢查或更新。

以下是使用 DatasetOrTimeSchedule 的 DAG 範例

from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset)
    )
    # Additional arguments here, replace this comment with actual arguments
)
def example_dag():
    # DAG tasks go here
    pass

排程表比較

兩種 cron 排程表之間的差異

Airflow 有兩個排程表 CronTriggerTimetableCronDataIntervalTimetable 接受 cron 表達式。

但是,兩者之間存在差異: - CronTriggerTimetable 不處理資料間隔,而 CronDataIntervalTimetable 處理。 - CronTriggerTimetableCronDataIntervalTimetablerun_id 中的時間戳記、logical_date 根據它們如何處理資料間隔而有不同的定義,如 DAG 運行被觸發的時間 中所述。

是否考慮資料間隔

CronTriggerTimetable 包含資料間隔。這表示 data_interval_startdata_interval_end(以及舊版 execution_date)的值相同;即 DAG 運行被觸發的時間。

但是,CronDataIntervalTimetable 確實包含資料間隔。這表示 data_interval_startdata_interval_end(以及舊版 execution_date)的值不同。data_interval_start 是 DAG 運行被觸發的時間,data_interval_end 是間隔的結束時間。

補追 行為

無論您使用 CronTriggerTimetable 還是 CronDataIntervalTimetable,當 catchupTrue 時,沒有差異。

在某些情況下,您可能希望為 catchup 使用 False,以防止運行不必要的 DAG: - 如果您建立一個新的 DAG,其開始日期在過去,並且不想運行過去的 DAG。如果 catchupTrue,Airflow 會運行該時間間隔內本應運行的所有 DAG。 - 如果您暫停現有的 DAG,然後在稍後日期重新啟動它,並且不想如果 catchupTrue

在這些情況下,run_id 中的 logical_date 基於 CronTriggerTimetableCronDataIntervalTimetable 如何處理資料間隔。

請參閱 補追 以取得有關在使用 catchup 時如何觸發 DAG 運行的更多資訊。

DAG 運行被觸發的時間

CronTriggerTimetableCronDataIntervalTimetable 在同一時間觸發 DAG 運行。但是,每個排程表的 run_id 的時間戳記不同。

例如,假設有一個 cron 表達式 @daily0 0 * * *,排定在每天凌晨 12 點運行。如果您在 1 月 31 日下午 3 點啟用使用這兩個排程表的 DAG, - CronTriggerTimetable 會在 2 月 1 日凌晨 12 點觸發新的 DAG 運行。run_id 時間戳記為 2 月 1 日午夜。 - CronDataIntervalTimetable 會立即觸發新的 DAG 運行,因為從 1 月 31 日凌晨 12 點開始的每日時間間隔的 DAG 運行尚未發生。run_id 時間戳記為 1 月 31 日午夜,因為那是資料間隔的開始時間。

這是另一個範例,顯示在跳過 DAG 運行時的差異。

假設有兩個正在運行的 DAG,它們具有 cron 表達式 @daily0 0 * * *,並且使用兩個不同的排程表。如果您在 1 月 31 日下午 3 點暫停 DAG,並在 2 月 2 日下午 3 點重新啟用它們, - CronTriggerTimetable 會跳過本應在 2 月 1 日和 2 日觸發的 DAG 運行。下一個 DAG 運行將在 2 月 3 日凌晨 12 點觸發。 - CronDataIntervalTimetable 只會跳過本應在 2 月 1 日觸發的 DAG 運行。在您重新啟用 DAG 後,會立即觸發 2 月 2 日的 DAG 運行。

在這些範例中,您可以看到 CronTriggerTimetable 觸發 DAG 運行的方式比 CronDataIntervalTimetable 更直觀,也更符合人們對 cron 行為的期望。

cron 和 delta 資料間隔排程表之間的差異:

DeltaDataIntervalTimetableCronDataIntervalTimetable 之間進行選擇取決於您的使用案例。如果您在 2 月 1 日 01:05 啟用 DAG,下表總結了根據 3 個參數建立的 DAG 運行及其涵蓋的資料間隔:schedulestart_datecatchup

schedule

start_date

catchup

涵蓋的間隔

備註

*/30 * * * *

year-02-01

True

  • 00:00 - 00:30

  • 00:30 - 01:00

與使用 timedelta 物件的行為相同。

*/30 * * * *

year-02-01

False

  • 00:30 - 01:00

*/30 * * * *

year-02-01 00:10

True

  • 00:30 - 01:00

間隔 00:00 - 00:30 不在開始日期之後,因此被跳過。

*/30 * * * *

year-02-01 00:10

False

  • 00:30 - 01:00

無論開始日期為何,資料間隔都與小時/天/等界限對齊。

datetime.timedelta(minutes=30)

year-02-01

True

  • 00:00 - 00:30

  • 00:30 - 01:00

與使用 cron 表達式的行為相同。

datetime.timedelta(minutes=30)

year-02-01

False

  • 00:35 - 01:05

間隔未與開始日期對齊,而是與目前時間對齊。

datetime.timedelta(minutes=30)

year-02-01 00:10

True

  • 00:10 - 00:40

間隔與開始日期對齊。下一個將在 5 分鐘後觸發,涵蓋 00:40 - 01:10。

datetime.timedelta(minutes=30)

year-02-01 00:10

False

  • 00:35 - 01:05

間隔與目前時間對齊。下一次運行將在 30 分鐘後觸發。

這個條目有幫助嗎?