airflow.providers.standard.operators.bash
¶
模組內容¶
類別¶
執行 Bash 腳本、命令或一組命令。 |
- class airflow.providers.standard.operators.bash.BashOperator(*, bash_command, env=None, append_env=False, output_encoding='utf-8', skip_on_exit_code=99, cwd=None, output_processor=lambda result: ..., **kwargs)[原始碼]¶
基底:
airflow.models.baseoperator.BaseOperator
執行 Bash 腳本、命令或一組命令。
另請參閱
如需更多關於如何使用此運算子的資訊,請查看指南: BashOperator
如果 BaseOperator.do_xcom_push 為 True,當 bash 命令完成時,寫入 stdout 的最後一行也會被推送到 XCom
- 參數
bash_command (str | airflow.utils.types.ArgNotSet) – 要執行的命令、一組命令或對 Bash 腳本的參考(必須是 '.sh' 或 '.bash')。 (樣板化)
env (dict[str, str] | None) – 如果 env 不是 None,則它必須是一個字典,用於定義新進程的環境變數;這些變數將取代繼承當前進程環境的方式,後者是預設行為。(樣板化)
append_env (bool) – 如果為 False(預設值),則使用在 env 參數中傳遞的環境變數,並且不繼承當前進程環境。如果為 True,則繼承來自當前傳遞的環境變數,然後使用者傳遞的環境變數將更新現有的繼承環境變數,或將新變數附加到其中
output_encoding (str) – Bash 命令的輸出編碼
skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果任務以此退出代碼退出,則將任務保持在
skipped
狀態(預設值:99)。如果設定為None
,任何非零退出代碼都將被視為失敗。cwd (str | None) – 在其中執行命令的工作目錄(樣板化)。如果為 None(預設值),則命令在臨時目錄中執行。若要使用目前的 DAG 資料夾作為工作目錄,您可以設定樣板
{{ dag_run.dag.folder }}
。當 bash_command 是 '.sh' 或 '.bash' 檔案時,Airflow 必須具有對工作目錄的寫入權限。腳本將在此目錄中呈現(Jinja 樣板)到一個新的臨時檔案中。output_processor (Callable[[str], Any]) – 進一步處理 bash 腳本輸出的函數(預設值為 lambda output: output)。
Airflow 將評估 Bash 命令的退出代碼。一般來說,非零退出代碼將導致任務失敗,而零將導致任務成功。退出代碼
99
(或在skip_on_exit_code
中設定的另一個代碼)將拋出airflow.exceptions.AirflowSkipException
,這將使任務處於skipped
狀態。您可以透過設定skip_on_exit_code=None
將所有非零退出代碼視為失敗。退出代碼
行為
0
成功
skip_on_exit_code (預設值:99)
否則
注意
除非整個 shell 以非零退出代碼退出,否則 Airflow 將無法識別非零退出代碼。如果非零退出代碼來自子命令,則可能會出現此問題。解決此問題的最簡單方法是在命令前加上
set -e;
bash_command = "set -e; python3 script.py '{{ data_interval_end }}'"
注意
若要簡單地執行
.sh
或.bash
腳本(不使用任何 Jinja 樣板),請在腳本名稱bash_command
參數後方新增一個空格 - 例如bash_command="my_script.sh "
。這是因為當 Airflow 檔案以.sh
或.bash
結尾時,會嘗試載入並將其作為 Jinja 樣板處理。如果您的腳本中有 Jinja 樣板,請勿放置任何空白字元。並將腳本的目錄新增至 DAG 的
template_searchpath
。如果您指定cwd
,Airflow 必須具有對此目錄的寫入權限。腳本將在此目錄中呈現(Jinja 樣板)到一個新的臨時檔案中。警告
應注意「使用者」輸入或在
bash_command
中使用 Jinja 樣板,因為此 bash 運算子不會對命令執行任何跳脫或清理。這主要適用於使用「dag_run」conf,因為可以透過 Web UI 中的使用者提交。大多數預設樣板變數沒有風險。
例如,不要 這樣做
bash_task = BashOperator( task_id="bash_task", bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run else "" }}\'"', )
相反地,您應該透過
env
kwarg 傳遞此值,並在 bash_command 內使用雙引號,如下所示bash_task = BashOperator( task_id="bash_task", bash_command="echo \"here is the message: '$message'\"", env={"message": '{{ dag_run.conf["message"] if dag_run else "" }}'}, )
在版本 2.10.0 中新增: output_processor 參數。
- template_fields: collections.abc.Sequence[str] = ('bash_command', 'env', 'cwd')[原始碼]¶
- template_ext: collections.abc.Sequence[str] = ('.sh', '.bash')[原始碼]¶