使用 Timetable 自訂 DAG 排程¶
在我們的範例中,假設一家公司想要在每個工作日之後執行一個任務,以處理在工作日期間收集的資料。第一個直覺的答案可能是 schedule="0 0 * * 1-5"
(週一至週五的午夜),但這表示在週五收集的資料不會在週五結束後立即處理,而是在下週一處理,並且該執行的間隔將從週五午夜到週一午夜。此外,上述排程字串無法略過假日的處理。我們想要的是
為每週一、週二、週三、週四和週五排程執行。執行的資料間隔將涵蓋每天的午夜到隔天的午夜(例如 2021-01-01 00:00:00 至 2021-01-02 00:00:00)。
每次執行都會在資料間隔結束後立即建立。涵蓋週一的執行發生在週二午夜,依此類推。涵蓋週五的執行發生在週六午夜。週日和週一午夜不會發生執行。
不要在定義的假日排程執行。
為了簡化,我們在此範例中僅處理 UTC 日期時間。
注意
自訂 timetable 傳回的所有日期時間值必須是「感知型」,即包含時區資訊。此外,它們必須使用 pendulum
的日期時間和時區類型。
Timetable 註冊¶
timetable 必須是 Timetable
的子類別,並且註冊為 plugin 的一部分。以下是我們實作新 timetable 的骨架
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import Timetable
class AfterWorkdayTimetable(Timetable):
pass
class WorkdayTimetablePlugin(AirflowPlugin):
name = "workday_timetable_plugin"
timetables = [AfterWorkdayTimetable]
接下來,我們將開始將程式碼放入 AfterWorkdayTimetable
。實作完成後,我們應該能夠在我們的 DAG 檔案中使用 timetable
import pendulum
from airflow import DAG
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
with DAG(
dag_id="example_after_workday_timetable_dag",
start_date=pendulum.datetime(2021, 3, 10, tz="UTC"),
schedule=AfterWorkdayTimetable(),
tags=["example", "timetable"],
):
...
定義排程邏輯¶
當 Airflow 的排程器遇到 DAG 時,它會呼叫兩種方法之一,以了解何時排程 DAG 的下一次執行。
next_dagrun_info
:排程器使用此方法來了解 timetable 的常規排程,即我們範例中的「每個工作日一次,在結束時執行」部分。infer_manual_data_interval
:當 DAG 執行是手動觸發時(例如從 Web UI),排程器會使用此方法來了解如何反向推斷排程外的執行的資料間隔。
我們將從 infer_manual_data_interval
開始,因為它是兩者中較簡單的一個
def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
start = DateTime.combine((run_after - timedelta(days=1)).date(), Time.min).replace(tzinfo=UTC)
# Skip backwards over weekends and holidays to find last run
start = self.get_next_workday(start, incr=-1)
return DataInterval(start=start, end=(start + timedelta(days=1)))
此方法接受一個引數 run_after
,這是一個 pendulum.DateTime
物件,指示 DAG 何時從外部觸發。由於我們的 timetable 為每個完整工作日建立資料間隔,因此此處推斷的資料間隔通常應從 run_after
前一天的午夜開始,但如果 run_after
落在週日或週一(即前一天是週六或週日),則應進一步推回至前一個週五。一旦我們知道間隔的開始時間,結束時間就只是它之後的完整一天。然後,我們建立一個 DataInterval
物件來描述此間隔。
接下來是 next_dagrun_info
的實作
def next_dagrun_info(
self,
*,
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
if last_automated_data_interval is not None: # There was a previous run on the regular schedule.
last_start = last_automated_data_interval.start
next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min)
# Otherwise this is the first ever run on the regular schedule...
elif (earliest := restriction.earliest) is None:
return None # No start_date. Don't schedule.
elif not restriction.catchup:
# If the DAG has catchup=False, today is the earliest to consider.
next_start = max(earliest, DateTime.combine(Date.today(), Time.min))
elif earliest.time() != Time.min:
# If earliest does not fall on midnight, skip to the next day.
next_start = DateTime.combine(earliest.date() + timedelta(days=1), Time.min)
else:
next_start = earliest
# Skip weekends and holidays
next_start = self.get_next_workday(next_start.replace(tzinfo=UTC))
if restriction.latest is not None and next_start > restriction.latest:
return None # Over the DAG's scheduled end; don't schedule.
return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))
此方法接受兩個引數。last_automated_data_interval
是一個 DataInterval
實例,指示此 DAG 先前非手動觸發的執行的資料間隔,如果這是 DAG 首次排程,則為 None
。restriction
封裝 DAG 及其任務如何指定排程,並包含三個屬性
earliest
:DAG 可能排程的最早時間。這是從 DAG 及其任務的所有start_date
引數計算出的pendulum.DateTime
,如果根本沒有找到start_date
引數,則為None
。latest
:與earliest
類似,這是 DAG 可能排程的最晚時間,從end_date
引數計算得出。catchup
:一個布林值,反映 DAG 的catchup
引數。
注意
earliest
和 latest
都適用於 DAG 執行的邏輯日期(資料間隔的開始時間),而不是執行排程的時間(通常在資料間隔結束之後)。
如果先前已排程執行,我們現在應該透過迴圈瀏覽後續日期以找到不是週六、週日或美國假日的日期,來排程下一個非假日工作日。但是,如果先前沒有排程執行,我們將選擇 restriction.earliest
之後的下一個非假日工作日的午夜。restriction.catchup
也需要考慮 - 如果它是 False
,即使 start_date
值在過去,我們也無法在目前時間之前排程。最後,如果我們計算出的資料間隔晚於 restriction.latest
,我們必須尊重它,並且不排程執行,而是傳回 None
。
如果我們決定排程執行,我們需要使用 DagRunInfo
來描述它。此類型有兩個引數和屬性
data_interval
:一個DataInterval
實例,描述下一次執行的資料間隔。run_after
:一個pendulum.DateTime
實例,告知排程器何時可以排程 DAG 執行。
DagRunInfo
可以這樣建立
info = DagRunInfo(
data_interval=DataInterval(start=start, end=end),
run_after=run_after,
)
由於我們通常希望在資料間隔結束後立即排程執行,因此上面的 end
和 run_after
通常是相同的。DagRunInfo
因此為此提供了一個快捷方式
info = DagRunInfo.interval(start=start, end=end)
assert info.data_interval.end == info.run_after # Always True.
為了參考,以下是我們的 plugin 和 DAG 檔案的完整內容
from pendulum import UTC, Date, DateTime, Time
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
if TYPE_CHECKING:
from airflow.timetables.base import TimeRestriction
log = logging.getLogger(__name__)
try:
from pandas.tseries.holiday import USFederalHolidayCalendar
holiday_calendar = USFederalHolidayCalendar()
except ImportError:
log.warning("Could not import pandas. Holidays will not be considered.")
holiday_calendar = None # type: ignore[assignment]
class AfterWorkdayTimetable(Timetable):
def get_next_workday(self, d: DateTime, incr=1) -> DateTime:
next_start = d
while True:
if next_start.weekday() not in (5, 6): # not on weekend
if holiday_calendar is None:
holidays = set()
else:
holidays = holiday_calendar.holidays(start=next_start, end=next_start).to_pydatetime()
if next_start not in holidays:
break
next_start = next_start.add(days=incr)
return next_start
def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
start = DateTime.combine((run_after - timedelta(days=1)).date(), Time.min).replace(tzinfo=UTC)
# Skip backwards over weekends and holidays to find last run
start = self.get_next_workday(start, incr=-1)
return DataInterval(start=start, end=(start + timedelta(days=1)))
def next_dagrun_info(
self,
*,
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
if last_automated_data_interval is not None: # There was a previous run on the regular schedule.
last_start = last_automated_data_interval.start
next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min)
# Otherwise this is the first ever run on the regular schedule...
elif (earliest := restriction.earliest) is None:
return None # No start_date. Don't schedule.
elif not restriction.catchup:
# If the DAG has catchup=False, today is the earliest to consider.
next_start = max(earliest, DateTime.combine(Date.today(), Time.min))
elif earliest.time() != Time.min:
# If earliest does not fall on midnight, skip to the next day.
next_start = DateTime.combine(earliest.date() + timedelta(days=1), Time.min)
else:
next_start = earliest
# Skip weekends and holidays
next_start = self.get_next_workday(next_start.replace(tzinfo=UTC))
if restriction.latest is not None and next_start > restriction.latest:
return None # Over the DAG's scheduled end; don't schedule.
return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))
class WorkdayTimetablePlugin(AirflowPlugin):
name = "workday_timetable_plugin"
timetables = [AfterWorkdayTimetable]
import pendulum
from airflow import DAG
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="example_workday_timetable",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=AfterWorkdayTimetable(),
tags=["example", "timetable"],
):
EmptyOperator(task_id="run_this")
參數化 Timetable¶
有時我們需要將一些執行時期引數傳遞給 timetable。繼續我們的 AfterWorkdayTimetable
範例,也許我們有在不同時區運行的 DAG,並且我們想要在隔天早上 8 點而不是午夜排程一些 DAG。我們不需要為每個目的建立單獨的 timetable,而是想要執行類似以下的操作
class SometimeAfterWorkdayTimetable(Timetable):
def __init__(self, schedule_at: Time) -> None:
self._schedule_at = schedule_at
def next_dagrun_info(self, last_automated_dagrun, restriction):
...
end = start + timedelta(days=1)
return DagRunInfo(
data_interval=DataInterval(start=start, end=end),
run_after=DateTime.combine(end.date(), self._schedule_at).replace(tzinfo=UTC),
)
但是,由於 timetable 是 DAG 的一部分,我們需要告訴 Airflow 如何使用我們在 __init__
中提供的內容來序列化它。這是透過在我們的 timetable 類別上實作兩個額外的方法來完成的
class SometimeAfterWorkdayTimetable(Timetable):
...
def serialize(self) -> dict[str, Any]:
return {"schedule_at": self._schedule_at.isoformat()}
@classmethod
def deserialize(cls, value: dict[str, Any]) -> Timetable:
return cls(Time.fromisoformat(value["schedule_at"]))
當 DAG 被序列化時,會呼叫 serialize
以取得 JSON 可序列化的值。當序列化的 DAG 被排程器存取以重建 timetable 時,該值會傳遞給 deserialize
。
Timetable 在 UI 中的顯示¶
預設情況下,自訂 timetable 會依其類別名稱顯示在 UI 中(例如,「DAGs」表格中的Schedule 欄)。可以透過覆寫 summary
屬性來自訂此設定。這對於參數化 timetable 尤其有用,以便包含在 __init__
中提供的引數。例如,對於我們的 SometimeAfterWorkdayTimetable
類別,我們可以有
@property
def summary(self) -> str:
return f"after each workday, at {self._schedule_at}"
因此,對於像這樣宣告的 DAG
with DAG(
schedule=SometimeAfterWorkdayTimetable(Time(8)), # 8am.
...,
):
...
Schedule 欄將顯示 after each workday, at 08:00:00
。
另請參閱
- 模組
airflow.timetables.base
公開介面已大量記錄,以說明子類別應實作的內容。
Timetable 描述在 UI 中的顯示¶
您也可以透過覆寫 description
屬性,為您的 Timetable 實作提供描述。這對於在 UI 中為您的實作提供完整描述特別有用。例如,對於我們的 SometimeAfterWorkdayTimetable
類別,我們可以有
description = "Schedule: after each workday"
如果您想要衍生描述,您也可以將其包裝在 __init__
內。
def __init__(self) -> None:
self.description = "Schedule: after each workday, at f{self._schedule_at}"
當您想要提供與 summary
屬性不同的完整描述時,這特別有用。
因此,對於像這樣宣告的 DAG
with DAG(
schedule=SometimeAfterWorkdayTimetable(Time(8)), # 8am.
...,
):
...
i 圖示將顯示,Schedule: after each workday, at 08:00:00
。
另請參閱
- 模組
airflow.timetables.interval
檢查
CronDataIntervalTimetable
描述實作,它在 UI 中提供完整的 cron 描述。
變更產生的 run_id
¶
版本 2.4 新增功能。
自 Airflow 2.4 以來,Timetable 也負責產生 DagRuns 的 run_id
。
例如,為了讓 Run ID 顯示執行開始時間(即資料間隔的結束時間,而不是目前使用的開始時間)的「人性化」日期,您可以將類似這樣的方法新增至自訂 timetable
def generate_run_id(
self,
*,
run_type: DagRunType,
logical_date: DateTime,
data_interval: DataInterval | None,
**extra,
) -> str:
if run_type == DagRunType.SCHEDULED and data_interval:
return data_interval.end.format("YYYY-MM-DD dddd")
return super().generate_run_id(
run_type=run_type, logical_date=logical_date, data_interval=data_interval, **extra
)
請記住,RunID 限制為 250 個字元,並且在 DAG 內必須是唯一的。