airflow.operators.bash
¶
模組內容¶
類別¶
執行 Bash 指令稿、命令或一組命令。 |
- class airflow.operators.bash.BashOperator(*, bash_command, env=None, append_env=False, output_encoding='utf-8', skip_exit_code=None, skip_on_exit_code=99, cwd=None, output_processor=lambda result: ..., **kwargs)[原始碼]¶
基底類別:
airflow.models.baseoperator.BaseOperator
執行 Bash 指令稿、命令或一組命令。
另請參閱
如需關於如何使用此 operator 的更多資訊,請參閱指南: BashOperator
如果 BaseOperator.do_xcom_push 為 True,則當 bash 命令完成時,寫入 stdout 的最後一行也將被推送到 XCom
- 參數
bash_command (str | airflow.utils.types.ArgNotSet) – 要執行的命令、一組命令或 Bash 指令稿的參考(必須為 '.sh' 或 '.bash')。 (已套用範本)
env (dict[str, str] | None) – 如果 env 不是 None,則它必須是一個字典,用於定義新程序的環境變數;這些變數將取代繼承當前程序環境,這是預設行為。(已套用範本)
append_env (bool) – 如果為 False(預設),則使用 env 參數中傳遞的環境變數,且不繼承當前程序的環境。如果為 True,則繼承來自當前程序的環境變數,然後使用者傳遞的環境變數將更新現有的繼承環境變數,或將新變數附加到其中
output_encoding (str) – Bash 命令的輸出編碼
skip_on_exit_code (int | Container[int] | None) – 如果任務以此退出代碼退出,則將任務保持在
skipped
狀態(預設值:99)。如果設定為None
,則任何非零退出代碼都將被視為失敗。cwd (str | None) – 在其中執行命令的工作目錄(已套用範本)。如果為 None(預設值),則命令在臨時目錄中執行。若要使用當前 DAG 資料夾作為工作目錄,您可以設定範本
{{ dag_run.dag.folder }}
。當 bash_command 是 '.sh' 或 '.bash' 檔案時,Airflow 必須具有對工作目錄的寫入權限。指令稿將在此目錄中呈現(Jinja 範本)到一個新的臨時檔案中。output_processor (Callable[[str], Any]) – 用於進一步處理 bash 指令稿輸出的函式(預設值為 lambda output: output)。
Airflow 將評估 Bash 命令的退出代碼。一般來說,非零退出代碼將導致任務失敗,而零將導致任務成功。退出代碼
99
(或在skip_on_exit_code
中設定的另一個代碼)將拋出airflow.exceptions.AirflowSkipException
,這將使任務處於skipped
狀態。您可以將所有非零退出代碼視為失敗,方法是設定skip_on_exit_code=None
。退出代碼
行為
0
成功
skip_on_exit_code (預設值:99)
否則
注意
除非整個 shell 以非零退出代碼退出,否則 Airflow 將無法識別非零退出代碼。如果非零退出代碼來自子命令,則可能會出現問題。解決此問題的最簡單方法是在命令前加上
set -e;
bash_command = "set -e; python3 script.py '{{ next_execution_date }}'"
注意
若要簡單地執行
.sh
或.bash
指令稿(不使用任何 Jinja 範本),請在指令稿名稱bash_command
參數後新增一個空格 – 例如bash_command="my_script.sh "
。這是因為當檔案以.sh
或.bash
結尾時,Airflow 會嘗試載入此檔案並將其作為 Jinja 範本處理。如果您的指令稿中有 Jinja 範本,請勿放置任何空格。並在 DAG 的
template_searchpath
中新增指令稿的目錄。如果您指定cwd
,則 Airflow 必須具有對此目錄的寫入權限。指令稿將在此目錄中呈現(Jinja 範本)到一個新的臨時檔案中。警告
使用「使用者」輸入或在
bash_command
中使用 Jinja 範本時應謹慎,因為此 bash operator 不會對命令執行任何跳脫或清理。這主要適用於使用「dag_run」conf,因為這可以透過 Web UI 中的使用者提交。大多數預設範本變數沒有風險。
例如,不要 這樣做
bash_task = BashOperator( task_id="bash_task", bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run else "" }}\'"', )
相反地,您應該透過
env
kwarg 傳遞此內容,並在 bash_command 內部使用雙引號,如下所示bash_task = BashOperator( task_id="bash_task", bash_command="echo \"here is the message: '$message'\"", env={"message": '{{ dag_run.conf["message"] if dag_run else "" }}'}, )
2.10.0 版本新增: output_processor 參數。