airflow.operators.python
¶
模組內容¶
類別¶
執行 Python 可呼叫物件。 |
|
工作流程可以在此任務執行後「分支」或遵循路徑。 |
|
允許管線根據 |
|
在自動建立和銷毀的 virtualenv 中執行函式。 |
|
工作流程可以在虛擬環境中執行此任務後「分支」或遵循路徑。 |
|
在不會重新建立的 virtualenv 中執行函式。 |
|
工作流程可以在此任務執行後「分支」或遵循路徑。 |
函式¶
檢查 virtualenv 套件是否已安裝,方法是檢查它是否在路徑上或是否作為套件安裝。 |
|
|
請改用 |
擷取執行環境字典,而不變更使用者方法的簽章。 |
屬性¶
- airflow.operators.python.is_venv_installed()[原始碼]¶
檢查 virtualenv 套件是否已安裝,方法是檢查它是否在路徑上或是否作為套件安裝。
- 回傳
如果是,則為 True。無論哪種檢查方式有效都可以。
- 回傳類型
- airflow.operators.python.task(python_callable=None, multiple_outputs=None, **kwargs)[原始碼]¶
請改用
airflow.decorators.task()
,此方法已棄用。呼叫
@task.python
並允許使用者將 Python 函式轉換為 Airflow 任務。- 參數
python_callable (Callable | None) – 可呼叫物件的參考
op_kwargs – 將在您的函式中解壓縮的關鍵字引數字典 (已套用範本)
op_args – 將在呼叫您的可呼叫物件時解壓縮的位置引數清單 (已套用範本)
multiple_outputs (bool | None) – 如果設定,函式傳回值將會展開為多個 XCom 值。字典將會展開為以鍵作為鍵的 xcom 值。預設為 False。
- class airflow.operators.python.PythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[原始碼]¶
基底類別:
airflow.models.baseoperator.BaseOperator
執行 Python 可呼叫物件。
另請參閱
如需有關如何使用此運算子的詳細資訊,請參閱指南: PythonOperator
執行可呼叫物件時,Airflow 會傳遞一組關鍵字引數,這些引數可用於您的函式中。這組 kwargs 完全對應於您可以在 jinja 範本中使用的內容。為了使其運作,您需要在函式標頭中定義
**kwargs
,或者您可以直接新增您想要取得的關鍵字引數 - 例如,使用以下程式碼,您的可呼叫物件將取得ti
和next_ds
環境變數的值。使用明確引數
def my_python_callable(ti, next_ds): pass
使用 kwargs
def my_python_callable(**kwargs): ti = kwargs["ti"] next_ds = kwargs["next_ds"]
- 參數
python_callable (Callable) – 可呼叫物件的參考
op_args (Collection[Any] | None) – 將在呼叫您的可呼叫物件時解壓縮的位置引數清單
op_kwargs (Mapping[str, Any] | None) – 將在您的函式中解壓縮的關鍵字引數字典
templates_dict (dict[str, Any] | None) – 值為範本的字典,這些範本將在
__init__
和execute
之間由 Airflow 引擎套用範本,並在範本套用後在您的可呼叫物件的環境中使用。(已套用範本)templates_exts (Sequence[str] | None) – 在處理已套用範本的欄位時要解析的檔案副檔名清單,例如
['.sql', '.hql']
show_return_value_in_logs (bool) – 是否顯示 return_value 日誌的布林值。預設為 True,允許 return value 日誌輸出。當您傳回大量資料 (例如傳輸大量 XCom 至 TaskAPI) 時,可以將其設定為 False 以防止 return value 日誌輸出。
- class airflow.operators.python.BranchPythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[原始碼]¶
基底類別:
PythonOperator
,airflow.operators.branch.BranchMixIn
工作流程可以在此任務執行後「分支」或遵循路徑。
它衍生自 PythonOperator,並預期一個 Python 函式,該函式會傳回單一 task_id、單一 task_group_id,或要遵循的 task_ids 和/或 task_group_ids 清單。傳回的 task_id(s) 和/或 task_group_id(s) 應指向 {self} 直接下游的任務或任務群組。所有其他「分支」或直接下游任務都標記為
skipped
狀態,以便這些路徑無法繼續前進。skipped
狀態會向下游傳播,以允許 DAG 狀態填滿並推斷 DAG 執行狀態。
- class airflow.operators.python.ShortCircuitOperator(*, ignore_downstream_trigger_rules=True, **kwargs)[原始碼]¶
基底類別:
PythonOperator
,airflow.models.skipmixin.SkipMixin
允許管線根據
python_callable
的結果繼續執行。ShortCircuitOperator 衍生自 PythonOperator,並評估
python_callable
的結果。如果傳回的結果為 False 或 falsy 值,則管線將會短路。下游任務將根據設定的短路模式標記為「skipped」狀態。如果傳回的結果為 True 或 truthy 值,則下游任務會照常進行,並推送傳回結果的XCom
。短路可以設定為遵守或忽略為下游任務設定的
trigger_rule
。如果ignore_downstream_trigger_rules
設定為 True (預設設定),則所有下游任務都會被略過,而不會考慮為任務定義的trigger_rule
。但是,如果此參數設定為 False,則會略過直接下游任務,但會遵守其他後續下游任務的指定trigger_rule
。在此模式下,運算子會假設直接下游任務是有意要略過的,但可能並非其他後續任務。另請參閱
如需有關如何使用此運算子的詳細資訊,請參閱指南: ShortCircuitOperator
- 參數
ignore_downstream_trigger_rules (bool) – 如果設定為 True,則會略過此運算子任務的所有下游任務。這是預設行為。如果設定為 False,則會略過直接的下游任務,但會遵守為所有其他下游任務定義的
trigger_rule
。
- class airflow.operators.python.PythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, use_dill=False, **kwargs)[原始碼]¶
基底類別:
_BasePythonVirtualenvOperator
在自動建立和銷毀的 virtualenv 中執行函式。
函式 (具有某些注意事項) 必須使用 def 定義,且不得為類別的一部分。所有匯入都必須在函式內部發生,且不得參考範圍外的變數。名為 virtualenv_string_args 的全域範圍變數將會可用 (由 string_args 填入)。此外,可以透過 op_args 和 op_kwargs 傳遞內容,並且可以使用傳回值。請注意,如果您的 virtualenv 在與 Airflow 不同的 Python 主要版本中執行,則您無法使用傳回值、op_args、op_kwargs,或使用透過外掛程式提供給 Airflow 的任何巨集。但是您可以使用 string_args。
另請參閱
如需有關如何使用此運算子的詳細資訊,請參閱指南: PythonVirtualenvOperator
- 參數
python_callable (Callable) – 沒有外部變數參考的 python 函式,以 def 定義,將在虛擬環境中執行。
requirements (None | Iterable[str] | str) – 需求字串清單,或 pip 指定的 (已套用範本)「需求檔案」。
python_version (str | None) – 要執行虛擬環境的 Python 版本。請注意,2 和 2.7 都是可接受的形式。
serializer (_SerializerTypeDef | None) –
用於序列化引數和結果的序列化程式。它可以是下列其中之一
"pickle"
: (預設) 使用 pickle 進行序列化。包含在 Python 標準程式庫中。"cloudpickle"
: 使用 cloudpickle 序列化更複雜的類型,這需要將 cloudpickle 包含在您的需求中。"dill"
: 使用 dill 序列化更複雜的類型,這需要將 dill 包含在您的需求中。
system_site_packages (bool) – 是否在您的虛擬環境中包含 system_site_packages。請參閱 virtualenv 文件以取得更多資訊。
pip_install_options (list[str] | None) – 安裝需求時的 pip install 選項清單。請參閱 'pip install -h' 以取得可用的選項
op_args (Collection[Any] | None) – 要傳遞至 python_callable 的位置引數清單。
op_kwargs (Mapping[str, Any] | None) – 要傳遞至 python_callable 的關鍵字引數字典。
string_args (Iterable[str] | None) – 存在於全域變數 virtualenv_string_args 中的字串,在執行階段可作為 list[str] 供 python_callable 使用。請注意,引數會依換行符號分隔。
templates_dict (dict | None) – 值為範本的字典,這些範本將在
__init__
和execute
之間由 Airflow 引擎套用範本,並在範本套用後在您的可呼叫物件的環境中使用templates_exts (list[str] | None) – 在處理已套用範本的欄位時要解析的檔案副檔名清單,例如
['.sql', '.hql']
expect_airflow (bool) – 預期目標環境中已安裝 Airflow。如果為 true,則當未安裝 Airflow 時,運算子將會引發警告,並且在啟動時會嘗試載入 Airflow 巨集。
skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以此結束代碼結束,則將任務保留在
skipped
狀態 (預設值: None)。如果設定為None
,則任何非零結束代碼都將被視為失敗。index_urls (None | Collection[str] | str) – 從中載入 Python 套件的索引 URL 選用清單。如果未提供,則系統 pip conf 將用於從中取得套件。
venv_cache_path (None | os.PathLike[str]) – 虛擬環境父資料夾的選用路徑,虛擬環境將在其中快取,建立子資料夾 venv-{hash},其中 hash 將替換為需求的總和檢查碼。如果未提供,則將在每次執行時在暫存資料夾中建立和刪除虛擬環境。
env_vars (dict[str, str] | None) – 包含其他環境變數的字典,用於在執行虛擬環境時為其設定。
inherit_env (bool) – 是否在執行虛擬環境時繼承目前的環境變數。如果設定為
True
,則虛擬環境將繼承父程序 (os.environ
) 的環境變數。如果設定為False
,則虛擬環境將在乾淨的環境中執行。use_dill (bool) – 已棄用,請改用
serializer
。是否使用 dill 序列化引數和結果 (預設為 pickle)。這允許更複雜的類型,但需要您在需求中包含 dill。
- class airflow.operators.python.BranchPythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, use_dill=False, **kwargs)[source]¶
繼承自:
PythonVirtualenvOperator
,airflow.operators.branch.BranchMixIn
工作流程可以在虛擬環境中執行此任務後「分支」或遵循路徑。
它衍生自 PythonVirtualenvOperator,並預期一個 Python 函數,該函數返回單個 task_id、單個 task_group_id,或要遵循的 task_ids 和/或 task_group_ids 列表。返回的 task_id(s) 和/或 task_group_id(s) 應指向直接位於 {self} 下游的任務或任務組。所有其他「分支」或直接下游任務都標記為
skipped
狀態,以便這些路徑無法前進。skipped
狀態會向下游傳播,以允許 DAG 狀態填滿並推斷 DAG 運行狀態。另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: BranchPythonVirtualenvOperator
- class airflow.operators.python.ExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, use_dill=False, **kwargs)[source]¶
基底類別:
_BasePythonVirtualenvOperator
在不會重新建立的 virtualenv 中執行函式。
重複使用,無需建立虛擬環境的額外開銷(但有一些注意事項)。
該函數必須使用 def 定義,且不能是類別的一部分。所有導入都必須在函數內部發生,並且不能引用範圍外的變數。名為 virtualenv_string_args 的全域範圍變數將可用(由 string_args 填充)。此外,可以透過 op_args 和 op_kwargs 傳遞內容,並且可以使用傳回值。請注意,如果您的虛擬環境運行的 Python 主要版本與 Airflow 不同,則您無法使用傳回值、op_args、op_kwargs,或使用透過外掛程式提供給 Airflow 的任何巨集。但是您可以使用 string_args。
如果 Airflow 安裝在外部環境中的版本與運算子使用的版本不同,則運算子將會失敗。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: ExternalPythonOperator
- 參數
python (str) – 指向應使用的虛擬環境內 Python 二進位檔案的完整路徑字串(特定於檔案系統)(在
VENV/bin
資料夾中)。應為絕對路徑(因此通常以“/”或“X:/”開頭,具體取決於使用的檔案系統/作業系統)。python_callable (Callable) – 沒有外部變數參考的 python 函式,以 def 定義,將在虛擬環境中執行。
serializer (_SerializerTypeDef | None) –
用於序列化引數和結果的序列化程式。它可以是下列其中之一
"pickle"
: (預設) 使用 pickle 進行序列化。包含在 Python 標準程式庫中。"cloudpickle"
: 使用 cloudpickle 序列化更複雜的類型,這需要將 cloudpickle 包含在您的需求中。"dill"
: 使用 dill 序列化更複雜的類型,這需要將 dill 包含在您的需求中。
op_args (Collection[Any] | None) – 要傳遞至 python_callable 的位置引數清單。
op_kwargs (Mapping[str, Any] | None) – 要傳遞至 python_callable 的關鍵字引數字典。
string_args (Iterable[str] | None) – 存在於全域變數 virtualenv_string_args 中的字串,在執行階段可作為 list[str] 供 python_callable 使用。請注意,引數會依換行符號分隔。
templates_dict (dict | None) – 值為範本的字典,這些範本將在
__init__
和execute
之間由 Airflow 引擎套用範本,並在範本套用後在您的可呼叫物件的環境中使用templates_exts (list[str] | None) – 在處理已套用範本的欄位時要解析的檔案副檔名清單,例如
['.sql', '.hql']
expect_airflow (bool) – 預期目標環境中已安裝 Airflow。如果為 true,則當未安裝 Airflow 時,運算子將會引發警告,並且在啟動時會嘗試載入 Airflow 巨集。
skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以此結束代碼結束,則將任務保留在
skipped
狀態 (預設值: None)。如果設定為None
,則任何非零結束代碼都將被視為失敗。env_vars (dict[str, str] | None) – 包含其他環境變數的字典,用於在執行虛擬環境時為其設定。
inherit_env (bool) – 是否在執行虛擬環境時繼承目前的環境變數。如果設定為
True
,則虛擬環境將繼承父程序 (os.environ
) 的環境變數。如果設定為False
,則虛擬環境將在乾淨的環境中執行。use_dill (bool) – 已棄用,請改用
serializer
。是否使用 dill 序列化引數和結果 (預設為 pickle)。這允許更複雜的類型,但需要您在需求中包含 dill。
- class airflow.operators.python.BranchExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, use_dill=False, **kwargs)[source]¶
繼承自:
ExternalPythonOperator
,airflow.operators.branch.BranchMixIn
工作流程可以在此任務執行後「分支」或遵循路徑。
擴展了 ExternalPythonOperator,因此期望取得 Python:應該使用的虛擬環境(在
VENV/bin
資料夾中)。應為絕對路徑,因此它可以像 ExternalPythonOperator 一樣在單獨的虛擬環境中運行。另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: BranchExternalPythonOperator
- airflow.operators.python.get_current_context()[source]¶
檢索執行上下文字典,而無需更改使用者方法的簽章。
這是檢索執行上下文字典的最簡單方法。
舊樣式
def my_task(**context): ti = context["ti"]
新樣式
from airflow.operators.python import get_current_context def my_task(): context = get_current_context() ti = context["ti"]
僅當在運算子開始執行後調用此方法時,當前上下文才會有值。