BranchDateTimeOperator¶
使用 BranchDateTimeOperator
根據時間是否落在兩個目標引數給定的範圍內,分支到兩個執行路徑之一。
此 operator 有兩種模式。第一種模式是使用目前時間 (DAG 執行時的機器時鐘時間),第二種模式是使用 DAG 執行時的 logical_date
。
使用目前時間¶
上述用法在某些情況下可能很有用 - 例如,當 DAG 用於執行清理和維護,並且實際上不應適用於任何預期要回填的 DAG 時,因為「目前時間」使回填成為非冪等的,其結果取決於 DAG 實際執行的時間。即使在排程執行時,它也可能稍微不確定。DAGRun 從排程到執行可能需要一些時間,這可能表示即使 DAGRun 已正確排程,用於分支決策的實際時間也會與排程時間不同,並且分支決策可能會因這些延遲而有所不同。
empty_task_11 = EmptyOperator(task_id="date_in_range", dag=dag1)
empty_task_21 = EmptyOperator(task_id="date_outside_range", dag=dag1)
cond1 = BranchDateTimeOperator(
task_id="datetime_branch",
follow_task_ids_if_true=["date_in_range"],
follow_task_ids_if_false=["date_outside_range"],
target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
dag=dag1,
)
# Run empty_task_11 if cond1 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
cond1 >> [empty_task_11, empty_task_21]
目標參數 target_upper
和 target_lower
可以接收 datetime.datetime
、datetime.time
或 None
。datetime.time
物件被使用時,它將與目前日期結合,以便與之進行比較。如果 target_upper
設定為在給定的 target_lower
之前發生的 datetime.time
,則會將一天新增至 target_upper
。這樣做是為了允許跨越兩個日期的時間段。
empty_task_12 = EmptyOperator(task_id="date_in_range", dag=dag2)
empty_task_22 = EmptyOperator(task_id="date_outside_range", dag=dag2)
cond2 = BranchDateTimeOperator(
task_id="datetime_branch",
follow_task_ids_if_true=["date_in_range"],
follow_task_ids_if_false=["date_outside_range"],
target_upper=pendulum.time(0, 0, 0),
target_lower=pendulum.time(15, 0, 0),
dag=dag2,
)
# Since target_lower happens after target_upper, target_upper will be moved to the following day
# Run empty_task_12 if cond2 executes between 15:00:00, and 00:00:00 of the following day
cond2 >> [empty_task_12, empty_task_22]
如果目標參數設定為 None
,則 operator 將僅使用非 None
目標執行單方面比較。將 target_upper
和 target_lower
都設定為 None
將引發例外狀況。
使用邏輯日期¶
這種用法更適合「資料範圍」。當 DAG 重新執行時,logical_date
不會變更,並且不受執行延遲的影響,因此這種方法適用於可能回填的冪等 DAG 執行。
empty_task_13 = EmptyOperator(task_id="date_in_range", dag=dag3)
empty_task_23 = EmptyOperator(task_id="date_outside_range", dag=dag3)
cond3 = BranchDateTimeOperator(
task_id="datetime_branch",
use_task_logical_date=True,
follow_task_ids_if_true=["date_in_range"],
follow_task_ids_if_false=["date_outside_range"],
target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
dag=dag3,
)
# Run empty_task_13 if cond3 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
cond3 >> [empty_task_13, empty_task_23]