airflow.models.baseoperator

所有運算子的基礎運算子。

sphinx-autoapi-skip

模組內容

類別

ExecutorSafeguard

ExecutorSafeguard 裝飾器。

BaseOperatorMeta

BaseOperator 的元類別。

BaseOperator

所有運算子的抽象基礎類別。

函數

parse_retries(retries)

coerce_timedelta(value, *, key)

coerce_resources(resources)

get_merged_defaults(dag, task_group, task_params, ...)

partial(operator_class, *, task_id[, dag, task_group, ...])

chain(*tasks)

給定一些任務,建立一個依賴鏈。

cross_downstream(from_tasks, to_tasks)

為 from_tasks 中的所有任務設定對 to_tasks 中所有任務的下游依賴。

chain_linear(*elements)

簡化任務依賴定義。

屬性

ScheduleInterval

TaskPreExecuteHook

TaskPostExecuteHook

T

logger

BASEOPERATOR_ARGS_EXPECTED_TYPES

Chainable

airflow.models.baseoperator.ScheduleInterval[原始碼]
airflow.models.baseoperator.TaskPreExecuteHook[原始碼]
airflow.models.baseoperator.TaskPostExecuteHook[原始碼]
airflow.models.baseoperator.T[原始碼]
airflow.models.baseoperator.logger[原始碼]
airflow.models.baseoperator.parse_retries(retries)[原始碼]
airflow.models.baseoperator.coerce_timedelta(value, *, key)[原始碼]
airflow.models.baseoperator.coerce_resources(resources)[原始碼]
airflow.models.baseoperator.get_merged_defaults(dag, task_group, task_params, task_default_args)[原始碼]
airflow.models.baseoperator.partial(operator_class, *, task_id, dag=None, task_group=None, start_date=NOTSET, end_date=NOTSET, owner=NOTSET, email=NOTSET, params=None, resources=NOTSET, trigger_rule=NOTSET, depends_on_past=NOTSET, ignore_first_depends_on_past=NOTSET, wait_for_past_depends_before_skipping=NOTSET, wait_for_downstream=NOTSET, retries=NOTSET, queue=NOTSET, pool=NOTSET, pool_slots=NOTSET, execution_timeout=NOTSET, max_retry_delay=NOTSET, retry_delay=NOTSET, retry_exponential_backoff=NOTSET, priority_weight=NOTSET, weight_rule=NOTSET, sla=NOTSET, map_index_template=NOTSET, max_active_tis_per_dag=NOTSET, max_active_tis_per_dagrun=NOTSET, on_execute_callback=NOTSET, on_failure_callback=NOTSET, on_success_callback=NOTSET, on_retry_callback=NOTSET, on_skipped_callback=NOTSET, run_as_user=NOTSET, executor=NOTSET, executor_config=NOTSET, inlets=NOTSET, outlets=NOTSET, doc=NOTSET, doc_md=NOTSET, doc_json=NOTSET, doc_yaml=NOTSET, doc_rst=NOTSET, task_display_name=NOTSET, logger_name=NOTSET, allow_nested_operators=True, **kwargs)[原始碼]
class airflow.models.baseoperator.ExecutorSafeguard[原始碼]

ExecutorSafeguard 裝飾器。

檢查運算子的 execute 方法是否在 TaskInstance 外部手動呼叫,因為我們想要避免裝飾器運算子和傳統運算子之間的不良混合。

test_mode[原始碼]
classmethod decorator(func)[原始碼]
class airflow.models.baseoperator.BaseOperatorMeta[原始碼]

基底: abc.ABCMeta

BaseOperator 的元類別。

airflow.models.baseoperator.BASEOPERATOR_ARGS_EXPECTED_TYPES[原始碼]
class airflow.models.baseoperator.BaseOperator(task_id, owner=DEFAULT_OWNER, email=None, email_on_retry=conf.getboolean('email', 'default_email_on_retry', fallback=True), email_on_failure=conf.getboolean('email', 'default_email_on_failure', fallback=True), retries=DEFAULT_RETRIES, retry_delay=DEFAULT_RETRY_DELAY, retry_exponential_backoff=False, max_retry_delay=None, start_date=None, end_date=None, depends_on_past=False, ignore_first_depends_on_past=DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, wait_for_past_depends_before_skipping=DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING, wait_for_downstream=False, dag=None, params=None, default_args=None, priority_weight=DEFAULT_PRIORITY_WEIGHT, weight_rule=DEFAULT_WEIGHT_RULE, queue=DEFAULT_QUEUE, pool=None, pool_slots=DEFAULT_POOL_SLOTS, sla=None, execution_timeout=DEFAULT_TASK_EXECUTION_TIMEOUT, on_execute_callback=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, on_skipped_callback=None, pre_execute=None, post_execute=None, trigger_rule=DEFAULT_TRIGGER_RULE, resources=None, run_as_user=None, task_concurrency=None, map_index_template=None, max_active_tis_per_dag=None, max_active_tis_per_dagrun=None, executor=None, executor_config=None, do_xcom_push=True, multiple_outputs=False, inlets=None, outlets=None, task_group=None, doc=None, doc_md=None, doc_json=None, doc_yaml=None, doc_rst=None, task_display_name=None, logger_name=None, allow_nested_operators=True, **kwargs)[原始碼]

基底: airflow.models.abstractoperator.AbstractOperator

所有運算子的抽象基礎類別。

由於運算子建立的物件會成為 DAG 中的節點,因此 BaseOperator 包含許多用於 DAG 爬行行為的遞迴方法。若要從此類別衍生,您應該覆寫建構子和 'execute' 方法。

從此類別衍生的運算子應同步執行或觸發特定任務(等待完成)。運算子的範例可能包括執行 Pig 工作的運算子 (PigOperator)、等待分割區落在 Hive 中的感測器運算子 (HiveSensorOperator),或將資料從 Hive 移至 MySQL 的運算子 (Hive2MySqlOperator)。這些運算子(任務)的實例針對特定操作、執行特定腳本、函數或資料傳輸。

此類別是抽象類別,不應被實例化。實例化由此類別衍生的類別會建立任務物件,最終成為 DAG 物件中的節點。任務依賴應使用 set_upstream 和/或 set_downstream 方法設定。

參數
  • task_id (str) – 任務的唯一且有意義的 ID

  • owner (str) – 任務的擁有者。建議使用有意義的描述(例如使用者/人員/團隊/角色名稱)來釐清所有權。

  • email (str | Iterable[str] | None) – 電子郵件警報中使用的「收件人」電子郵件地址。這可以是單一電子郵件或多個電子郵件。多個地址可以使用逗號或分號分隔的字串指定,或傳遞字串列表。

  • email_on_retry (bool) – 指示在重試任務時是否應傳送電子郵件警報

  • email_on_failure (bool) – 指示在任務失敗時是否應傳送電子郵件警報

  • retries (int | None) – 在任務失敗之前應執行的重試次數

  • retry_delay (datetime.timedelta | float) – 重試之間的延遲,可以設定為 timedeltafloat 秒,這將轉換為 timedelta,預設值為 timedelta(seconds=300)

  • retry_exponential_backoff (bool) – 允許在重試之間使用指數退避演算法逐步延長等待時間(延遲將轉換為秒)

  • max_retry_delay (datetime.timedelta | float | None) – 重試之間的最大延遲間隔,可以設定為 timedeltafloat 秒,這將轉換為 timedelta

  • start_date (datetime.datetime | None) – 任務的 start_date,決定第一個任務實例的 execution_date。最佳實務是將 start_date 四捨五入到 DAG 的 schedule_interval。每日工作會在某天的 00:00:00 設定其 start_date,每小時工作會在特定小時的 00:00 設定其 start_date。請注意,Airflow 僅查看最新的 execution_date 並新增 schedule_interval 以判斷下一個 execution_date。同樣非常重要的是要注意,不同任務的依賴關係需要在時間上對齊。如果任務 A 依賴於任務 B,並且它們的 start_date 以其 execution_date 未對齊的方式偏移,則永遠無法滿足 A 的依賴關係。如果您希望延遲任務,例如在凌晨 2 點執行每日任務,請查看 TimeSensorTimeDeltaSensor。我們建議不要使用動態 start_date,並建議使用固定的 start_date。請閱讀關於 start_date 的常見問題解答條目以取得更多資訊。

  • end_date (datetime.datetime | None) – 如果指定,排程器將不會超出此日期

  • depends_on_past (bool) – 設定為 true 時,任務實例將依序執行,且僅在前一個實例成功或已跳過的情況下執行。允許執行 start_date 的任務實例。

  • wait_for_past_depends_before_skipping (bool) – 若設定為 true,當任務實例應標記為跳過,且 depends_on_past 為 true 時,此任務實例將保持在 None 狀態,等待先前執行之任務完成

  • wait_for_downstream (bool) – 若設定為 true,任務 X 的實例在執行前,會等待先前任務 X 實例的直接下游任務成功完成或被跳過。當任務 X 的不同實例會修改相同的資源,且此資源被任務 X 的下游任務使用時,此設定非常有用。請注意,只要使用 wait_for_downstream,depends_on_past 就會強制設為 True。另請注意,僅會等待先前任務實例的「直接」下游任務;更下游的任何任務狀態都會被忽略。

  • dag (airflow.models.dag.DAG | None) – 此任務所附加的 DAG 參考 (若有)

  • priority_weight (int) – 此任務相對於其他任務的優先權重。這讓執行器可以在任務堆積時,優先觸發權重較高的任務。為更重要的任務設定較高的 priority_weight 值。由於並非所有資料庫引擎都支援 64 位元整數,因此值會限制為 32 位元。有效範圍為 -2,147,483,648 到 2,147,483,647。

  • weight_rule (str | airflow.task.priority_strategy.PriorityWeightStrategy) – 用於任務有效總優先權重的權重方法。選項包括:{ downstream | upstream | absolute } 預設值為 downstream。當設定為 downstream 時,任務的有效權重是所有下游後代的總和。因此,當使用正權重值時,上游任務將具有更高的權重,並將更積極地排程。當您有多個 DAG 執行個體,並希望在每個 DAG 可以繼續處理下游任務之前,先完成所有執行的所有上游任務時,這非常有用。當設定為 upstream 時,有效權重是所有上游祖先的總和。這與下游任務具有較高權重的情況相反,當使用正權重值時,下游任務將更積極地排程。當您有多個 DAG 執行個體,並且希望在開始其他 DAG 的上游任務之前先完成每個 DAG 時,這非常有用。當設定為 absolute 時,有效權重是確切指定的 priority_weight,沒有額外的權重。當您確切知道每個任務應具有的優先權重時,您可能會想要這樣做。此外,當設定為 absolute 時,對於非常大的 DAG,還具有顯著加快任務建立過程的額外效果。選項可以設定為字串,或使用靜態類別 airflow.utils.WeightRule 中定義的常數。無論權重規則為何,最終的優先權值都會限制為 32 位元。這是一個 實驗性功能。自 2.9.0 版本起,Airflow 允許透過建立 airflow.task.priority_strategy.PriorityWeightStrategy 的子類別並在外掛程式中註冊,然後透過 weight_rule 參數提供類別路徑或類別實例,來定義自訂優先權重策略。自訂優先權重策略將用於計算任務實例的有效總優先權重。

  • queue (str) – 執行此工作時要定向的佇列。並非所有執行器都實作佇列管理,但 CeleryExecutor 支援定向特定佇列。

  • pool (str | None) – 此任務應在其中執行的插槽池,插槽池是一種限制特定任務並行性的方法

  • pool_slots (int) – 此任務應使用的插槽池數量 (>= 1)。不允許小於 1 的值。

  • sla (datetime.timedelta | None) – 預期工作成功的時間。請注意,這表示期間結束後的 timedelta。例如,如果您設定 SLA 為 1 小時,如果 2016-01-01 執行個體尚未成功,排程器將在 2016-01-02 凌晨 1:00 後不久發送電子郵件。排程器會特別注意具有 SLA 的工作,並針對 SLA 未達成的情況發送警示電子郵件。SLA 未達成的情況也會記錄在資料庫中以供日後參考。所有共用相同 SLA 時間的任務都會捆綁在單一電子郵件中,並在該時間後不久發送。每個任務實例只會發送一次 SLA 通知。

  • execution_timeout (datetime.timedelta | None) – 此任務實例執行允許的最長時間,如果超過此時間,將會引發錯誤並失敗。

  • on_failure_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 當此任務的任務實例失敗時要呼叫的函式或函式清單。上下文字典會作為單一參數傳遞給此函式。上下文包含對任務實例相關物件的參考,並記錄在 API 的巨集章節下。

  • on_execute_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 與 on_failure_callback 非常相似,不同之處在於它會在任務執行之前立即執行。

  • on_retry_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 與 on_failure_callback 非常相似,不同之處在於它會在重試發生時執行。

  • on_success_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 與 on_failure_callback 非常相似,不同之處在於它會在任務成功時執行。

  • on_skipped_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 與 on_failure_callback 非常相似,不同之處在於它會在跳過發生時執行;只有在引發 AirflowSkipException 時才會呼叫此回呼。明確地說,如果任務因為 DAG 中先前的分支決策或觸發規則而未開始執行,導致執行跳過,以至於任務執行永遠不會排程,則不會呼叫此回呼。

  • pre_execute (TaskPreExecuteHook | None) –

    在任務執行之前立即呼叫的函式,接收上下文字典;引發例外狀況將阻止任務執行。

    這是一個 實驗性功能

  • post_execute (TaskPostExecuteHook | None) –

    在任務執行之後立即呼叫的函式,接收上下文字典和任務結果;引發例外狀況將阻止任務成功。

    這是一個 實驗性功能

  • trigger_rule (str) – 定義任務觸發時套用相依性的規則。選項包括:{ all_success | all_failed | all_done | all_skipped | one_success | one_done | one_failed | none_failed | none_failed_min_one_success | none_skipped | always} 預設值為 all_success。選項可以設定為字串,或使用靜態類別 airflow.utils.TriggerRule 中定義的常數

  • resources (dict[str, Any] | None) – 資源參數名稱 (Resources 建構函式的引數名稱) 與其值的對應。

  • run_as_user (str | None) – 在執行任務時要模擬的 Unix 使用者名稱

  • max_active_tis_per_dag (int | None) – 設定後,任務將能夠限制跨 execution_dates 的並行執行次數。

  • max_active_tis_per_dagrun (int | None) – 設定後,任務將能夠限制每個 DAG 執行個體的並行任務實例數。

  • executor (str | None) – 執行此任務時要定向的執行器。尚未支援

  • executor_config (dict | None) –

    特定執行器解譯的其他任務層級組態參數。參數以執行器的名稱作為命名空間。

    範例:透過 KubernetesExecutor 在特定 Docker 容器中執行此任務

    MyOperator(..., executor_config={"KubernetesExecutor": {"image": "myCustomDockerImage"}})
    

  • do_xcom_push (bool) – 若為 True,則推送包含 Operator 結果的 XCom

  • multiple_outputs (bool) – 若為 True 且 do_xcom_push 為 True,則推送多個 XCom,每個傳回字典結果中的鍵一個 XCom。若為 False 且 do_xcom_push 為 True,則推送單個 XCom。

  • task_group (airflow.utils.task_group.TaskGroup | None) – 任務應歸屬的 TaskGroup。這通常在未使用 TaskGroup 作為上下文管理器時提供。

  • doc (str | None) – 將文件或註解新增至您的 Task 物件,這些物件在 Webserver 的「任務實例詳細資訊」檢視中可見

  • doc_md (str | None) – 將文件 (Markdown 格式) 或註解新增至您的 Task 物件,這些物件在 Webserver 的「任務實例詳細資訊」檢視中可見

  • doc_rst (str | None) – 將文件 (RST 格式) 或註解新增至您的 Task 物件,這些物件在 Webserver 的「任務實例詳細資訊」檢視中可見

  • doc_json (str | None) – 將文件 (JSON 格式) 或註解新增至您的 Task 物件,這些物件在 Webserver 的「任務實例詳細資訊」檢視中可見

  • doc_yaml (str | None) – 將文件 (YAML 格式) 或註解新增至您的 Task 物件,這些物件在 Webserver 的「任務實例詳細資訊」檢視中可見

  • task_display_name (str | None) – 任務的顯示名稱,會顯示在 UI 上。

  • logger_name (str | None) – Operator 用於發出日誌的記錄器名稱。若設定為 None (預設值),記錄器名稱將回退到 airflow.task.operators.{class.__module__}.{class.__name__} (例如,SimpleHttpOperator 將具有 airflow.task.operators.airflow.providers.http.operators.http.SimpleHttpOperator 作為記錄器)。

  • allow_nested_operators (bool) –

    若為 True,當在另一個 Operator 中執行 Operator 時,將會記錄警告訊息。若為 False,則當 Operator 使用不當 (例如,巢狀於另一個 Operator 中) 時,將會引發例外狀況。在 Airflow 的未來版本中,此參數將會移除,且當 Operator 巢狀於彼此內部時,將始終擲回例外狀況 (預設值為 True)。

    範例:Operator 混合使用不當的範例

    @task(provide_context=True)
    def say_hello_world(**context):
        hello_world_task = BashOperator(
            task_id="hello_world_task",
            bash_command="python -c \"print('Hello, world!')\"",
            dag=dag,
        )
        hello_world_task.execute(context)
    

property dag: airflow.models.dag.DAG[source]

傳回 Operator 的 DAG (若已設定),否則引發錯誤。

property task_display_name: str[source]
property operator_class: type[BaseOperator][source]
property task_type: str[source]

@property: 任務的類型。

property operator_name: str[source]

@property: 若有設定,請為 Operator 使用更友善的顯示名稱。

property roots: list[BaseOperator][source]

DAGNode 需要。

property leaves: list[BaseOperator][source]

DAGNode 需要。

property output: airflow.models.xcom_arg.XComArg[source]

傳回目前 Operator 推送的 XCom 參考。

property inherits_from_empty_operator[source]

用於判斷 Operator 是否繼承自 EmptyOperator。

template_fields: Sequence[str] = ()[source]
template_ext: Sequence[str] = ()[source]
template_fields_renderers: dict[str, str][source]
ui_color: str = '#fff'[source]
ui_fgcolor: str = '#000'[source]
pool: str = ''[source]
shallow_copy_attrs: Sequence[str] = ()[source]
partial: Callable[Ellipsis, airflow.models.mappedoperator.OperatorPartial][source]
supports_lineage = False[source]
task_group: airflow.utils.task_group.TaskGroup | None[source]
subdag: airflow.models.dag.DAG | None[source]
start_date: pendulum.DateTime | None[source]
end_date: pendulum.DateTime | None[source]
start_trigger_args: airflow.triggers.base.StartTriggerArgs | None[source]
start_from_trigger: bool = False[source]
deps: frozenset[airflow.ti_deps.deps.base_ti_dep.BaseTIDep][source]

傳回運算子的依賴項集合。這些依賴項與執行環境依賴項不同,因為它們特定於任務,並且可以被子類別擴展/覆蓋。

__eq__(other)[source]

傳回 self==value。

__ne__(other)[source]

傳回 self!=value。

__hash__()[source]

傳回 hash(self)。

__or__(other)[source]

傳回 [此運算子] | [運算子]。

other 的輸入將被設定為接收來自此運算子的輸出。Other 將被設定為此運算子的下游任務。

__gt__(other)[source]

傳回 [運算子] > [輸出口]。

如果 other 是一個帶有 attr 註解的物件,它將被設定為此運算子的輸出口。

__lt__(other)[source]

傳回 [輸入口] > [運算子] 或 [運算子] < [輸入口]。

如果 other 是一個帶有 attr 註解的物件,它將被設定為此運算子的輸入口。

__setattr__(key, value)[source]

實作 setattr(self, name, value)。

add_inlets(inlets)[source]

設定此運算子的輸入口。

add_outlets(outlets)[source]

定義此運算子的輸出口。

get_dag()[source]
has_dag()[source]

如果運算子已分配給 DAG,則傳回 True。

prepare_for_execution()[source]

鎖定任務以供執行,以停用 __setattr__ 中的自訂動作並傳回副本。

set_xcomargs_dependencies()[source]

解析任務的上游依賴項。

透過這種方式,將 XComArg 作為範本欄位的值傳遞,將導致在兩個任務之間建立上游關係。

範例:

with DAG(...):
    generate_content = GenerateContentOperator(task_id="generate_content")
    send_email = EmailOperator(..., html_content=generate_content.output)

# This is equivalent to
with DAG(...):
    generate_content = GenerateContentOperator(task_id="generate_content")
    send_email = EmailOperator(..., html_content="{{ task_instance.xcom_pull('generate_content') }}")
    generate_content >> send_email
pre_execute(context)[source]

在呼叫 self.execute() 之前立即執行。

abstract execute(context)[source]

在建立運算子時衍生。

Context 是在渲染 jinja 範本時使用的相同字典。

有關更多上下文,請參閱 get_template_context。

post_execute(context, result=None)[source]

在呼叫 self.execute() 之後立即執行。

它會傳遞執行環境以及運算子傳回的任何結果。

on_kill()[source]

覆寫此方法以在任務實例被終止時清理子進程。

在運算子中使用 threading、subprocess 或 multiprocessing 模組的任何情況都需要清理,否則會留下幽靈進程。

__deepcopy__(memo)[source]
__getstate__()[source]
__setstate__(state)[source]
render_template_fields(context, jinja_env=None)[source]

範本化 self.template_fields 中列出的所有屬性。

這會就地變更屬性,且不可逆。

參數
  • context (airflow.utils.context.Context) – 具有要應用於內容的值的 Context 字典。

  • jinja_env (jinja2.Environment | None) – 用於渲染的 Jinja 環境。

clear(start_date=None, end_date=None, upstream=False, downstream=False, session=NEW_SESSION)[source]

清除與任務相關聯的任務實例的狀態,並遵循指定的參數。

get_task_instances(start_date=None, end_date=None, session=NEW_SESSION)[source]

取得與此任務相關的任務實例,適用於特定的日期範圍。

run(start_date=None, end_date=None, ignore_first_depends_on_past=True, wait_for_past_depends_before_skipping=False, ignore_ti_state=False, mark_success=False, test_mode=False, session=NEW_SESSION)[source]

針對日期範圍執行一組任務實例。

dry_run()[source]

對運算子執行 dry run - 僅渲染範本欄位。

get_direct_relatives(upstream=False)[source]

取得目前任務的直接關聯項清單,無論是上游還是下游。

__repr__()[source]

傳回 repr(self)。

static xcom_push(context, key, value, execution_date=None)[source]

使 XCom 可供任務提取。

參數
  • context (Any) – 執行環境字典

  • key (str) – XCom 的鍵

  • value (Any) – XCom 的值。該值會被 pickle 序列化並儲存在資料庫中。

  • execution_date (datetime.datetime | None) – 如果提供,XCom 將在此日期之前不可見。例如,這可用於在未來的日期向任務發送訊息,而不會立即顯示。

static xcom_pull(context, task_ids=None, dag_id=None, key=XCOM_RETURN_KEY, include_prior_dates=None, session=NEW_SESSION)[source]

提取可選地符合特定條件的 XCom。

key 的預設值將搜尋限制為其他任務傳回的 XCom(而不是手動推送的 XCom)。若要移除此篩選器,請傳遞 key=None(或任何想要的值)。

如果提供單個 task_id 字串,則結果是來自該 task_id 的最新匹配 XCom 的值。如果提供多個 task_ids,則會傳回匹配值的元組。如果找不到任何匹配項,則傳回 None。

參數
  • context (Any) – 執行環境字典

  • key (str) – XCom 的鍵。如果提供,則僅傳回具有匹配鍵的 XCom。預設鍵為 'return_value',也可用作常數 XCOM_RETURN_KEY。此鍵會自動賦予任務傳回的 XCom(而不是手動推送的 XCom)。若要移除篩選器,請傳遞 key=None。

  • task_ids (str | list[str] | None) – 僅提取來自具有匹配 ID 的任務的 XCom。可以傳遞 None 以移除篩選器。

  • dag_id (str | None) – 如果提供,則僅從此 DAG 提取 XCom。如果為 None(預設值),則使用呼叫任務的 DAG。

  • include_prior_dates (bool | None) – 如果為 False,則僅傳回來自目前 execution_date 的 XCom。如果為 True,則也會傳回先前日期的 XCom。

classmethod get_serialized_fields()[source]

字串化的 DAG 和運算子精確地包含這些欄位。

serialize_for_task_group()[source]

序列化;DAGNode 需要。

defer(*, trigger, method_name, kwargs=None, timeout=None)[source]

將此運算子標記為「deferred」,暫停其執行,直到提供的觸發器觸發事件。

這是透過引發一個特殊的例外 (TaskDeferred) 來實現的,該例外在主要的 _execute_task 包裝器中被捕獲。觸發器可以將執行送回任務,或直接結束任務實例。如果觸發器將自行結束任務實例,則 method_name 應為 None;否則,請提供在恢復任務執行時應使用的方法名稱。

resume_execution(next_method, next_kwargs, context)[source]

當 deferred 任務恢復時呼叫此方法。

airflow.models.baseoperator.Chainable[source]
airflow.models.baseoperator.chain(*tasks)[source]

給定一些任務,建立一個依賴鏈。

此函式接受 BaseOperator(又名 tasks)、EdgeModifiers(又名 Labels)、XComArg、TaskGroups 或包含這些類型任何組合的清單(或同一清單中的組合)的值。如果您想在兩個清單之間串聯,則必須確保它們具有相同的長度。

使用傳統運算子/感測器

chain(t1, [t2, t3], [t4, t5], t6)

相當於

  / -> t2 -> t4 \
t1               -> t6
  \ -> t3 -> t5 /
t1.set_downstream(t2)
t1.set_downstream(t3)
t2.set_downstream(t4)
t3.set_downstream(t5)
t4.set_downstream(t6)
t5.set_downstream(t6)

使用任務裝飾函式又名 XComArgs

chain(x1(), [x2(), x3()], [x4(), x5()], x6())

相當於

  / -> x2 -> x4 \
x1               -> x6
  \ -> x3 -> x5 /
x1 = x1()
x2 = x2()
x3 = x3()
x4 = x4()
x5 = x5()
x6 = x6()
x1.set_downstream(x2)
x1.set_downstream(x3)
x2.set_downstream(x4)
x3.set_downstream(x5)
x4.set_downstream(x6)
x5.set_downstream(x6)

使用 TaskGroups

chain(t1, task_group1, task_group2, t2)

t1.set_downstream(task_group1)
task_group1.set_downstream(task_group2)
task_group2.set_downstream(t2)

也可以混合使用傳統運算子/感測器、EdgeModifiers、XComArg 和 TaskGroups

chain(t1, [Label("branch one"), Label("branch two")], [x1(), x2()], task_group1, x3())

相當於

  / "branch one" -> x1 \
t1                      -> task_group1 -> x3
  \ "branch two" -> x2 /
x1 = x1()
x2 = x2()
x3 = x3()
label1 = Label("branch one")
label2 = Label("branch two")
t1.set_downstream(label1)
label1.set_downstream(x1)
t2.set_downstream(label2)
label2.set_downstream(x2)
x1.set_downstream(task_group1)
x2.set_downstream(task_group1)
task_group1.set_downstream(x3)

# or

x1 = x1()
x2 = x2()
x3 = x3()
t1.set_downstream(x1, edge_modifier=Label("branch one"))
t1.set_downstream(x2, edge_modifier=Label("branch two"))
x1.set_downstream(task_group1)
x2.set_downstream(task_group1)
task_group1.set_downstream(x3)
參數

tasks (airflow.models.taskmixin.DependencyMixin | Sequence[airflow.models.taskmixin.DependencyMixin]) – 個別和/或任務、EdgeModifiers、XComArgs 或 TaskGroups 的清單,以設定依賴項

airflow.models.baseoperator.cross_downstream(from_tasks, to_tasks)[source]

為 from_tasks 中的所有任務設定對 to_tasks 中所有任務的下游依賴。

使用傳統運算子/感測器

cross_downstream(from_tasks=[t1, t2, t3], to_tasks=[t4, t5, t6])

相當於

t1 ---> t4
   \ /
t2 -X -> t5
   / \
t3 ---> t6
t1.set_downstream(t4)
t1.set_downstream(t5)
t1.set_downstream(t6)
t2.set_downstream(t4)
t2.set_downstream(t5)
t2.set_downstream(t6)
t3.set_downstream(t4)
t3.set_downstream(t5)
t3.set_downstream(t6)

使用任務裝飾函式又名 XComArgs

cross_downstream(from_tasks=[x1(), x2(), x3()], to_tasks=[x4(), x5(), x6()])

相當於

x1 ---> x4
   \ /
x2 -X -> x5
   / \
x3 ---> x6
x1 = x1()
x2 = x2()
x3 = x3()
x4 = x4()
x5 = x5()
x6 = x6()
x1.set_downstream(x4)
x1.set_downstream(x5)
x1.set_downstream(x6)
x2.set_downstream(x4)
x2.set_downstream(x5)
x2.set_downstream(x6)
x3.set_downstream(x4)
x3.set_downstream(x5)
x3.set_downstream(x6)

也可以混合使用傳統運算子/感測器和 XComArg 任務

cross_downstream(from_tasks=[t1, x2(), t3], to_tasks=[x1(), t2, x3()])

相當於

t1 ---> x1
   \ /
x2 -X -> t2
   / \
t3 ---> x3
x1 = x1()
x2 = x2()
x3 = x3()
t1.set_downstream(x1)
t1.set_downstream(t2)
t1.set_downstream(x3)
x2.set_downstream(x1)
x2.set_downstream(t2)
x2.set_downstream(x3)
t3.set_downstream(x1)
t3.set_downstream(t2)
t3.set_downstream(x3)
參數
  • from_tasks (Sequence[airflow.models.taskmixin.DependencyMixin]) – 要從其開始的任務或 XComArgs 清單。

  • to_tasks (airflow.models.taskmixin.DependencyMixin | Sequence[airflow.models.taskmixin.DependencyMixin]) – 要設定為下游依賴項的任務或 XComArgs 清單。

airflow.models.baseoperator.chain_linear(*elements)[source]

簡化任務依賴定義。

例如:假設您想要像這樣的優先順序

    ╭─op2─╮ ╭─op4─╮
op1─┤     ├─├─op5─┤─op7
    ╰-op3─╯ ╰-op6─╯

然後您可以像這樣完成

chain_linear(op1, [op2, op3], [op4, op5, op6], op7)
參數

elements (airflow.models.taskmixin.DependencyMixin | Sequence[airflow.models.taskmixin.DependencyMixin]) – 運算子清單 / 運算子清單

此條目是否有幫助?