airflow.providers.standard.operators.python

模組內容

類別

PythonOperator

執行 Python 可呼叫物件。

BranchPythonOperator

工作流程可以在此任務執行後「分支」或遵循路徑。

ShortCircuitOperator

允許管線根據 python_callable 的結果繼續執行。

PythonVirtualenvOperator

在自動建立和銷毀的 virtualenv 中執行函式。

BranchPythonVirtualenvOperator

工作流程可以在虛擬環境中執行此任務後「分支」或遵循路徑。

ExternalPythonOperator

在不會重新建立的 virtualenv 中執行函式。

BranchExternalPythonOperator

工作流程可以在此任務執行後「分支」或遵循路徑。

函式

get_current_context()

檢索執行上下文字典,而不變更使用者方法的簽章。

屬性

log

airflow.providers.standard.operators.python.log[原始碼]
class airflow.providers.standard.operators.python.PythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[原始碼]

基底類別: airflow.models.baseoperator.BaseOperator

執行 Python 可呼叫物件。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南: PythonOperator

執行可呼叫物件時,Airflow 將傳遞一組關鍵字引數,這些引數可用於您的函式中。這組 kwargs 完全對應於您可以在 Jinja 範本中使用的內容。為了使其運作,您需要在函式標頭中定義 **kwargs,或者您可以直接新增您想要取得的關鍵字引數 - 例如,使用以下程式碼,您的可呼叫物件將取得 ti 上下文變數的值。

使用明確引數

def my_python_callable(ti):
    pass

使用 kwargs

def my_python_callable(**kwargs):
    ti = kwargs["ti"]
參數
  • python_callable (Callable) – 對可呼叫物件的參考

  • op_args (collections.abc.Collection[Any] | None) – 位置引數的列表,將在呼叫您的可呼叫物件時解包

  • op_kwargs (collections.abc.Mapping[str, Any] | None) – 關鍵字引數的字典,將在您的函式中解包

  • templates_dict (dict[str, Any] | None) – 字典,其中的值是範本,這些範本將在 __init__execute 之間由 Airflow 引擎進行範本化,並在套用範本後在您的可呼叫物件的上下文中提供。(已範本化)

  • templates_exts (collections.abc.Sequence[str] | None) – 檔案副檔名列表,用於在處理範本化欄位時解析,例如 ['.sql', '.hql']

  • show_return_value_in_logs (bool) – 布林值,指示是否顯示 return_value 日誌。預設為 True,允許 return value 日誌輸出。可以設定為 False,以防止在您傳回大量資料(例如將大量 XCom 傳輸到 TaskAPI)時記錄 return value 輸出。

template_fields: collections.abc.Sequence[str] = ('templates_dict', 'op_args', 'op_kwargs')
template_fields_renderers
BLUE: = '#ffefeb'
ui_color
shallow_copy_attrs: collections.abc.Sequence[str] = ('python_callable', 'op_kwargs')
execute(context)

在建立運算子時衍生。

上下文是與呈現 Jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文。

determine_kwargs(context)
execute_callable()

使用給定的引數呼叫 python 可呼叫物件。

傳回

呼叫的傳回值。

傳回類型

Any

class airflow.providers.standard.operators.python.BranchPythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)

基底類別: PythonOperator, airflow.operators.branch.BranchMixIn

工作流程可以在此任務執行後「分支」或遵循路徑。

它衍生自 PythonOperator,並預期一個 Python 函式,該函式傳回單個 task_id、單個 task_group_id 或要遵循的 task_id 和/或 task_group_id 列表。傳回的 task_id(s) 和/或 task_group_id(s) 應指向直接位於 {self} 下游的任務或任務群組。所有其他「分支」或直接下游任務都標記為 skipped 狀態,以便這些路徑無法向前移動。skipped 狀態會向下游傳播,以允許填滿 DAG 狀態並推斷 DAG 執行狀態。

execute(context)

在建立運算子時衍生。

上下文是與呈現 Jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文。

class airflow.providers.standard.operators.python.ShortCircuitOperator(*, ignore_downstream_trigger_rules=True, **kwargs)

基底類別: PythonOperator, airflow.models.skipmixin.SkipMixin

允許管線根據 python_callable 的結果繼續執行。

ShortCircuitOperator 衍生自 PythonOperator,並評估 python_callable 的結果。如果傳回的結果為 False 或 falsy 值,則管線將會短路。下游任務將根據設定的短路模式標記為「skipped」狀態。如果傳回的結果為 True 或 truthy 值,則下游任務會照常進行,並推送傳回結果的 XCom

可以設定短路以遵循或忽略為下游任務設定的 trigger_rule。如果 ignore_downstream_trigger_rules 設定為 True(預設設定),則會跳過所有下游任務,而無需考慮為任務定義的 trigger_rule。但是,如果此參數設定為 False,則會跳過直接下游任務,但會遵循其他後續下游任務的指定 trigger_rule。在此模式下,運算子假設直接下游任務是有意要跳過的,但可能不是其他後續任務。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南: ShortCircuitOperator

參數

ignore_downstream_trigger_rules (bool) – 如果設定為 True,則將跳過此運算子任務的所有下游任務。這是預設行為。如果設定為 False,則將跳過直接下游任務,但將遵循為所有其他下游任務定義的 trigger_rule

execute(context)

在建立運算子時衍生。

上下文是與呈現 Jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文。

class airflow.providers.standard.operators.python.PythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, use_airflow_context=False, **kwargs)

基底類別: _BasePythonVirtualenvOperator

在自動建立和銷毀的 virtualenv 中執行函式。

函式(具有某些注意事項)必須使用 def 定義,而不是類別的一部分。所有匯入都必須在函式內發生,並且不得參考範圍外的任何變數。名為 virtualenv_string_args 的全域範圍變數將可用(由 string_args 填入)。此外,可以透過 op_args 和 op_kwargs 傳遞內容,並且可以使用傳回值。請注意,如果您的 virtualenv 在與 Airflow 不同的 Python 主要版本中執行,則您無法使用傳回值、op_args、op_kwargs,或使用透過外掛程式提供給 Airflow 的任何巨集。但是,您可以使用 string_args。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南: PythonVirtualenvOperator

參數
  • python_callable (Callable) – 一個 Python 函式,不參考外部變數,使用 def 定義,將在虛擬環境中執行。

  • requirements (None | collections.abc.Iterable[str] | str) – 需求字串列表,或 pip 指定的(範本化)「需求檔案」。

  • python_version (str | None) – 執行虛擬環境的 Python 版本。請注意,2 和 2.7 都是可接受的形式。

  • serializer (_SerializerTypeDef | None) –

    用於序列化引數和結果的序列化器。它可以是以下其中之一

    • "pickle":(預設) 使用 pickle 進行序列化。包含在 Python 標準函式庫中。

    • "cloudpickle":使用 cloudpickle 序列化更複雜的類型,這需要將 cloudpickle 包含在您的需求中。

    • "dill":使用 dill 序列化更複雜的類型,這需要將 dill 包含在您的需求中。

  • system_site_packages (bool) – 是否在您的虛擬環境中包含 system_site_packages。有關更多資訊,請參閱 virtualenv 文件。

  • pip_install_options (list[str] | None) – 安裝需求時的 pip 安裝選項列表。請參閱「pip install -h」以取得可用選項

  • op_args (collections.abc.Collection[Any] | None) – 要傳遞給 python_callable 的位置引數列表。

  • op_kwargs (collections.abc.Mapping[str, Any] | None) – 要傳遞給 python_callable 的關鍵字引數字典。

  • string_args (collections.abc.Iterable[str] | None) – 存在於全域變數 virtualenv_string_args 中的字串,在執行時以 list[str] 的形式提供給 python_callable。請注意,引數以換行符號分隔。

  • templates_dict (dict | None) – 字典,其中的值是範本,這些範本將在 __init__execute 之間由 Airflow 引擎進行範本化,並在套用範本後在您的可呼叫物件的上下文中提供

  • templates_exts (list[str] | None) – 檔案副檔名列表,用於在處理範本化欄位時解析,例如 ['.sql', '.hql']

  • expect_airflow (bool) – 預期目標環境中安裝了 Airflow。如果為 true,如果未安裝 Airflow,運算子將引發警告,並在啟動時嘗試載入 Airflow 巨集。

  • skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以此結束代碼結束,則將任務保持在 skipped 狀態(預設值:None)。如果設定為 None,則任何非零結束代碼都將被視為失敗。

  • index_urls (None | collections.abc.Collection[str] | str) – 可選的索引 URL 列表,用於從中載入 Python 套件。如果未提供,則將使用系統 pip conf 從來源套件。

  • venv_cache_path (None | os.PathLike[str]) – 虛擬環境父資料夾的可選路徑,其中將快取虛擬環境,建立子資料夾 venv-{hash},其中 hash 將替換為需求的校驗和。如果未提供,則將在每次執行時在暫存資料夾中建立和刪除虛擬環境。

  • env_vars (dict[str, str] | None) – 字典,其中包含在執行虛擬環境時要為其設定的其他環境變數。

  • inherit_env (bool) – 在執行虛擬環境時是否繼承目前的環境變數。如果設定為 True,則虛擬環境將繼承父進程 (os.environ) 的環境變數。如果設定為 False,則將使用乾淨的環境執行虛擬環境。

  • use_airflow_context (bool) – 是否向 python_callable 提供 get_current_context()。尚未實作 - 等待 AIP-72 上下文序列化。

template_fields: collections.abc.Sequence[str]
template_ext: collections.abc.Sequence[str] = ('.txt',)
execute_callable()

使用給定的引數呼叫 python 可呼叫物件。

傳回

呼叫的傳回值。

class airflow.providers.standard.operators.python.BranchPythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, use_airflow_context=False, **kwargs)

基底類別: PythonVirtualenvOperator, airflow.operators.branch.BranchMixIn

工作流程可以在虛擬環境中執行此任務後「分支」或遵循路徑。

它衍生自 PythonVirtualenvOperator,並預期一個 Python 函式,該函式傳回單個 task_id、單個 task_group_id 或要遵循的 task_id 和/或 task_group_id 列表。傳回的 task_id(s) 和/或 task_group_id(s) 應指向直接位於 {self} 下游的任務或任務群組。所有其他「分支」或直接下游任務都標記為 skipped 狀態,以便這些路徑無法向前移動。skipped 狀態會向下游傳播,以允許填滿 DAG 狀態並推斷 DAG 執行狀態。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南: BranchPythonVirtualenvOperator

execute(context)[source]

在建立運算子時衍生。

上下文是與呈現 Jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文。

class airflow.providers.standard.operators.python.ExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, use_airflow_context=False, **kwargs)[source]

基底類別: _BasePythonVirtualenvOperator

在不會重新建立的 virtualenv 中執行函式。

重複使用,無需建立虛擬環境的額外負擔 (但有一些注意事項)。

該函數必須使用 def 定義,且不能是類別的一部分。所有 import 必須在函數內部進行,且不能引用範圍外的變數。名為 virtualenv_string_args 的全域範圍變數將會可用 (由 string_args 填充)。此外,可以透過 op_args 和 op_kwargs 傳遞內容,並且可以使用傳回值。請注意,如果您的虛擬環境運行的 Python 主要版本與 Airflow 不同,則您無法使用傳回值、op_args、op_kwargs,或使用透過外掛程式提供給 Airflow 的任何巨集。但是您可以使用 string_args。

如果 Airflow 安裝在外部環境中的版本與運算子使用的版本不同,則運算子將會失敗。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南: ExternalPythonOperator

參數
  • python (str) – 指向虛擬環境內應使用的 Python 二進位檔的完整路徑字串 (特定於檔案系統) (在 VENV/bin 資料夾中)。應為絕對路徑 (因此通常以 “/” 或 “X:/” 開頭,取決於使用的檔案系統/作業系統)。

  • python_callable (Callable) – 一個 Python 函式,不參考外部變數,使用 def 定義,將在虛擬環境中執行。

  • serializer (_SerializerTypeDef | None) –

    用於序列化引數和結果的序列化器。它可以是以下其中之一

    • "pickle":(預設) 使用 pickle 進行序列化。包含在 Python 標準函式庫中。

    • "cloudpickle":使用 cloudpickle 序列化更複雜的類型,這需要將 cloudpickle 包含在您的需求中。

    • "dill":使用 dill 序列化更複雜的類型,這需要將 dill 包含在您的需求中。

  • op_args (collections.abc.Collection[Any] | None) – 要傳遞給 python_callable 的位置引數列表。

  • op_kwargs (collections.abc.Mapping[str, Any] | None) – 要傳遞給 python_callable 的關鍵字引數字典。

  • string_args (collections.abc.Iterable[str] | None) – 存在於全域變數 virtualenv_string_args 中的字串,在執行時以 list[str] 的形式提供給 python_callable。請注意,引數以換行符號分隔。

  • templates_dict (dict | None) – 字典,其中的值是範本,這些範本將在 __init__execute 之間由 Airflow 引擎進行範本化,並在套用範本後在您的可呼叫物件的上下文中提供

  • templates_exts (list[str] | None) – 檔案副檔名列表,用於在處理範本化欄位時解析,例如 ['.sql', '.hql']

  • expect_airflow (bool) – 預期目標環境中安裝了 Airflow。如果為 true,如果未安裝 Airflow,運算子將引發警告,並在啟動時嘗試載入 Airflow 巨集。

  • skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以此結束代碼結束,則將任務保持在 skipped 狀態(預設值:None)。如果設定為 None,則任何非零結束代碼都將被視為失敗。

  • env_vars (dict[str, str] | None) – 字典,其中包含在執行虛擬環境時要為其設定的其他環境變數。

  • inherit_env (bool) – 在執行虛擬環境時是否繼承目前的環境變數。如果設定為 True,則虛擬環境將繼承父進程 (os.environ) 的環境變數。如果設定為 False,則將使用乾淨的環境執行虛擬環境。

  • use_airflow_context (bool) – 是否向 python_callable 提供 get_current_context()。尚未實作 - 等待 AIP-72 上下文序列化。

template_fields: collections.abc.Sequence[str][source]
execute_callable()[source]

使用給定的引數呼叫 python 可呼叫物件。

傳回

呼叫的傳回值。

class airflow.providers.standard.operators.python.BranchExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, use_airflow_context=False, **kwargs)[source]

繼承自: ExternalPythonOperator, airflow.operators.branch.BranchMixIn

工作流程可以在此任務執行後「分支」或遵循路徑。

擴展了 ExternalPythonOperator,因此期望取得 Python:應使用的虛擬環境 (在 VENV/bin 資料夾中)。應為絕對路徑,以便它可以像 ExternalPythonOperator 一樣在單獨的虛擬環境中運行。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南: BranchExternalPythonOperator

execute(context)[source]

在建立運算子時衍生。

上下文是與呈現 Jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文。

airflow.providers.standard.operators.python.get_current_context()[source]

檢索執行上下文字典,而無需更改使用者方法的簽章。

這是檢索執行上下文字典的最簡單方法。

舊樣式

def my_task(**context):
    ti = context["ti"]

新樣式

from airflow.providers.standard.operators.python import get_current_context


def my_task():
    context = get_current_context()
    ti = context["ti"]

僅當在運算子開始執行後呼叫此方法時,當前上下文才會有值。

此條目是否有幫助?