airflow.operators.python

模組內容

類別

PythonOperator

執行 Python 可呼叫物件。

BranchPythonOperator

工作流程可以在此任務執行後「分支」或遵循路徑。

ShortCircuitOperator

允許管線根據 python_callable 的結果繼續執行。

PythonVirtualenvOperator

在自動建立和銷毀的 virtualenv 中執行函式。

BranchPythonVirtualenvOperator

工作流程可以在虛擬環境中執行此任務後「分支」或遵循路徑。

ExternalPythonOperator

在不會重新建立的 virtualenv 中執行函式。

BranchExternalPythonOperator

工作流程可以在此任務執行後「分支」或遵循路徑。

函式

is_venv_installed()

檢查 virtualenv 套件是否已安裝,方法是檢查它是否在路徑上或是否作為套件安裝。

task([python_callable, multiple_outputs])

請改用 airflow.decorators.task(),此方法已棄用。

get_current_context()

擷取執行環境字典,而不變更使用者方法的簽章。

屬性

log

airflow.operators.python.log[原始碼]
airflow.operators.python.is_venv_installed()[原始碼]

檢查 virtualenv 套件是否已安裝,方法是檢查它是否在路徑上或是否作為套件安裝。

回傳

如果是,則為 True。無論哪種檢查方式有效都可以。

回傳類型

bool

airflow.operators.python.task(python_callable=None, multiple_outputs=None, **kwargs)[原始碼]

請改用 airflow.decorators.task(),此方法已棄用。

呼叫 @task.python 並允許使用者將 Python 函式轉換為 Airflow 任務。

參數
  • python_callable (Callable | None) – 可呼叫物件的參考

  • op_kwargs – 將在您的函式中解壓縮的關鍵字引數字典 (已套用範本)

  • op_args – 將在呼叫您的可呼叫物件時解壓縮的位置引數清單 (已套用範本)

  • multiple_outputs (bool | None) – 如果設定,函式傳回值將會展開為多個 XCom 值。字典將會展開為以鍵作為鍵的 xcom 值。預設為 False。

class airflow.operators.python.PythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[原始碼]

基底類別: airflow.models.baseoperator.BaseOperator

執行 Python 可呼叫物件。

另請參閱

如需有關如何使用此運算子的詳細資訊,請參閱指南: PythonOperator

執行可呼叫物件時,Airflow 會傳遞一組關鍵字引數,這些引數可用於您的函式中。這組 kwargs 完全對應於您可以在 jinja 範本中使用的內容。為了使其運作,您需要在函式標頭中定義 **kwargs,或者您可以直接新增您想要取得的關鍵字引數 - 例如,使用以下程式碼,您的可呼叫物件將取得 tinext_ds 環境變數的值。

使用明確引數

def my_python_callable(ti, next_ds):
    pass

使用 kwargs

def my_python_callable(**kwargs):
    ti = kwargs["ti"]
    next_ds = kwargs["next_ds"]
參數
  • python_callable (Callable) – 可呼叫物件的參考

  • op_args (Collection[Any] | None) – 將在呼叫您的可呼叫物件時解壓縮的位置引數清單

  • op_kwargs (Mapping[str, Any] | None) – 將在您的函式中解壓縮的關鍵字引數字典

  • templates_dict (dict[str, Any] | None) – 值為範本的字典,這些範本將在 __init__execute 之間由 Airflow 引擎套用範本,並在範本套用後在您的可呼叫物件的環境中使用。(已套用範本)

  • templates_exts (Sequence[str] | None) – 在處理已套用範本的欄位時要解析的檔案副檔名清單,例如 ['.sql', '.hql']

  • show_return_value_in_logs (bool) – 是否顯示 return_value 日誌的布林值。預設為 True,允許 return value 日誌輸出。當您傳回大量資料 (例如傳輸大量 XCom 至 TaskAPI) 時,可以將其設定為 False 以防止 return value 日誌輸出。

template_fields: Sequence[str] = ('templates_dict', 'op_args', 'op_kwargs')[原始碼]
template_fields_renderers[原始碼]
BLUE = '#ffefeb'[原始碼]
ui_color[原始碼]
shallow_copy_attrs: Sequence[str] = ('python_callable', 'op_kwargs')[原始碼]
execute(context)[原始碼]

在建立運算子時衍生。

Context 是與轉譯 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多環境資訊。

determine_kwargs(context)[原始碼]
execute_callable()[原始碼]

使用給定的引數呼叫 python 可呼叫物件。

回傳

呼叫的傳回值。

回傳類型

Any

class airflow.operators.python.BranchPythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[原始碼]

基底類別: PythonOperator, airflow.operators.branch.BranchMixIn

工作流程可以在此任務執行後「分支」或遵循路徑。

它衍生自 PythonOperator,並預期一個 Python 函式,該函式會傳回單一 task_id、單一 task_group_id,或要遵循的 task_ids 和/或 task_group_ids 清單。傳回的 task_id(s) 和/或 task_group_id(s) 應指向 {self} 直接下游的任務或任務群組。所有其他「分支」或直接下游任務都標記為 skipped 狀態,以便這些路徑無法繼續前進。skipped 狀態會向下游傳播,以允許 DAG 狀態填滿並推斷 DAG 執行狀態。

execute(context)[原始碼]

在建立運算子時衍生。

Context 是與轉譯 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多環境資訊。

class airflow.operators.python.ShortCircuitOperator(*, ignore_downstream_trigger_rules=True, **kwargs)[原始碼]

基底類別: PythonOperator, airflow.models.skipmixin.SkipMixin

允許管線根據 python_callable 的結果繼續執行。

ShortCircuitOperator 衍生自 PythonOperator,並評估 python_callable 的結果。如果傳回的結果為 False 或 falsy 值,則管線將會短路。下游任務將根據設定的短路模式標記為「skipped」狀態。如果傳回的結果為 True 或 truthy 值,則下游任務會照常進行,並推送傳回結果的 XCom

短路可以設定為遵守或忽略為下游任務設定的 trigger_rule。如果 ignore_downstream_trigger_rules 設定為 True (預設設定),則所有下游任務都會被略過,而不會考慮為任務定義的 trigger_rule。但是,如果此參數設定為 False,則會略過直接下游任務,但會遵守其他後續下游任務的指定 trigger_rule。在此模式下,運算子會假設直接下游任務是有意要略過的,但可能並非其他後續任務。

另請參閱

如需有關如何使用此運算子的詳細資訊,請參閱指南: ShortCircuitOperator

參數

ignore_downstream_trigger_rules (bool) – 如果設定為 True,則會略過此運算子任務的所有下游任務。這是預設行為。如果設定為 False,則會略過直接的下游任務,但會遵守為所有其他下游任務定義的 trigger_rule

execute(context)[原始碼]

在建立運算子時衍生。

Context 是與轉譯 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多環境資訊。

class airflow.operators.python.PythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, use_dill=False, **kwargs)[原始碼]

基底類別: _BasePythonVirtualenvOperator

在自動建立和銷毀的 virtualenv 中執行函式。

函式 (具有某些注意事項) 必須使用 def 定義,且不得為類別的一部分。所有匯入都必須在函式內部發生,且不得參考範圍外的變數。名為 virtualenv_string_args 的全域範圍變數將會可用 (由 string_args 填入)。此外,可以透過 op_args 和 op_kwargs 傳遞內容,並且可以使用傳回值。請注意,如果您的 virtualenv 在與 Airflow 不同的 Python 主要版本中執行,則您無法使用傳回值、op_args、op_kwargs,或使用透過外掛程式提供給 Airflow 的任何巨集。但是您可以使用 string_args。

另請參閱

如需有關如何使用此運算子的詳細資訊,請參閱指南: PythonVirtualenvOperator

參數
  • python_callable (Callable) – 沒有外部變數參考的 python 函式,以 def 定義,將在虛擬環境中執行。

  • requirements (None | Iterable[str] | str) – 需求字串清單,或 pip 指定的 (已套用範本)「需求檔案」。

  • python_version (str | None) – 要執行虛擬環境的 Python 版本。請注意,2 和 2.7 都是可接受的形式。

  • serializer (_SerializerTypeDef | None) –

    用於序列化引數和結果的序列化程式。它可以是下列其中之一

    • "pickle": (預設) 使用 pickle 進行序列化。包含在 Python 標準程式庫中。

    • "cloudpickle": 使用 cloudpickle 序列化更複雜的類型,這需要將 cloudpickle 包含在您的需求中。

    • "dill": 使用 dill 序列化更複雜的類型,這需要將 dill 包含在您的需求中。

  • system_site_packages (bool) – 是否在您的虛擬環境中包含 system_site_packages。請參閱 virtualenv 文件以取得更多資訊。

  • pip_install_options (list[str] | None) – 安裝需求時的 pip install 選項清單。請參閱 'pip install -h' 以取得可用的選項

  • op_args (Collection[Any] | None) – 要傳遞至 python_callable 的位置引數清單。

  • op_kwargs (Mapping[str, Any] | None) – 要傳遞至 python_callable 的關鍵字引數字典。

  • string_args (Iterable[str] | None) – 存在於全域變數 virtualenv_string_args 中的字串,在執行階段可作為 list[str] 供 python_callable 使用。請注意,引數會依換行符號分隔。

  • templates_dict (dict | None) – 值為範本的字典,這些範本將在 __init__execute 之間由 Airflow 引擎套用範本,並在範本套用後在您的可呼叫物件的環境中使用

  • templates_exts (list[str] | None) – 在處理已套用範本的欄位時要解析的檔案副檔名清單,例如 ['.sql', '.hql']

  • expect_airflow (bool) – 預期目標環境中已安裝 Airflow。如果為 true,則當未安裝 Airflow 時,運算子將會引發警告,並且在啟動時會嘗試載入 Airflow 巨集。

  • skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以此結束代碼結束,則將任務保留在 skipped 狀態 (預設值: None)。如果設定為 None,則任何非零結束代碼都將被視為失敗。

  • index_urls (None | Collection[str] | str) – 從中載入 Python 套件的索引 URL 選用清單。如果未提供,則系統 pip conf 將用於從中取得套件。

  • venv_cache_path (None | os.PathLike[str]) – 虛擬環境父資料夾的選用路徑,虛擬環境將在其中快取,建立子資料夾 venv-{hash},其中 hash 將替換為需求的總和檢查碼。如果未提供,則將在每次執行時在暫存資料夾中建立和刪除虛擬環境。

  • env_vars (dict[str, str] | None) – 包含其他環境變數的字典,用於在執行虛擬環境時為其設定。

  • inherit_env (bool) – 是否在執行虛擬環境時繼承目前的環境變數。如果設定為 True,則虛擬環境將繼承父程序 (os.environ) 的環境變數。如果設定為 False,則虛擬環境將在乾淨的環境中執行。

  • use_dill (bool) – 已棄用,請改用 serializer。是否使用 dill 序列化引數和結果 (預設為 pickle)。這允許更複雜的類型,但需要您在需求中包含 dill。

template_fields: Sequence[str][原始碼]
template_ext: Sequence[str] = ('.txt',)[原始碼]
execute_callable()[原始碼]

使用給定的引數呼叫 python 可呼叫物件。

回傳

呼叫的傳回值。

class airflow.operators.python.BranchPythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, use_dill=False, **kwargs)[source]

繼承自: PythonVirtualenvOperator, airflow.operators.branch.BranchMixIn

工作流程可以在虛擬環境中執行此任務後「分支」或遵循路徑。

它衍生自 PythonVirtualenvOperator,並預期一個 Python 函數,該函數返回單個 task_id、單個 task_group_id,或要遵循的 task_ids 和/或 task_group_ids 列表。返回的 task_id(s) 和/或 task_group_id(s) 應指向直接位於 {self} 下游的任務或任務組。所有其他「分支」或直接下游任務都標記為 skipped 狀態,以便這些路徑無法前進。skipped 狀態會向下游傳播,以允許 DAG 狀態填滿並推斷 DAG 運行狀態。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南: BranchPythonVirtualenvOperator

execute(context)[source]

在建立運算子時衍生。

Context 是與轉譯 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多環境資訊。

class airflow.operators.python.ExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, use_dill=False, **kwargs)[source]

基底類別: _BasePythonVirtualenvOperator

在不會重新建立的 virtualenv 中執行函式。

重複使用,無需建立虛擬環境的額外開銷(但有一些注意事項)。

該函數必須使用 def 定義,且不能是類別的一部分。所有導入都必須在函數內部發生,並且不能引用範圍外的變數。名為 virtualenv_string_args 的全域範圍變數將可用(由 string_args 填充)。此外,可以透過 op_args 和 op_kwargs 傳遞內容,並且可以使用傳回值。請注意,如果您的虛擬環境運行的 Python 主要版本與 Airflow 不同,則您無法使用傳回值、op_args、op_kwargs,或使用透過外掛程式提供給 Airflow 的任何巨集。但是您可以使用 string_args。

如果 Airflow 安裝在外部環境中的版本與運算子使用的版本不同,則運算子將會失敗。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南: ExternalPythonOperator

參數
  • python (str) – 指向應使用的虛擬環境內 Python 二進位檔案的完整路徑字串(特定於檔案系統)(在 VENV/bin 資料夾中)。應為絕對路徑(因此通常以“/”或“X:/”開頭,具體取決於使用的檔案系統/作業系統)。

  • python_callable (Callable) – 沒有外部變數參考的 python 函式,以 def 定義,將在虛擬環境中執行。

  • serializer (_SerializerTypeDef | None) –

    用於序列化引數和結果的序列化程式。它可以是下列其中之一

    • "pickle": (預設) 使用 pickle 進行序列化。包含在 Python 標準程式庫中。

    • "cloudpickle": 使用 cloudpickle 序列化更複雜的類型,這需要將 cloudpickle 包含在您的需求中。

    • "dill": 使用 dill 序列化更複雜的類型,這需要將 dill 包含在您的需求中。

  • op_args (Collection[Any] | None) – 要傳遞至 python_callable 的位置引數清單。

  • op_kwargs (Mapping[str, Any] | None) – 要傳遞至 python_callable 的關鍵字引數字典。

  • string_args (Iterable[str] | None) – 存在於全域變數 virtualenv_string_args 中的字串,在執行階段可作為 list[str] 供 python_callable 使用。請注意,引數會依換行符號分隔。

  • templates_dict (dict | None) – 值為範本的字典,這些範本將在 __init__execute 之間由 Airflow 引擎套用範本,並在範本套用後在您的可呼叫物件的環境中使用

  • templates_exts (list[str] | None) – 在處理已套用範本的欄位時要解析的檔案副檔名清單,例如 ['.sql', '.hql']

  • expect_airflow (bool) – 預期目標環境中已安裝 Airflow。如果為 true,則當未安裝 Airflow 時,運算子將會引發警告,並且在啟動時會嘗試載入 Airflow 巨集。

  • skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以此結束代碼結束,則將任務保留在 skipped 狀態 (預設值: None)。如果設定為 None,則任何非零結束代碼都將被視為失敗。

  • env_vars (dict[str, str] | None) – 包含其他環境變數的字典,用於在執行虛擬環境時為其設定。

  • inherit_env (bool) – 是否在執行虛擬環境時繼承目前的環境變數。如果設定為 True,則虛擬環境將繼承父程序 (os.environ) 的環境變數。如果設定為 False,則虛擬環境將在乾淨的環境中執行。

  • use_dill (bool) – 已棄用,請改用 serializer。是否使用 dill 序列化引數和結果 (預設為 pickle)。這允許更複雜的類型,但需要您在需求中包含 dill。

template_fields: Sequence[str][source]
execute_callable()[source]

使用給定的引數呼叫 python 可呼叫物件。

回傳

呼叫的傳回值。

class airflow.operators.python.BranchExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, use_dill=False, **kwargs)[source]

繼承自: ExternalPythonOperator, airflow.operators.branch.BranchMixIn

工作流程可以在此任務執行後「分支」或遵循路徑。

擴展了 ExternalPythonOperator,因此期望取得 Python:應該使用的虛擬環境(在 VENV/bin 資料夾中)。應為絕對路徑,因此它可以像 ExternalPythonOperator 一樣在單獨的虛擬環境中運行。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南: BranchExternalPythonOperator

execute(context)[source]

在建立運算子時衍生。

Context 是與轉譯 jinja 範本時使用的相同字典。

請參閱 get_template_context 以取得更多環境資訊。

airflow.operators.python.get_current_context()[source]

檢索執行上下文字典,而無需更改使用者方法的簽章。

這是檢索執行上下文字典的最簡單方法。

舊樣式

def my_task(**context):
    ti = context["ti"]

新樣式

from airflow.operators.python import get_current_context


def my_task():
    context = get_current_context()
    ti = context["ti"]

僅當在運算子開始執行後調用此方法時,當前上下文才會有值。

此條目是否有幫助?