使用命令列介面¶
本文檔旨在概述使用 CLI 時的所有常見任務。
注意
有關 CLI 命令的更多資訊,請參閱 命令列介面與環境變數參考
設定 Bash/Zsh 自動完成¶
當使用 bash (或 zsh
) 作為您的 shell 時,airflow
可以使用 argcomplete 進行自動完成。
對於所有啟用 argcomplete 的 python 應用程式的 全域啟用,請執行
sudo activate-global-python-argcomplete
對於永久性(但非全域性)airflow 啟用,請使用
register-python-argcomplete airflow >> ~/.bashrc
對於僅針對 airflow 的 argcomplete 一次性啟用,請使用
eval "$(register-python-argcomplete airflow)"

如果您使用 zsh
,請將以下內容新增至您的 .zshrc
autoload bashcompinit
bashcompinit
eval "$(register-python-argcomplete airflow)"
建立連線¶
有關使用 CLI 建立連線的資訊,請參閱 從 CLI 建立連線
將 DAG 結構匯出為圖片¶
Airflow 可讓您將 DAG 結構列印或儲存為圖片。這對於記錄或分享您的 DAG 結構很有用。您需要安裝 Graphviz。
例如,要將 example_complex
DAG 列印到終端機
airflow dags show example_complex
這會將渲染的 DAG 結構(類似於 圖形)以 DOT 格式列印到螢幕上。
支援多種檔案格式。若要使用它們,請新增參數 --save [filename].[format]
。
要將 example_complex
DAG 儲存為 PNG 檔案
airflow dags show example_complex --save example_complex.png
這會將以下圖片儲存為檔案

DAG 表示範例¶
支援以下檔案格式
bmp
canon
、dot
、gv
、xdot
、xdot1.2
、xdot1.4
cgimage
cmap
eps
exr
fig
gd
、gd2
gif
gtk
ico
imap
、cmapx
imap_np
、cmapx_np
ismap
jp2
jpg
、jpeg
、jpe
json
、json0
、dot_json
、xdot_json
pct
、pict
pic
plain
、plain-ext
png
pov
ps
ps2
psd
sgi
svg
、svgz
tga
tif
、tiff
tk
vml
、vmlz
vrml
wbmp
webp
xlib
x11
預設情況下,Airflow 會在 airflow.cfg
檔案的 [core]
區段中 dags_folder
選項指定的目錄中尋找 DAG。您可以使用 --subdir
參數選取新的目錄。
顯示 DAG 結構¶
有時您會處理包含複雜依賴關係的 DAG。預覽 DAG 以查看其是否正確會很有幫助。
如果您有 macOS,您可以將 iTerm2 與 imgcat 腳本一起使用,以在主控台中顯示 DAG 結構。您也需要安裝 Graphviz。
其他終端機不支援顯示高品質圖形。您可以將圖片轉換為文字形式,但其解析度會讓您無法閱讀。
若要執行此操作,您應該在 airflow dags show
命令中使用 --imgcat
切換。例如,如果您想要顯示 example_bash_operator
DAG,則可以使用以下命令
airflow dags show example_bash_operator --imgcat
您會看到與以下螢幕截圖類似的結果。

iTerm2 中的 DAG 預覽¶
格式化命令輸出¶
某些 Airflow 命令(例如 airflow dags list
或 airflow tasks states-for-dag-run
)支援 --output
旗標,允許使用者變更命令輸出的格式。可能的選項
table
- 將資訊渲染為純文字表格
simple
- 將資訊渲染為簡單表格,可由標準 linux 公用程式解析
json
- 以 json 字串形式渲染資訊
yaml
- 以有效的 yaml 形式渲染資訊
json
和 yaml
格式都可讓您更輕鬆地使用命令列工具(例如 jq 或 yq)來操作資料。例如
airflow tasks states-for-dag-run example_complex 2020-11-13T00:00:00+00:00 --output json | jq ".[] | {sd: .start_date, ed: .end_date}"
{
"sd": "2020-11-29T14:53:46.811030+00:00",
"ed": "2020-11-29T14:53:46.974545+00:00"
}
{
"sd": "2020-11-29T14:53:56.926441+00:00",
"ed": "2020-11-29T14:53:57.118781+00:00"
}
{
"sd": "2020-11-29T14:53:56.915802+00:00",
"ed": "2020-11-29T14:53:57.125230+00:00"
}
{
"sd": "2020-11-29T14:53:56.922131+00:00",
"ed": "2020-11-29T14:53:57.129091+00:00"
}
{
"sd": "2020-11-29T14:53:56.931243+00:00",
"ed": "2020-11-29T14:53:57.126306+00:00"
}
從元數據資料庫清除歷史記錄¶
注意
強烈建議您在執行 db clean
命令之前備份元數據資料庫。
db clean
命令的工作方式是從每個表格中刪除早於提供的 --clean-before-timestamp
的記錄。
您可以選擇性地提供要執行刪除的表格清單。如果未提供表格清單,則將包含所有表格。
您可以使用 --dry-run
選項來列印要清除的主要表格中的列計數。
預設情況下,db clean
會將清除的列封存到 _airflow_deleted__<table>__<timestamp>
形式的表格中。如果您不希望以這種方式保留資料,您可以提供參數 --skip-archive
。
從封存表格匯出已清除的記錄¶
db export-archived
命令會將封存表格(由 db clean
命令建立)的內容匯出為指定的格式,預設為 CSV 檔案。匯出的檔案將包含在 db clean
過程中從主要表格中清除的記錄。
您可以使用 --export-format
選項指定匯出格式。預設格式為 csv,目前也是唯一支援的格式。
您還必須使用 --output-path
選項指定要將資料匯出到的路徑位置。此位置必須存在。
其他選項包括:--tables
用於指定要匯出的表格,--drop-archives
用於在匯出後刪除封存表格。
刪除封存表格¶
如果在 db clean
過程中,您未使用 --skip-archive
選項(會刪除封存表格),您仍然可以使用 db drop-archived
命令刪除封存表格。此操作不可逆,建議您在使用 db export-archived
命令將表格備份到磁碟後再刪除它們。
您可以使用 --tables
選項指定要刪除的表格。如果未指定表格,則將刪除所有封存表格。
注意級聯刪除¶
請記住,某些表格具有使用 ON DELETE CASCADE
定義的外鍵關係,因此在一個表格中刪除可能會觸發其他表格中的刪除。例如,task_instance
表格鍵至 dag_run
表格,因此如果刪除 DagRun 記錄,則也會刪除其所有相關聯的任務執行個體。
DAG 執行個體的特殊處理¶
通常,Airflow 會透過查詢最新的 DagRun 來判斷下一個要執行的 DagRun。如果您刪除所有 DAG 執行個體,Airflow 可能會排程已完成的舊 DAG 執行個體,例如,如果您已設定 catchup=True
。因此,db clean
命令將保留最新的非手動觸發 DAG 執行個體,以維持排程的連續性。
可回填 DAG 的考量¶
並非所有 DAG 都設計用於 Airflow 的回填命令。但是對於那些 DAG,需要特別注意。如果您刪除 DAG 執行個體,並且您在包含已刪除 DAG 執行個體的日期範圍內執行回填,則這些執行個體將會重新建立並再次執行。因此,如果您的 DAG 屬於此類別,您可能需要避免刪除 DAG 執行個體,而僅清除其他大型表格,例如任務執行個體和日誌等。
升級 Airflow¶
執行 airflow db migrate --help
以取得使用詳細資訊。
降級 Airflow¶
注意
建議您在執行 db downgrade
或任何其他資料庫操作之前備份資料庫。
您可以使用 db downgrade
命令降級到特定的 Airflow 版本。或者,您可以提供 Alembic 修訂版本 ID 以降級到該版本。
如果您想要預覽命令但不執行它們,請使用選項 --show-sql-only
。
選項 --from-revision
和 --from-version
只能與 --show-sql-only
選項結合使用,因為在實際執行遷移時,我們應始終從目前的修訂版本降級。
有關 Airflow 版本與 Alembic 修訂版本之間的對應關係,請參閱 資料庫遷移參考。
匯出連線¶
您可以使用 CLI 從資料庫匯出連線。支援的檔案格式為 json
、yaml
和 env
。
您可以將目標檔案指定為參數
airflow connections export connections.json
或者,您可以指定 file-format
參數以覆寫檔案格式
airflow connections export /tmp/connections --file-format yaml
您也可以為 STDOUT 指定 -
airflow connections export -
JSON 格式包含一個物件,其中鍵包含連線 ID,值包含連線的定義。在此格式中,連線定義為 JSON 物件。以下是 JSON 檔案範例。
{
"airflow_db": {
"conn_type": "mysql",
"host": "mysql",
"login": "root",
"password": "plainpassword",
"schema": "airflow",
"port": null,
"extra": null
},
"druid_broker_default": {
"conn_type": "druid",
"host": "druid-broker",
"login": null,
"password": null,
"schema": null,
"port": 8082,
"extra": "{\"endpoint\": \"druid/v2/sql\"}"
}
}
YAML 檔案結構與 JSON 的結構類似。連線 ID 和一個或多個連線的定義的鍵值對。在此格式中,連線定義為 YAML 物件。以下是 YAML 檔案範例。
airflow_db:
conn_type: mysql
extra: null
host: mysql
login: root
password: plainpassword
port: null
schema: airflow
druid_broker_default:
conn_type: druid
extra: '{"endpoint": "druid/v2/sql"}'
host: druid-broker
login: null
password: null
port: 8082
schema: null
您也可以使用 .env
格式匯出連線。鍵是連線 ID,值是連線的序列化表示法,使用 Airflow 的連線 URI 格式或 JSON。若要使用 JSON,請提供選項 --serialization-format=json
,否則將使用 Airflow 連線 URI 格式。以下是使用兩種格式的 .env
檔案範例。
URI 範例
airflow_db=mysql://root:plainpassword@mysql/airflow
druid_broker_default=druid://druid-broker:8082?endpoint=druid%2Fv2%2Fsql
JSON 範例輸出
airflow_db={"conn_type": "mysql", "login": "root", "password": "plainpassword", "host": "mysql", "schema": "airflow"}
druid_broker_default={"conn_type": "druid", "host": "druid-broker", "port": 8082, "extra": "{\"endpoint\": \"druid/v2/sql\"}"}
測試 DAG 匯入錯誤¶
CLI 可用於透過 list-import-errors
子命令檢查任何已發現的 DAG 是否有匯入錯誤。可以建立自動化步驟,如果任何 DAG 無法匯入,則透過檢查命令輸出而失敗,尤其是在與 --output
一起使用以產生標準檔案格式時。例如,當沒有錯誤時,預設輸出為 No data found
,而 json 輸出為 []
。然後可以在 CI 或 pre-commit 中執行檢查,以加速審查程序和測試。
如果存在任何錯誤,則失敗的範例命令,使用 jq 來解析輸出
airflow dags list-import-errors --output=json | jq -e 'select(type=="array" and length == 0)'
該行可以直接新增至自動化,或者如果您想要列印輸出,則可以使用 tee
airflow dags list-import-errors | tee import_errors.txt && jq -e 'select(type=="array" and length == 0)' import_errors.txt
Jenkins 管道中的範例
stage('All DAGs are loadable') {
steps {
sh 'airflow dags list-import-errors | tee import_errors.txt && jq -e \'select(type=="array" and length == 0)\' import_errors.txt'
}
}
注意
為了使此功能準確運作,您必須確保 Airflow 不會在 stdout 中記錄任何其他文字。例如,您可能需要修正任何棄用警告、將 2>/dev/null
新增至您的命令,或者如果您有在外掛程式載入時產生日誌的外掛程式,則在 Airflow 組態中設定 lazy_load_plugins = True
。