使用 Timetable 自訂 DAG 排程

在我們的範例中,假設一家公司想要在每個工作日之後執行一個任務,以處理在工作日期間收集的資料。第一個直覺的答案可能是 schedule="0 0 * * 1-5"(週一至週五的午夜),但這表示在週五收集的資料不會在週五結束後立即處理,而是在下週一處理,並且該執行的間隔將從週五午夜到週一午夜。此外,上述排程字串無法略過假日的處理。我們想要的是

  • 為每週一、週二、週三、週四和週五排程執行。執行的資料間隔將涵蓋每天的午夜到隔天的午夜(例如 2021-01-01 00:00:00 至 2021-01-02 00:00:00)。

  • 每次執行都會在資料間隔結束後立即建立。涵蓋週一的執行發生在週二午夜,依此類推。涵蓋週五的執行發生在週六午夜。週日和週一午夜不會發生執行。

  • 不要在定義的假日排程執行。

為了簡化,我們在此範例中僅處理 UTC 日期時間。

注意

自訂 timetable 傳回的所有日期時間值必須是「感知型」,即包含時區資訊。此外,它們必須使用 pendulum 的日期時間和時區類型。

Timetable 註冊

timetable 必須是 Timetable 的子類別,並且註冊為 plugin 的一部分。以下是我們實作新 timetable 的骨架

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import Timetable


class AfterWorkdayTimetable(Timetable):
    pass


class WorkdayTimetablePlugin(AirflowPlugin):
    name = "workday_timetable_plugin"
    timetables = [AfterWorkdayTimetable]

接下來,我們將開始將程式碼放入 AfterWorkdayTimetable。實作完成後,我們應該能夠在我們的 DAG 檔案中使用 timetable

import pendulum

from airflow import DAG
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable


with DAG(
    dag_id="example_after_workday_timetable_dag",
    start_date=pendulum.datetime(2021, 3, 10, tz="UTC"),
    schedule=AfterWorkdayTimetable(),
    tags=["example", "timetable"],
):
    ...

定義排程邏輯

當 Airflow 的排程器遇到 DAG 時,它會呼叫兩種方法之一,以了解何時排程 DAG 的下一次執行。

  • next_dagrun_info:排程器使用此方法來了解 timetable 的常規排程,即我們範例中的「每個工作日一次,在結束時執行」部分。

  • infer_manual_data_interval:當 DAG 執行是手動觸發時(例如從 Web UI),排程器會使用此方法來了解如何反向推斷排程外的執行的資料間隔。

我們將從 infer_manual_data_interval 開始,因為它是兩者中較簡單的一個

airflow/example_dags/plugins/workday.py[原始碼]

def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
    start = DateTime.combine((run_after - timedelta(days=1)).date(), Time.min).replace(tzinfo=UTC)
    # Skip backwards over weekends and holidays to find last run
    start = self.get_next_workday(start, incr=-1)
    return DataInterval(start=start, end=(start + timedelta(days=1)))

此方法接受一個引數 run_after,這是一個 pendulum.DateTime 物件,指示 DAG 何時從外部觸發。由於我們的 timetable 為每個完整工作日建立資料間隔,因此此處推斷的資料間隔通常應從 run_after 前一天的午夜開始,但如果 run_after 落在週日或週一(即前一天是週六或週日),則應進一步推回至前一個週五。一旦我們知道間隔的開始時間,結束時間就只是它之後的完整一天。然後,我們建立一個 DataInterval 物件來描述此間隔。

接下來是 next_dagrun_info 的實作

airflow/example_dags/plugins/workday.py[原始碼]

def next_dagrun_info(
    self,
    *,
    last_automated_data_interval: DataInterval | None,
    restriction: TimeRestriction,
) -> DagRunInfo | None:
    if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
        last_start = last_automated_data_interval.start
        next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min)
    # Otherwise this is the first ever run on the regular schedule...
    elif (earliest := restriction.earliest) is None:
        return None  # No start_date. Don't schedule.
    elif not restriction.catchup:
        # If the DAG has catchup=False, today is the earliest to consider.
        next_start = max(earliest, DateTime.combine(Date.today(), Time.min))
    elif earliest.time() != Time.min:
        # If earliest does not fall on midnight, skip to the next day.
        next_start = DateTime.combine(earliest.date() + timedelta(days=1), Time.min)
    else:
        next_start = earliest
    # Skip weekends and holidays
    next_start = self.get_next_workday(next_start.replace(tzinfo=UTC))

    if restriction.latest is not None and next_start > restriction.latest:
        return None  # Over the DAG's scheduled end; don't schedule.
    return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))

此方法接受兩個引數。last_automated_data_interval 是一個 DataInterval 實例,指示此 DAG 先前非手動觸發的執行的資料間隔,如果這是 DAG 首次排程,則為 Nonerestriction 封裝 DAG 及其任務如何指定排程,並包含三個屬性

  • earliest:DAG 可能排程的最早時間。這是從 DAG 及其任務的所有 start_date 引數計算出的 pendulum.DateTime,如果根本沒有找到 start_date 引數,則為 None

  • latest:與 earliest 類似,這是 DAG 可能排程的最晚時間,從 end_date 引數計算得出。

  • catchup:一個布林值,反映 DAG 的 catchup 引數。

注意

earliestlatest 都適用於 DAG 執行的邏輯日期(資料間隔的開始時間),而不是執行排程的時間(通常在資料間隔結束之後)。

如果先前已排程執行,我們現在應該透過迴圈瀏覽後續日期以找到不是週六、週日或美國假日的日期,來排程下一個非假日工作日。但是,如果先前沒有排程執行,我們將選擇 restriction.earliest 之後的下一個非假日工作日的午夜。restriction.catchup 也需要考慮 - 如果它是 False,即使 start_date 值在過去,我們也無法在目前時間之前排程。最後,如果我們計算出的資料間隔晚於 restriction.latest,我們必須尊重它,並且不排程執行,而是傳回 None

如果我們決定排程執行,我們需要使用 DagRunInfo 來描述它。此類型有兩個引數和屬性

  • data_interval:一個 DataInterval 實例,描述下一次執行的資料間隔。

  • run_after:一個 pendulum.DateTime 實例,告知排程器何時可以排程 DAG 執行。

DagRunInfo 可以這樣建立

info = DagRunInfo(
    data_interval=DataInterval(start=start, end=end),
    run_after=run_after,
)

由於我們通常希望在資料間隔結束後立即排程執行,因此上面的 endrun_after 通常是相同的。DagRunInfo 因此為此提供了一個快捷方式

info = DagRunInfo.interval(start=start, end=end)
assert info.data_interval.end == info.run_after  # Always True.

為了參考,以下是我們的 plugin 和 DAG 檔案的完整內容

airflow/example_dags/plugins/workday.py[原始碼]

from pendulum import UTC, Date, DateTime, Time

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable

if TYPE_CHECKING:
    from airflow.timetables.base import TimeRestriction

log = logging.getLogger(__name__)
try:
    from pandas.tseries.holiday import USFederalHolidayCalendar

    holiday_calendar = USFederalHolidayCalendar()
except ImportError:
    log.warning("Could not import pandas. Holidays will not be considered.")
    holiday_calendar = None  # type: ignore[assignment]


class AfterWorkdayTimetable(Timetable):
    def get_next_workday(self, d: DateTime, incr=1) -> DateTime:
        next_start = d
        while True:
            if next_start.weekday() not in (5, 6):  # not on weekend
                if holiday_calendar is None:
                    holidays = set()
                else:
                    holidays = holiday_calendar.holidays(start=next_start, end=next_start).to_pydatetime()
                if next_start not in holidays:
                    break
            next_start = next_start.add(days=incr)
        return next_start
    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        start = DateTime.combine((run_after - timedelta(days=1)).date(), Time.min).replace(tzinfo=UTC)
        # Skip backwards over weekends and holidays to find last run
        start = self.get_next_workday(start, incr=-1)
        return DataInterval(start=start, end=(start + timedelta(days=1)))
    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
            last_start = last_automated_data_interval.start
            next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min)
        # Otherwise this is the first ever run on the regular schedule...
        elif (earliest := restriction.earliest) is None:
            return None  # No start_date. Don't schedule.
        elif not restriction.catchup:
            # If the DAG has catchup=False, today is the earliest to consider.
            next_start = max(earliest, DateTime.combine(Date.today(), Time.min))
        elif earliest.time() != Time.min:
            # If earliest does not fall on midnight, skip to the next day.
            next_start = DateTime.combine(earliest.date() + timedelta(days=1), Time.min)
        else:
            next_start = earliest
        # Skip weekends and holidays
        next_start = self.get_next_workday(next_start.replace(tzinfo=UTC))

        if restriction.latest is not None and next_start > restriction.latest:
            return None  # Over the DAG's scheduled end; don't schedule.
        return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))


class WorkdayTimetablePlugin(AirflowPlugin):
    name = "workday_timetable_plugin"
    timetables = [AfterWorkdayTimetable]


import pendulum

from airflow import DAG
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
from airflow.operators.empty import EmptyOperator


with DAG(
    dag_id="example_workday_timetable",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=AfterWorkdayTimetable(),
    tags=["example", "timetable"],
):
    EmptyOperator(task_id="run_this")

參數化 Timetable

有時我們需要將一些執行時期引數傳遞給 timetable。繼續我們的 AfterWorkdayTimetable 範例,也許我們有在不同時區運行的 DAG,並且我們想要在隔天早上 8 點而不是午夜排程一些 DAG。我們不需要為每個目的建立單獨的 timetable,而是想要執行類似以下的操作

class SometimeAfterWorkdayTimetable(Timetable):
    def __init__(self, schedule_at: Time) -> None:
        self._schedule_at = schedule_at

    def next_dagrun_info(self, last_automated_dagrun, restriction):
        ...
        end = start + timedelta(days=1)
        return DagRunInfo(
            data_interval=DataInterval(start=start, end=end),
            run_after=DateTime.combine(end.date(), self._schedule_at).replace(tzinfo=UTC),
        )

但是,由於 timetable 是 DAG 的一部分,我們需要告訴 Airflow 如何使用我們在 __init__ 中提供的內容來序列化它。這是透過在我們的 timetable 類別上實作兩個額外的方法來完成的

class SometimeAfterWorkdayTimetable(Timetable):
    ...

    def serialize(self) -> dict[str, Any]:
        return {"schedule_at": self._schedule_at.isoformat()}

    @classmethod
    def deserialize(cls, value: dict[str, Any]) -> Timetable:
        return cls(Time.fromisoformat(value["schedule_at"]))

當 DAG 被序列化時,會呼叫 serialize 以取得 JSON 可序列化的值。當序列化的 DAG 被排程器存取以重建 timetable 時,該值會傳遞給 deserialize

Timetable 在 UI 中的顯示

預設情況下,自訂 timetable 會依其類別名稱顯示在 UI 中(例如,「DAGs」表格中的Schedule 欄)。可以透過覆寫 summary 屬性來自訂此設定。這對於參數化 timetable 尤其有用,以便包含在 __init__ 中提供的引數。例如,對於我們的 SometimeAfterWorkdayTimetable 類別,我們可以有

@property
def summary(self) -> str:
    return f"after each workday, at {self._schedule_at}"

因此,對於像這樣宣告的 DAG

with DAG(
    schedule=SometimeAfterWorkdayTimetable(Time(8)),  # 8am.
    ...,
):
    ...

Schedule 欄將顯示 after each workday, at 08:00:00

另請參閱

模組 airflow.timetables.base

公開介面已大量記錄,以說明子類別應實作的內容。

Timetable 描述在 UI 中的顯示

您也可以透過覆寫 description 屬性,為您的 Timetable 實作提供描述。這對於在 UI 中為您的實作提供完整描述特別有用。例如,對於我們的 SometimeAfterWorkdayTimetable 類別,我們可以有

description = "Schedule: after each workday"

如果您想要衍生描述,您也可以將其包裝在 __init__ 內。

def __init__(self) -> None:
    self.description = "Schedule: after each workday, at f{self._schedule_at}"

當您想要提供與 summary 屬性不同的完整描述時,這特別有用。

因此,對於像這樣宣告的 DAG

with DAG(
    schedule=SometimeAfterWorkdayTimetable(Time(8)),  # 8am.
    ...,
):
    ...

i 圖示將顯示,Schedule: after each workday, at 08:00:00

另請參閱

模組 airflow.timetables.interval

檢查 CronDataIntervalTimetable 描述實作,它在 UI 中提供完整的 cron 描述。

變更產生的 run_id

版本 2.4 新增功能。

自 Airflow 2.4 以來,Timetable 也負責產生 DagRuns 的 run_id

例如,為了讓 Run ID 顯示執行開始時間(即資料間隔的結束時間,而不是目前使用的開始時間)的「人性化」日期,您可以將類似這樣的方法新增至自訂 timetable

def generate_run_id(
    self,
    *,
    run_type: DagRunType,
    logical_date: DateTime,
    data_interval: DataInterval | None,
    **extra,
) -> str:
    if run_type == DagRunType.SCHEDULED and data_interval:
        return data_interval.end.format("YYYY-MM-DD dddd")
    return super().generate_run_id(
        run_type=run_type, logical_date=logical_date, data_interval=data_interval, **extra
    )

請記住,RunID 限制為 250 個字元,並且在 DAG 內必須是唯一的。

此條目是否有幫助?