airflow.models.baseoperator
¶
所有運算子的基礎運算子。
- sphinx-autoapi-skip
模組內容¶
類別¶
ExecutorSafeguard 裝飾器。 |
|
BaseOperator 的元類別。 |
|
所有運算子的抽象基礎類別。 |
函數¶
|
|
|
|
|
|
|
|
|
|
|
給定一些任務,建立一個依賴鏈。 |
|
為 from_tasks 中的所有任務設定對 to_tasks 中所有任務的下游依賴。 |
|
簡化任務依賴定義。 |
屬性¶
- airflow.models.baseoperator.get_merged_defaults(dag, task_group, task_params, task_default_args)[原始碼]¶
- airflow.models.baseoperator.partial(operator_class, *, task_id, dag=None, task_group=None, start_date=NOTSET, end_date=NOTSET, owner=NOTSET, email=NOTSET, params=None, resources=NOTSET, trigger_rule=NOTSET, depends_on_past=NOTSET, ignore_first_depends_on_past=NOTSET, wait_for_past_depends_before_skipping=NOTSET, wait_for_downstream=NOTSET, retries=NOTSET, queue=NOTSET, pool=NOTSET, pool_slots=NOTSET, execution_timeout=NOTSET, max_retry_delay=NOTSET, retry_delay=NOTSET, retry_exponential_backoff=NOTSET, priority_weight=NOTSET, weight_rule=NOTSET, sla=NOTSET, map_index_template=NOTSET, max_active_tis_per_dag=NOTSET, max_active_tis_per_dagrun=NOTSET, on_execute_callback=NOTSET, on_failure_callback=NOTSET, on_success_callback=NOTSET, on_retry_callback=NOTSET, on_skipped_callback=NOTSET, run_as_user=NOTSET, executor=NOTSET, executor_config=NOTSET, inlets=NOTSET, outlets=NOTSET, doc=NOTSET, doc_md=NOTSET, doc_json=NOTSET, doc_yaml=NOTSET, doc_rst=NOTSET, task_display_name=NOTSET, logger_name=NOTSET, allow_nested_operators=True, **kwargs)[原始碼]¶
- class airflow.models.baseoperator.ExecutorSafeguard[原始碼]¶
ExecutorSafeguard 裝飾器。
檢查運算子的 execute 方法是否在 TaskInstance 外部手動呼叫,因為我們想要避免裝飾器運算子和傳統運算子之間的不良混合。
- class airflow.models.baseoperator.BaseOperatorMeta[原始碼]¶
基底:
abc.ABCMeta
BaseOperator 的元類別。
- class airflow.models.baseoperator.BaseOperator(task_id, owner=DEFAULT_OWNER, email=None, email_on_retry=conf.getboolean('email', 'default_email_on_retry', fallback=True), email_on_failure=conf.getboolean('email', 'default_email_on_failure', fallback=True), retries=DEFAULT_RETRIES, retry_delay=DEFAULT_RETRY_DELAY, retry_exponential_backoff=False, max_retry_delay=None, start_date=None, end_date=None, depends_on_past=False, ignore_first_depends_on_past=DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, wait_for_past_depends_before_skipping=DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING, wait_for_downstream=False, dag=None, params=None, default_args=None, priority_weight=DEFAULT_PRIORITY_WEIGHT, weight_rule=DEFAULT_WEIGHT_RULE, queue=DEFAULT_QUEUE, pool=None, pool_slots=DEFAULT_POOL_SLOTS, sla=None, execution_timeout=DEFAULT_TASK_EXECUTION_TIMEOUT, on_execute_callback=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, on_skipped_callback=None, pre_execute=None, post_execute=None, trigger_rule=DEFAULT_TRIGGER_RULE, resources=None, run_as_user=None, task_concurrency=None, map_index_template=None, max_active_tis_per_dag=None, max_active_tis_per_dagrun=None, executor=None, executor_config=None, do_xcom_push=True, multiple_outputs=False, inlets=None, outlets=None, task_group=None, doc=None, doc_md=None, doc_json=None, doc_yaml=None, doc_rst=None, task_display_name=None, logger_name=None, allow_nested_operators=True, **kwargs)[原始碼]¶
基底:
airflow.models.abstractoperator.AbstractOperator
所有運算子的抽象基礎類別。
由於運算子建立的物件會成為 DAG 中的節點,因此 BaseOperator 包含許多用於 DAG 爬行行為的遞迴方法。若要從此類別衍生,您應該覆寫建構子和 'execute' 方法。
從此類別衍生的運算子應同步執行或觸發特定任務(等待完成)。運算子的範例可能包括執行 Pig 工作的運算子 (PigOperator)、等待分割區落在 Hive 中的感測器運算子 (HiveSensorOperator),或將資料從 Hive 移至 MySQL 的運算子 (Hive2MySqlOperator)。這些運算子(任務)的實例針對特定操作、執行特定腳本、函數或資料傳輸。
此類別是抽象類別,不應被實例化。實例化由此類別衍生的類別會建立任務物件,最終成為 DAG 物件中的節點。任務依賴應使用 set_upstream 和/或 set_downstream 方法設定。
- 參數
task_id (str) – 任務的唯一且有意義的 ID
owner (str) – 任務的擁有者。建議使用有意義的描述(例如使用者/人員/團隊/角色名稱)來釐清所有權。
email (str | Iterable[str] | None) – 電子郵件警報中使用的「收件人」電子郵件地址。這可以是單一電子郵件或多個電子郵件。多個地址可以使用逗號或分號分隔的字串指定,或傳遞字串列表。
email_on_retry (bool) – 指示在重試任務時是否應傳送電子郵件警報
email_on_failure (bool) – 指示在任務失敗時是否應傳送電子郵件警報
retries (int | None) – 在任務失敗之前應執行的重試次數
retry_delay (datetime.timedelta | float) – 重試之間的延遲,可以設定為
timedelta
或float
秒,這將轉換為timedelta
,預設值為timedelta(seconds=300)
。retry_exponential_backoff (bool) – 允許在重試之間使用指數退避演算法逐步延長等待時間(延遲將轉換為秒)
max_retry_delay (datetime.timedelta | float | None) – 重試之間的最大延遲間隔,可以設定為
timedelta
或float
秒,這將轉換為timedelta
。start_date (datetime.datetime | None) – 任務的
start_date
,決定第一個任務實例的execution_date
。最佳實務是將 start_date 四捨五入到 DAG 的schedule_interval
。每日工作會在某天的 00:00:00 設定其 start_date,每小時工作會在特定小時的 00:00 設定其 start_date。請注意,Airflow 僅查看最新的execution_date
並新增schedule_interval
以判斷下一個execution_date
。同樣非常重要的是要注意,不同任務的依賴關係需要在時間上對齊。如果任務 A 依賴於任務 B,並且它們的 start_date 以其 execution_date 未對齊的方式偏移,則永遠無法滿足 A 的依賴關係。如果您希望延遲任務,例如在凌晨 2 點執行每日任務,請查看TimeSensor
和TimeDeltaSensor
。我們建議不要使用動態start_date
,並建議使用固定的 start_date。請閱讀關於 start_date 的常見問題解答條目以取得更多資訊。end_date (datetime.datetime | None) – 如果指定,排程器將不會超出此日期
depends_on_past (bool) – 設定為 true 時,任務實例將依序執行,且僅在前一個實例成功或已跳過的情況下執行。允許執行 start_date 的任務實例。
wait_for_past_depends_before_skipping (bool) – 若設定為 true,當任務實例應標記為跳過,且 depends_on_past 為 true 時,此任務實例將保持在 None 狀態,等待先前執行之任務完成
wait_for_downstream (bool) – 若設定為 true,任務 X 的實例在執行前,會等待先前任務 X 實例的直接下游任務成功完成或被跳過。當任務 X 的不同實例會修改相同的資源,且此資源被任務 X 的下游任務使用時,此設定非常有用。請注意,只要使用 wait_for_downstream,depends_on_past 就會強制設為 True。另請注意,僅會等待先前任務實例的「直接」下游任務;更下游的任何任務狀態都會被忽略。
dag (airflow.models.dag.DAG | None) – 此任務所附加的 DAG 參考 (若有)
priority_weight (int) – 此任務相對於其他任務的優先權重。這讓執行器可以在任務堆積時,優先觸發權重較高的任務。為更重要的任務設定較高的 priority_weight 值。由於並非所有資料庫引擎都支援 64 位元整數,因此值會限制為 32 位元。有效範圍為 -2,147,483,648 到 2,147,483,647。
weight_rule (str | airflow.task.priority_strategy.PriorityWeightStrategy) – 用於任務有效總優先權重的權重方法。選項包括:
{ downstream | upstream | absolute }
預設值為downstream
。當設定為downstream
時,任務的有效權重是所有下游後代的總和。因此,當使用正權重值時,上游任務將具有更高的權重,並將更積極地排程。當您有多個 DAG 執行個體,並希望在每個 DAG 可以繼續處理下游任務之前,先完成所有執行的所有上游任務時,這非常有用。當設定為upstream
時,有效權重是所有上游祖先的總和。這與下游任務具有較高權重的情況相反,當使用正權重值時,下游任務將更積極地排程。當您有多個 DAG 執行個體,並且希望在開始其他 DAG 的上游任務之前先完成每個 DAG 時,這非常有用。當設定為absolute
時,有效權重是確切指定的priority_weight
,沒有額外的權重。當您確切知道每個任務應具有的優先權重時,您可能會想要這樣做。此外,當設定為absolute
時,對於非常大的 DAG,還具有顯著加快任務建立過程的額外效果。選項可以設定為字串,或使用靜態類別airflow.utils.WeightRule
中定義的常數。無論權重規則為何,最終的優先權值都會限制為 32 位元。這是一個 實驗性功能。自 2.9.0 版本起,Airflow 允許透過建立airflow.task.priority_strategy.PriorityWeightStrategy
的子類別並在外掛程式中註冊,然後透過weight_rule
參數提供類別路徑或類別實例,來定義自訂優先權重策略。自訂優先權重策略將用於計算任務實例的有效總優先權重。queue (str) – 執行此工作時要定向的佇列。並非所有執行器都實作佇列管理,但 CeleryExecutor 支援定向特定佇列。
pool (str | None) – 此任務應在其中執行的插槽池,插槽池是一種限制特定任務並行性的方法
pool_slots (int) – 此任務應使用的插槽池數量 (>= 1)。不允許小於 1 的值。
sla (datetime.timedelta | None) – 預期工作成功的時間。請注意,這表示期間結束後的
timedelta
。例如,如果您設定 SLA 為 1 小時,如果2016-01-01
執行個體尚未成功,排程器將在2016-01-02
凌晨 1:00 後不久發送電子郵件。排程器會特別注意具有 SLA 的工作,並針對 SLA 未達成的情況發送警示電子郵件。SLA 未達成的情況也會記錄在資料庫中以供日後參考。所有共用相同 SLA 時間的任務都會捆綁在單一電子郵件中,並在該時間後不久發送。每個任務實例只會發送一次 SLA 通知。execution_timeout (datetime.timedelta | None) – 此任務實例執行允許的最長時間,如果超過此時間,將會引發錯誤並失敗。
on_failure_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 當此任務的任務實例失敗時要呼叫的函式或函式清單。上下文字典會作為單一參數傳遞給此函式。上下文包含對任務實例相關物件的參考,並記錄在 API 的巨集章節下。
on_execute_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 與
on_failure_callback
非常相似,不同之處在於它會在任務執行之前立即執行。on_retry_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 與
on_failure_callback
非常相似,不同之處在於它會在重試發生時執行。on_success_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 與
on_failure_callback
非常相似,不同之處在於它會在任務成功時執行。on_skipped_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 與
on_failure_callback
非常相似,不同之處在於它會在跳過發生時執行;只有在引發 AirflowSkipException 時才會呼叫此回呼。明確地說,如果任務因為 DAG 中先前的分支決策或觸發規則而未開始執行,導致執行跳過,以至於任務執行永遠不會排程,則不會呼叫此回呼。pre_execute (TaskPreExecuteHook | None) –
在任務執行之前立即呼叫的函式,接收上下文字典;引發例外狀況將阻止任務執行。
這是一個 實驗性功能。
post_execute (TaskPostExecuteHook | None) –
在任務執行之後立即呼叫的函式,接收上下文字典和任務結果;引發例外狀況將阻止任務成功。
這是一個 實驗性功能。
trigger_rule (str) – 定義任務觸發時套用相依性的規則。選項包括:
{ all_success | all_failed | all_done | all_skipped | one_success | one_done | one_failed | none_failed | none_failed_min_one_success | none_skipped | always}
預設值為all_success
。選項可以設定為字串,或使用靜態類別airflow.utils.TriggerRule
中定義的常數resources (dict[str, Any] | None) – 資源參數名稱 (Resources 建構函式的引數名稱) 與其值的對應。
run_as_user (str | None) – 在執行任務時要模擬的 Unix 使用者名稱
max_active_tis_per_dag (int | None) – 設定後,任務將能夠限制跨 execution_dates 的並行執行次數。
max_active_tis_per_dagrun (int | None) – 設定後,任務將能夠限制每個 DAG 執行個體的並行任務實例數。
executor (str | None) – 執行此任務時要定向的執行器。尚未支援
executor_config (dict | None) –
特定執行器解譯的其他任務層級組態參數。參數以執行器的名稱作為命名空間。
範例:透過 KubernetesExecutor 在特定 Docker 容器中執行此任務
MyOperator(..., executor_config={"KubernetesExecutor": {"image": "myCustomDockerImage"}})
do_xcom_push (bool) – 若為 True,則推送包含 Operator 結果的 XCom
multiple_outputs (bool) – 若為 True 且 do_xcom_push 為 True,則推送多個 XCom,每個傳回字典結果中的鍵一個 XCom。若為 False 且 do_xcom_push 為 True,則推送單個 XCom。
task_group (airflow.utils.task_group.TaskGroup | None) – 任務應歸屬的 TaskGroup。這通常在未使用 TaskGroup 作為上下文管理器時提供。
doc (str | None) – 將文件或註解新增至您的 Task 物件,這些物件在 Webserver 的「任務實例詳細資訊」檢視中可見
doc_md (str | None) – 將文件 (Markdown 格式) 或註解新增至您的 Task 物件,這些物件在 Webserver 的「任務實例詳細資訊」檢視中可見
doc_rst (str | None) – 將文件 (RST 格式) 或註解新增至您的 Task 物件,這些物件在 Webserver 的「任務實例詳細資訊」檢視中可見
doc_json (str | None) – 將文件 (JSON 格式) 或註解新增至您的 Task 物件,這些物件在 Webserver 的「任務實例詳細資訊」檢視中可見
doc_yaml (str | None) – 將文件 (YAML 格式) 或註解新增至您的 Task 物件,這些物件在 Webserver 的「任務實例詳細資訊」檢視中可見
task_display_name (str | None) – 任務的顯示名稱,會顯示在 UI 上。
logger_name (str | None) – Operator 用於發出日誌的記錄器名稱。若設定為 None (預設值),記錄器名稱將回退到 airflow.task.operators.{class.__module__}.{class.__name__} (例如,SimpleHttpOperator 將具有 airflow.task.operators.airflow.providers.http.operators.http.SimpleHttpOperator 作為記錄器)。
allow_nested_operators (bool) –
若為 True,當在另一個 Operator 中執行 Operator 時,將會記錄警告訊息。若為 False,則當 Operator 使用不當 (例如,巢狀於另一個 Operator 中) 時,將會引發例外狀況。在 Airflow 的未來版本中,此參數將會移除,且當 Operator 巢狀於彼此內部時,將始終擲回例外狀況 (預設值為 True)。
範例:Operator 混合使用不當的範例
@task(provide_context=True) def say_hello_world(**context): hello_world_task = BashOperator( task_id="hello_world_task", bash_command="python -c \"print('Hello, world!')\"", dag=dag, ) hello_world_task.execute(context)
- property dag: airflow.models.dag.DAG[source]¶
傳回 Operator 的 DAG (若已設定),否則引發錯誤。
- property operator_class: type[BaseOperator][source]¶
- property roots: list[BaseOperator][source]¶
DAGNode 需要。
- property leaves: list[BaseOperator][source]¶
DAGNode 需要。
- operator_extra_links: Collection[airflow.models.baseoperatorlink.BaseOperatorLink] = ()[source]¶
- subdag: airflow.models.dag.DAG | None[source]¶
- start_trigger_args: airflow.triggers.base.StartTriggerArgs | None[source]¶
- deps: frozenset[airflow.ti_deps.deps.base_ti_dep.BaseTIDep][source]¶
傳回運算子的依賴項集合。這些依賴項與執行環境依賴項不同,因為它們特定於任務,並且可以被子類別擴展/覆蓋。
- set_xcomargs_dependencies()[source]¶
解析任務的上游依賴項。
透過這種方式,將
XComArg
作為範本欄位的值傳遞,將導致在兩個任務之間建立上游關係。範例:
with DAG(...): generate_content = GenerateContentOperator(task_id="generate_content") send_email = EmailOperator(..., html_content=generate_content.output) # This is equivalent to with DAG(...): generate_content = GenerateContentOperator(task_id="generate_content") send_email = EmailOperator(..., html_content="{{ task_instance.xcom_pull('generate_content') }}") generate_content >> send_email
- abstract execute(context)[source]¶
在建立運算子時衍生。
Context 是在渲染 jinja 範本時使用的相同字典。
有關更多上下文,請參閱 get_template_context。
- on_kill()[source]¶
覆寫此方法以在任務實例被終止時清理子進程。
在運算子中使用 threading、subprocess 或 multiprocessing 模組的任何情況都需要清理,否則會留下幽靈進程。
- render_template_fields(context, jinja_env=None)[source]¶
範本化 self.template_fields 中列出的所有屬性。
這會就地變更屬性,且不可逆。
- 參數
context (airflow.utils.context.Context) – 具有要應用於內容的值的 Context 字典。
jinja_env (jinja2.Environment | None) – 用於渲染的 Jinja 環境。
- clear(start_date=None, end_date=None, upstream=False, downstream=False, session=NEW_SESSION)[source]¶
清除與任務相關聯的任務實例的狀態,並遵循指定的參數。
- get_task_instances(start_date=None, end_date=None, session=NEW_SESSION)[source]¶
取得與此任務相關的任務實例,適用於特定的日期範圍。
- run(start_date=None, end_date=None, ignore_first_depends_on_past=True, wait_for_past_depends_before_skipping=False, ignore_ti_state=False, mark_success=False, test_mode=False, session=NEW_SESSION)[source]¶
針對日期範圍執行一組任務實例。
- static xcom_push(context, key, value, execution_date=None)[source]¶
使 XCom 可供任務提取。
- 參數
context (Any) – 執行環境字典
key (str) – XCom 的鍵
value (Any) – XCom 的值。該值會被 pickle 序列化並儲存在資料庫中。
execution_date (datetime.datetime | None) – 如果提供,XCom 將在此日期之前不可見。例如,這可用於在未來的日期向任務發送訊息,而不會立即顯示。
- static xcom_pull(context, task_ids=None, dag_id=None, key=XCOM_RETURN_KEY, include_prior_dates=None, session=NEW_SESSION)[source]¶
提取可選地符合特定條件的 XCom。
key 的預設值將搜尋限制為其他任務傳回的 XCom(而不是手動推送的 XCom)。若要移除此篩選器,請傳遞 key=None(或任何想要的值)。
如果提供單個 task_id 字串,則結果是來自該 task_id 的最新匹配 XCom 的值。如果提供多個 task_ids,則會傳回匹配值的元組。如果找不到任何匹配項,則傳回 None。
- 參數
context (Any) – 執行環境字典
key (str) – XCom 的鍵。如果提供,則僅傳回具有匹配鍵的 XCom。預設鍵為 'return_value',也可用作常數 XCOM_RETURN_KEY。此鍵會自動賦予任務傳回的 XCom(而不是手動推送的 XCom)。若要移除篩選器,請傳遞 key=None。
task_ids (str | list[str] | None) – 僅提取來自具有匹配 ID 的任務的 XCom。可以傳遞 None 以移除篩選器。
dag_id (str | None) – 如果提供,則僅從此 DAG 提取 XCom。如果為 None(預設值),則使用呼叫任務的 DAG。
include_prior_dates (bool | None) – 如果為 False,則僅傳回來自目前 execution_date 的 XCom。如果為 True,則也會傳回先前日期的 XCom。
- airflow.models.baseoperator.chain(*tasks)[source]¶
給定一些任務,建立一個依賴鏈。
此函式接受 BaseOperator(又名 tasks)、EdgeModifiers(又名 Labels)、XComArg、TaskGroups 或包含這些類型任何組合的清單(或同一清單中的組合)的值。如果您想在兩個清單之間串聯,則必須確保它們具有相同的長度。
使用傳統運算子/感測器
chain(t1, [t2, t3], [t4, t5], t6)
相當於
/ -> t2 -> t4 \ t1 -> t6 \ -> t3 -> t5 /
t1.set_downstream(t2) t1.set_downstream(t3) t2.set_downstream(t4) t3.set_downstream(t5) t4.set_downstream(t6) t5.set_downstream(t6)
使用任務裝飾函式又名 XComArgs
chain(x1(), [x2(), x3()], [x4(), x5()], x6())
相當於
/ -> x2 -> x4 \ x1 -> x6 \ -> x3 -> x5 /
x1 = x1() x2 = x2() x3 = x3() x4 = x4() x5 = x5() x6 = x6() x1.set_downstream(x2) x1.set_downstream(x3) x2.set_downstream(x4) x3.set_downstream(x5) x4.set_downstream(x6) x5.set_downstream(x6)
使用 TaskGroups
chain(t1, task_group1, task_group2, t2) t1.set_downstream(task_group1) task_group1.set_downstream(task_group2) task_group2.set_downstream(t2)
也可以混合使用傳統運算子/感測器、EdgeModifiers、XComArg 和 TaskGroups
chain(t1, [Label("branch one"), Label("branch two")], [x1(), x2()], task_group1, x3())
相當於
/ "branch one" -> x1 \ t1 -> task_group1 -> x3 \ "branch two" -> x2 /
x1 = x1() x2 = x2() x3 = x3() label1 = Label("branch one") label2 = Label("branch two") t1.set_downstream(label1) label1.set_downstream(x1) t2.set_downstream(label2) label2.set_downstream(x2) x1.set_downstream(task_group1) x2.set_downstream(task_group1) task_group1.set_downstream(x3) # or x1 = x1() x2 = x2() x3 = x3() t1.set_downstream(x1, edge_modifier=Label("branch one")) t1.set_downstream(x2, edge_modifier=Label("branch two")) x1.set_downstream(task_group1) x2.set_downstream(task_group1) task_group1.set_downstream(x3)
- 參數
tasks (airflow.models.taskmixin.DependencyMixin | Sequence[airflow.models.taskmixin.DependencyMixin]) – 個別和/或任務、EdgeModifiers、XComArgs 或 TaskGroups 的清單,以設定依賴項
- airflow.models.baseoperator.cross_downstream(from_tasks, to_tasks)[source]¶
為 from_tasks 中的所有任務設定對 to_tasks 中所有任務的下游依賴。
使用傳統運算子/感測器
cross_downstream(from_tasks=[t1, t2, t3], to_tasks=[t4, t5, t6])
相當於
t1 ---> t4 \ / t2 -X -> t5 / \ t3 ---> t6
t1.set_downstream(t4) t1.set_downstream(t5) t1.set_downstream(t6) t2.set_downstream(t4) t2.set_downstream(t5) t2.set_downstream(t6) t3.set_downstream(t4) t3.set_downstream(t5) t3.set_downstream(t6)
使用任務裝飾函式又名 XComArgs
cross_downstream(from_tasks=[x1(), x2(), x3()], to_tasks=[x4(), x5(), x6()])
相當於
x1 ---> x4 \ / x2 -X -> x5 / \ x3 ---> x6
x1 = x1() x2 = x2() x3 = x3() x4 = x4() x5 = x5() x6 = x6() x1.set_downstream(x4) x1.set_downstream(x5) x1.set_downstream(x6) x2.set_downstream(x4) x2.set_downstream(x5) x2.set_downstream(x6) x3.set_downstream(x4) x3.set_downstream(x5) x3.set_downstream(x6)
也可以混合使用傳統運算子/感測器和 XComArg 任務
cross_downstream(from_tasks=[t1, x2(), t3], to_tasks=[x1(), t2, x3()])
相當於
t1 ---> x1 \ / x2 -X -> t2 / \ t3 ---> x3
x1 = x1() x2 = x2() x3 = x3() t1.set_downstream(x1) t1.set_downstream(t2) t1.set_downstream(x3) x2.set_downstream(x1) x2.set_downstream(t2) x2.set_downstream(x3) t3.set_downstream(x1) t3.set_downstream(t2) t3.set_downstream(x3)
- 參數
from_tasks (Sequence[airflow.models.taskmixin.DependencyMixin]) – 要從其開始的任務或 XComArgs 清單。
to_tasks (airflow.models.taskmixin.DependencyMixin | Sequence[airflow.models.taskmixin.DependencyMixin]) – 要設定為下游依賴項的任務或 XComArgs 清單。
- airflow.models.baseoperator.chain_linear(*elements)[source]¶
簡化任務依賴定義。
例如:假設您想要像這樣的優先順序
╭─op2─╮ ╭─op4─╮ op1─┤ ├─├─op5─┤─op7 ╰-op3─╯ ╰-op6─╯
然後您可以像這樣完成
chain_linear(op1, [op2, op3], [op4, op5, op6], op7)
- 參數
elements (airflow.models.taskmixin.DependencyMixin | Sequence[airflow.models.taskmixin.DependencyMixin]) – 運算子清單 / 運算子清單