從 1.10 升級到 2¶
Apache Airflow 2 是一個主要版本,本文檔旨在協助使用者從 Airflow 1.10.x 遷移到 Airflow 2
步驟 1:切換到 Python 3¶
Airflow 1.10 是最後一個支援 Python 2 的發布系列。Airflow 2.0.0 需要 Python 3.6+,並且已使用 Python 版本 3.6、3.7 和 3.8 進行測試。Python 3.9 支援從 Airflow 2.1.2 開始新增。
Airflow 2.3.0 放棄了對 Python 3.6 的支援。它已使用 Python 3.7、3.8、3.9、3.10 進行測試。
如果您有特定的任務仍然需要 Python 2,那麼您可以為此使用 @task.virtualenv
、@task.docker
或 @task.kubernetes
decorators。
有關 Python 2 和 Python 3 之間重大變更的列表,請參閱 CouchBaseDB 團隊的這篇實用部落格。
步驟 2:升級到 1.10.15¶
為了最大限度地減少使用者從 Airflow 1.10 升級到 Airflow 2.0 及更高版本的阻力,已建立 Airflow 1.10.15 a.k.a「橋樑版本」。這是最後一個 1.10 功能版本。Airflow 1.10.15 包含對各種功能的支援,這些功能已從 Airflow 2.0 回溯移植,以便使用者在升級到 Airflow 2.0 之前輕鬆測試其 Airflow 環境。
我們強烈建議所有升級到 Airflow 2.0 的使用者,首先升級到 Airflow 1.10.15 並測試其 Airflow 部署,然後再升級到 Airflow 2.0。Airflow 1.10.x 已於 2021 年 6 月 17 日終止生命週期。不會發布新的 Airflow 1.x 版本。
1.10.15 中的功能包括
1. Airflow 2.0 的大多數重大 DAG 和架構變更已回溯移植到 Airflow 1.10.15。此向後相容性並不意味著 1.10.15 將以與 Airflow 2.0 相同的方式處理這些 DAG。相反,這意味著大多數與 Airflow 2.0 相容的 DAG 將在 Airflow 1.10.15 中運作。此回溯移植將讓使用者有時間逐步修改其 DAG,而不會造成任何服務中斷。
2. 我們還將更新後的 Airflow 2.0 CLI 命令回溯移植到 Airflow 1.10.15,以便使用者可以在升級之前修改其腳本以與 Airflow 2.0 相容。
3. 對於 KubernetesExecutor 的使用者,我們已回溯移植 pod_template_file
功能,適用於 KubernetesExecutor,以及一個腳本,該腳本將根據您的 airflow.cfg
設定生成 pod_template_file
。若要生成此檔案,只需執行以下命令
airflow generate_pod_template -o <output file path>
執行此步驟後,只需在 airflow.cfg
的 kubernetes_executor
區段的 pod_template_file
組態中寫出此檔案的路徑即可
注意
在 Airflow 2.4.2 版本之前,kubernetes_executor
區段稱為 kubernetes
。
步驟 3:執行升級檢查腳本¶
升級到 Airflow 1.10.15 後,我們建議您安裝「升級檢查」腳本。這些腳本將讀取您的 airflow.cfg
和所有 DAG,並提供升級前所需所有變更的詳細報告。我們正在努力測試此腳本,我們的目標是任何可以通過這些測試的 Airflow 設定都能夠升級到 2.0 而不會出現任何問題。
pip install apache-airflow-upgrade-check
安裝完成後,請執行升級檢查腳本。
airflow upgrade_check
有關此過程的更多詳細資訊,請參閱此處 升級檢查腳本。
步驟 4:切換到 Backport Providers¶
現在您已在 Airflow 1.10.15 中設定了 Python 3.6+ 環境,您可以開始將 DAG 移植到 Airflow 2.0 相容性!
此轉換中最重要的一步也是最容易分階段進行的一步。所有 Airflow 2.0 operators 都使用 backport provider 套件向後相容於 Airflow 1.10。您可以自行決定時間,透過 pip 安裝 provider 並變更匯入路徑,來轉換為使用這些 backport-providers。
例如:在過去,您可能會以這種方式匯入 DockerOperator
from airflow.operators.docker_operator import DockerOperator
您現在將執行此命令來安裝 provider
pip install apache-airflow-backport-providers-docker
然後使用此路徑匯入 operator
from airflow.providers.docker.operators.docker import DockerOperator
請注意,backport provider 套件只是與 Airflow 2.0 相容的 provider 套件的回溯移植。例如
pip install 'apache-airflow[docker]'
自動安裝 apache-airflow-providers-docker
套件。但是您可以與 Airflow 核心分開管理/升級/移除 provider 套件。
升級到 Apache Airflow 2.0 後,當您安裝 Airflow extras 時,將自動安裝這些 provider 套件。當您安裝 Airflow 時,即使沒有 extras,也會自動安裝幾個 providers(http、ftp、sqlite、imap)。您可以在 Provider packages 中閱讀有關 providers 的更多資訊。
步驟 5:升級 Airflow DAG¶
變更範本中未定義變數的處理方式
在 Airflow 2.0 之前,Jinja 範本允許使用未定義的變數。它們會呈現為空字串,且不會向使用者指示使用了未定義的變數。在此版本中,任何涉及未定義變數的範本呈現都會使任務失敗,並且在呈現時會在 UI 中顯示錯誤。
當實例化 DAG 時,可以還原此行為。
import jinja2
dag = DAG("simple_dag", template_undefined=jinja2.Undefined)
或者,也可以透過使用 | default
Jinja 篩選器(如下所示)來個別覆寫每個 Jinja 範本變數。
{{a | default(1)}}
KubernetesPodOperator 的變更
與 KubernetesExecutor
非常相似,KubernetesPodOperator
將不再採用 Airflow 自訂類別,而是期望 pod_template yaml 檔案或 kubernetes.client.models
物件。
一個值得注意的例外是,我們將繼續支援 airflow.providers.cncf.kubernetes.secret.Secret
類別。
以前,使用者會匯入每個個別類別來建置 pod,如下所示
from airflow.kubernetes.pod import Port
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.secret import Secret
from airflow.kubernetes.volume_mount import VolumeMount
volume_config = {"persistentVolumeClaim": {"claimName": "test-volume"}}
volume = Volume(name="test-volume", configs=volume_config)
volume_mount = VolumeMount("test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True)
port = Port("http", 80)
secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
secrets=[secret_file, secret_env],
ports=[port],
volumes=[volume],
volume_mounts=[volume_mount],
name="airflow-test-pod",
task_id="task",
affinity=affinity,
is_delete_operator_pod=True,
hostnetwork=False,
tolerations=tolerations,
configmaps=configmaps,
init_containers=[init_container],
priority_class_name="medium",
)
現在,使用者可以使用 kubernetes.client.models
類別作為建立所有 k8s 物件的單一進入點。
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.secret import Secret
configmaps = ["test-configmap-1", "test-configmap-2"]
volume = k8s.V1Volume(
name="test-volume",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="test-volume"),
)
port = k8s.V1ContainerPort(name="http", container_port=80)
secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
secret_all_keys = Secret("env", None, "airflow-secrets-2")
volume_mount = k8s.V1VolumeMount(
name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
secrets=[secret_file, secret_env],
ports=[port],
volumes=[volume],
volume_mounts=[volume_mount],
name="airflow-test-pod",
task_id="task",
is_delete_operator_pod=True,
hostnetwork=False,
)
我們決定保留 Secret 類別,因為使用者似乎非常喜歡它,它簡化了將 Kubernetes secrets 掛載到 workers 中的複雜性。
有關 KubernetesPodOperator API 變更的更詳細列表,請閱讀附錄中標題為「KubernetesPodOperator 的變更參數」的章節
變更 dag_run_conf_overrides_params 的預設值
DagRun 組態字典現在預設會覆寫 params 字典。如果您透過 airflow dags backfill -c
或 airflow dags trigger -c
傳遞一些鍵值對,則這些鍵值對將覆寫 params 中的現有鍵值對。您可以透過在 airflow.cfg
中將 dag_run_conf_overrides_params
設定為 False
來還原此行為。
DAG 探索安全模式現在不區分大小寫
當 DAG_DISCOVERY_SAFE_MODE
處於活動狀態時,Airflow 現在將以不區分大小寫的模式篩選所有包含字串 airflow
和 dag
的檔案。變更此設定是為了更好地支援新的 @dag
decorator。
權限變更
DAG 層級的權限動作 can_dag_read
和 can_dag_edit
在 Airflow 2.0 中已棄用。它們將被 can_read
和 can_edit
取代。當角色被賦予 DAG 層級的存取權限時,資源名稱(或 Flask App-Builder 術語中的「檢視選單」)現在將以 DAG:
為前綴。因此,example_dag_id
上的動作 can_dag_read
現在表示為 DAG:example_dag_id
上的 can_read
。有一個名為 DAGs
的特殊檢視(在 1.10.x 版本中稱為 all_dags
),允許角色存取所有 DAG。預設的 Admin
、Viewer
、User
、Op
角色都可以存取 DAGs
檢視。
作為執行 ``airflow db migrate`` 的一部分,現有的權限將為您遷移。
當使用設定的 access_control
變數初始化 DAG 時,任何舊權限名稱的使用都會在資料庫中自動更新,因此這不會是重大變更。將會引發 DeprecationWarning。
捨棄舊版 UI,改用 FAB RBAC UI
警告
重大變更
- 先前我們使用兩個版本的 UI
非 RBAC UI
Flask App Builder RBAC UI
這很難維護,因為這意味著我們必須在兩個地方實作/更新功能。在此版本中,我們已移除舊版 UI,改用 Flask App Builder RBAC UI,從而減輕了巨大的維護負擔。不再需要在組態中明確設定 RBAC UI,因為它是唯一的預設 UI。
如果您先前使用非 RBAC UI,則必須切換到新的 RBAC-UI 並建立使用者才能存取 Airflow 的 Webserver。有關建立使用者的 CLI 的更多詳細資訊,請參閱 命令列介面和環境變數參考
請注意,自訂驗證後端需要重新編寫以針對新的基於 FAB 的 UI。
作為此變更的一部分,已移除 [webserver]
區段中的一些組態項目,且不再適用,包括 authenticate
、filter_by_owner
、owner_mode
和 rbac
。
在升級到此版本之前,我們建議啟用新的 FAB RBAC UI。為此,您應該將 airflow.cfg
檔案中 [webserver]
中的 rbac
選項設定為 True
[webserver]
rbac = True
為了登入介面,您需要建立管理員帳戶。
假設您已安裝 Airflow 1.10.15,您可以使用 Airflow 2.0 CLI 命令語法 airflow users create
建立使用者。您無需變更組態檔案,因為 FAB RBAC UI 是唯一支援的 UI。
airflow users create \
--role Admin \
--username admin \
--firstname FIRST_NAME \
--lastname LAST_NAME \
--email EMAIL@example.org
OAuth 中的重大變更
注意
當 Airflow Webserver 的多個副本正在執行時,它們需要共用相同的 secret_key 才能存取相同的使用者會話。透過任何組態機制注入此金鑰。1.10.15 橋樑版本修改了此功能,以使用隨機生成的 secret keys 而不是不安全的預設值,並且可能會破壞依賴預設值的現有部署。Webserver 金鑰也用於授權對 Celery workers 的請求,以便在檢索日誌時使用。但是,使用 secret key 生成的 token 具有較短的到期時間 - 請確保您在其上執行 Airflow 元件的所有機器上的時間已同步(例如使用 ntpd),否則當存取日誌時,您可能會收到「forbidden」錯誤。
flask-oauthlib
已被 authlib
取代,因為 flask-oauthlib
已被棄用,而改用 authlib
。已變更的新舊 provider 組態金鑰如下
舊金鑰 |
新金鑰 |
---|---|
|
|
|
|
|
|
|
|
有關更多資訊,請造訪 https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
Pendulum 支援中的重大變更
Airflow 已從 Pendulum 1.x 升級到 Pendulum 2.x。這帶來了一些重大變更,因為 Pendulum 2.x 中的某些方法及其定義已變更或已移除。
例如,以下程式碼片段現在將拋出錯誤
execution_date.format("YYYY-MM-DD HH:mm:ss", formatter="alternative")
因為 Pendulum 2.x 中不支援 formatter
選項,並且預設使用 alternative
。
有關更多資訊,請造訪 https://pendulum.eustace.io/blog/pendulum-2.0.0-is-out.html
步驟 6:升級組態設定¶
Airflow 2.0 對於組態資料的期望更加嚴格,並且在更多情況下需要明確指定組態值,而不是預設為通用值。
其中一些在升級檢查指南中詳細說明,但一個重要的變更領域是 Kubernetes Executor。以下針對 Kubernetes Executor 的使用者呼叫。
升級 KubernetesExecutor 設定
KubernetesExecutor 將不再從 airflow.cfg 讀取基礎 Pod 組態。
在 Airflow 2.0 中,KubernetesExecutor 將需要以 yaml 格式編寫的基礎 pod 範本。此檔案可以存在於主機上的任何位置,並將使用 airflow.cfg
檔案中的 pod_template_file
組態連結。您可以透過執行以下命令來建立 pod_template_file
:airflow generate_pod_template
airflow.cfg
仍然接受 worker_container_repository
、worker_container_tag
和預設命名空間的值。
以下 airflow.cfg
值將被棄用
worker_container_image_pull_policy
airflow_configmap
airflow_local_settings_configmap
dags_in_image
dags_volume_subpath
dags_volume_mount_point
dags_volume_claim
logs_volume_subpath
logs_volume_claim
dags_volume_host
logs_volume_host
env_from_configmap_ref
env_from_secret_ref
git_repo
git_branch
git_sync_depth
git_subpath
git_sync_rev
git_user
git_password
git_sync_root
git_sync_dest
git_dags_folder_mount_point
git_ssh_key_secret_name
git_ssh_known_hosts_configmap_name
git_sync_credentials_secret
git_sync_container_repository
git_sync_container_tag
git_sync_init_container_name
git_sync_run_as_user
worker_service_account_name
image_pull_secrets
gcp_service_account_keys
affinity
tolerations
run_as_user
fs_group
[kubernetes_node_selectors]
[kubernetes_annotations]
[kubernetes_environment_variables]
[kubernetes_secrets]
[kubernetes_labels]
``executor_config`` 現在在啟動任務時將期望 ``kubernetes.client.models.V1Pod`` 類別
在 Airflow 1.10.x 中,使用者可以透過將字典傳遞給 executor_config
變數來在執行時修改任務 pod。使用者現在將可以透過 kubernetes.client.models.V1Pod
完全存取 Kubernetes API。
在已棄用的版本中,使用者將使用以下字典掛載卷
second_task = PythonOperator(
task_id="four_task",
python_callable=test_volume_mount,
executor_config={
"KubernetesExecutor": {
"volumes": [
{
"name": "example-kubernetes-test-volume",
"hostPath": {"path": "/tmp/"},
},
],
"volume_mounts": [
{
"mountPath": "/foo/",
"name": "example-kubernetes-test-volume",
},
],
}
},
)
在新的模型中,使用者可以使用 pod_override
金鑰下的以下程式碼來完成相同的操作
from kubernetes.client import models as k8s
@task(
task_id="four_task",
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
volume_mounts=[
k8s.V1VolumeMount(
mount_path="/foo/",
name="example-kubernetes-test-volume",
)
],
)
],
volumes=[
k8s.V1Volume(
name="example-kubernetes-test-volume",
host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
)
],
)
)
},
)
def test_volume_mount():
pass
second_task = test_volume_mount()
對於 Airflow 2.0,傳統的 executor_config
將繼續運作並顯示棄用警告,但在未來版本中將會移除。
步驟 7:升級到 Airflow 2¶
在執行如上所述的升級檢查、安裝 backported providers、修改 DAG 以使其相容以及更新組態設定之後,您應該已準備好升級到 Airflow 2.0。
最後一次執行升級檢查始終是一個好主意,以確保您沒有遺漏任何內容。在此階段,偵測到的問題應該為零或最少,您計劃在升級 Airflow 版本後修復這些問題。
此時,只需遵循標準的 Airflow 版本升級過程
確保您的 Airflow Meta 資料庫已備份
暫停所有 DAG 並確保沒有任何活動正在執行
暫停 DAG 的原因是確保在稍後步驟中將進行的資料庫升級期間,沒有任何活動正在寫入資料庫。
為了格外小心,最好在暫停 DAG 後進行資料庫備份。
安裝/升級 Airflow 版本到所選的 2.0 版本
確保安裝正確的 providers
這可以透過使用「extras」選項作為 Airflow 安裝的一部分,或個別安裝 providers 來完成。
請注意,如果您使用 pip 安裝,您可能必須先卸載 backport providers,然後再安裝新的 providers。如果您使用具有一組指定需求的 Airflow Docker 映像安裝,則這不適用,在這種情況下,變更會自動取得一組全新的模組。
您可以在 Provider packages 中閱讀有關 providers 的更多資訊。
使用
airflow db migrate
遷移 Airflow Meta 資料庫。上述命令可能不熟悉,因為它顯示了 Airflow 2.0 CLI 語法。
資料庫升級可能會根據需要修改資料庫結構描述,並將現有資料對應到符合更新的資料庫結構描述。
注意
資料庫升級可能需要一段時間,具體取決於資料庫中 DAG 的數量以及資料庫中儲存的任務歷史記錄、xcom 變數等的歷史記錄量。在我們的測試中,我們看到在 PostgreSQL 上具有大約 35,000 個任務實例和 500 個 DAG 的 Airflow 資料庫上,將 Airflow 資料庫從 Airflow 1.10.15 升級到 Airflow 2.0 花費了兩到三分鐘。為了更快的資料庫升級和更好的整體效能,建議您定期封存不再有價值的舊歷史元素。
重新啟動 Airflow Scheduler、Webserver 和 Workers
附錄¶
KubernetesPodOperator 的變更參數¶
Port 已從 list[Port] 遷移到 list[V1ContainerPort]
之前
from airflow.kubernetes.pod import Port
port = Port("http", 80)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
ports=[port],
task_id="task",
)
之後
from kubernetes.client import models as k8s
port = k8s.V1ContainerPort(name="http", container_port=80)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
ports=[port],
task_id="task",
)
Volume_mounts 已從 list[VolumeMount] 遷移到 list[V1VolumeMount]
之前
from airflow.kubernetes.volume_mount import VolumeMount
volume_mount = VolumeMount("test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
volume_mounts=[volume_mount],
task_id="task",
)
之後
from kubernetes.client import models as k8s
volume_mount = k8s.V1VolumeMount(
name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
volume_mounts=[volume_mount],
task_id="task",
)
Volume 已從 list[Volume] 遷移到 list[V1Volume]
之前
from airflow.kubernetes.volume import Volume
volume_config = {"persistentVolumeClaim": {"claimName": "test-volume"}}
volume = Volume(name="test-volume", configs=volume_config)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
volumes=[volume],
task_id="task",
)
之後
from kubernetes.client import models as k8s
volume = k8s.V1Volume(
name="test-volume",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="test-volume"),
)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
volumes=[volume],
task_id="task",
)
env_vars 已從 dict 遷移到 list[V1EnvVar]
之前
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
env_vars={"ENV1": "val1", "ENV2": "val2"},
task_id="task",
)
之後
from kubernetes.client import models as k8s
env_vars = [
k8s.V1EnvVar(name="ENV1", value="val1"),
k8s.V1EnvVar(name="ENV2", value="val2"),
]
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
env_vars=env_vars,
task_id="task",
)
PodRuntimeInfoEnv 已移除
PodRuntimeInfoEnv 現在可以作為 V1EnvVarSource
新增到 env_vars
變數中
之前
from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
pod_runtime_info_envs=[PodRuntimeInfoEnv("ENV3", "status.podIP")],
task_id="task",
)
之後
from kubernetes.client import models as k8s
env_vars = [
k8s.V1EnvVar(
name="ENV3",
value_from=k8s.V1EnvVarSource(field_ref=k8s.V1ObjectFieldSelector(field_path="status.podIP")),
)
]
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
env_vars=env_vars,
task_id="task",
)
configmaps 已移除
Configmaps 現在可以作為 V1EnvVarSource
新增到 env_from
變數中
之前
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
configmaps=["test-configmap"],
task_id="task",
)
之後
from kubernetes.client import models as k8s
configmap = "test-configmap"
env_from = [k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap))]
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
env_from=env_from,
task_id="task",
)
Resources 已從 Dict 遷移到 V1ResourceRequirements
之前
resources = {
"limit_cpu": 0.25,
"limit_memory": "64Mi",
"limit_ephemeral_storage": "2Gi",
"request_cpu": "250m",
"request_memory": "64Mi",
"request_ephemeral_storage": "1Gi",
}
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name="test",
task_id="task" + self.get_current_task_name(),
in_cluster=False,
do_xcom_push=False,
resources=resources,
)
之後
from kubernetes.client import models as k8s
resources = k8s.V1ResourceRequirements(
requests={"memory": "64Mi", "cpu": "250m", "ephemeral-storage": "1Gi"},
limits={
"memory": "64Mi",
"cpu": 0.25,
"nvidia.com/gpu": None,
"ephemeral-storage": "2Gi",
},
)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name="test-" + str(random.randint(0, 1000000)),
task_id="task" + self.get_current_task_name(),
in_cluster=False,
do_xcom_push=False,
resources=resources,
)
image_pull_secrets 已從 String 遷移到 list[k8s.V1LocalObjectReference]
之前
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
name="test",
task_id="task",
image_pull_secrets="fake-secret",
cluster_context="default",
)
之後
quay_k8s = KubernetesPodOperator(
namespace="default",
image="quay.io/apache/bash",
image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
cmds=["bash", "-cx"],
name="airflow-private-image-pod",
task_id="task-two",
)
從實驗性 API 遷移到穩定 API v1 的指南¶
在 Airflow 2.0 中,我們新增了新的 REST API。實驗性 API 仍然有效,但未來可能會捨棄支援。
但是,實驗性 API 不需要驗證,因此預設情況下已停用。如果您想使用實驗性 API,則需要明確啟用它。如果您的應用程式仍在使用實驗性 API,則應認真考慮遷移到穩定 API。
穩定 API 公開了許多可透過 Webserver 取得的端點。以下是兩個端點之間的差異,可協助您從實驗性 REST API 遷移到穩定 REST API。
基礎端點
穩定 API v1 的基礎端點是 /api/v1/
。您必須將實驗性基礎端點從 /api/experimental/
變更為 /api/v1/
。下表顯示了差異
目的 |
實驗性 REST API 端點 |
穩定 REST API 端點 |
---|---|---|
建立 DAGRuns(POST) |
|
|
列出 DAGRuns(GET) |
|
|
檢查健康狀態(GET) |
|
|
任務資訊(GET) |
|
|
TaskInstance 公共變數(GET) |
|
|
暫停 DAG(PATCH) |
|
|
暫停 DAG 的資訊(GET) |
|
|
最新的 DAG Runs(GET) |
|
|
取得所有 pools(GET) |
|
|
建立 pool(POST) |
|
|
刪除 pool(DELETE) |
|
|
DAG Lineage(GET) |
|
|
此端點 /api/v1/dags/{dag_id}/dagRuns
也允許您使用查詢字串中的參數(例如 start_date
、end_date
、execution_date
等)篩選 dag_runs。因此,先前由此端點執行的操作
/api/experimental/dags/<string:dag_id>/dag_runs/<string:execution_date>
現在可以使用此端點 (/api/v1/dags/{dag_id}/dagRuns
) 的查詢字串中的篩選器參數來處理。有關更多資訊,請查看穩定 API 參考文件
從 DAG 回呼的例外處理變更¶
DAG 回呼中的例外狀況過去會使 Airflow Scheduler 崩潰。作為我們使 Scheduler 效能更高且更可靠的努力的一部分,我們已將此行為變更為改為記錄例外狀況。最重要的是,新增了一個新的 dag.callback_exceptions 計數器指標,以幫助更好地監控回呼例外狀況。
遷移到 TaskFlow API¶
Airflow 2.0 引入了 TaskFlow API,以簡化 Python 可呼叫任務的宣告。鼓勵使用者將經典 operators 替換為其 TaskFlow decorator 替代方案。有關詳細資訊,請參閱 使用 TaskFlow。
經典 Operator |
TaskFlow Decorator |
---|---|
|
|
|
|
|
|
|
|
|
|
Airflow 2.0 中的 CLI 變更¶
Airflow CLI 已組織,使相關命令組合在一起作為子命令,這表示如果您在腳本中使用這些命令,則必須對其進行變更。
本節描述已進行的變更,以及您更新腳本所需執行的動作。從命令列操作使用者的功能已變更。airflow create_user
、airflow delete_user
和 airflow list_users
已被歸類到單一命令 airflow users
,並帶有可選旗標 create
、list
和 delete
。airflow list_dags
命令現在是 airflow dags list
,airflow pause
是 airflow dags pause
,依此類推。
在 Airflow 1.10 和 2.0 中,有一個 airflow config
命令,但行為上有所不同。在 Airflow 1.10 中,它會印出所有配置選項,而在 Airflow 2.0 中,它是一個命令群組。airflow config
現在是 airflow config list
。您可以執行命令 airflow config --help
來查看其他選項
如需更新後的 CLI 命令完整列表,請參閱 https://airflow.dev.org.tw/cli.html。
您可以透過執行 airflow --help
來了解這些命令。例如,若要取得關於 celery
群組命令的說明,您必須執行 help 命令:airflow celery --help
。
舊命令 |
新命令 |
群組 |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
``users`` 群組的範例用法
建立新使用者
airflow users create --username jondoe --lastname doe --firstname jon --email jdoe@apache.org --role Viewer --password test
列出使用者
airflow users list
刪除使用者
airflow users delete --username jondoe
將使用者新增至角色
airflow users add-role --username jondoe --role Public
從角色中移除使用者
airflow users remove-role --username jondoe --role Public
在 CLI 中針對簡短選項樣式變更,完全使用單一字元
對於 Airflow 簡短選項,請完全使用一個單一字元。新命令可依照下表使用
舊命令 |
新命令 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
對於 Airflow 長選項,請使用 kebab-case 而非 snake_case
舊選項 |
新選項 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
從 CLI 移除 serve_logs 命令
serve_logs
命令已被刪除。此命令應僅由內部應用程式機制執行,並且不需要從 CLI 介面存取。
dag_state CLI 命令
如果 DAGRun 是透過傳入 conf 鍵/值觸發的,它們也會在 dag_state CLI 回應中印出,例如:running, {“name”: “bob”},而在先前的版本中,它只會印出狀態:例如:running
在 backfill 命令中,棄用 ignore_first_depends_on_past 並預設為 True
當使用 depends_on_past
dags 執行回填時,使用者將需要傳遞 --ignore-first-depends-on-past
。我們應該將其預設為 true
以避免混淆
Airflow 外掛程式的變更¶
如果您正在使用 Airflow 外掛程式,並且傳遞了 admin_views
& menu_links
,這些在非 RBAC UI(基於 flask-admin
的 UI)中使用,請將其更新為使用 flask_appbuilder_views
和 flask_appbuilder_menu_links
。
舊版:
from airflow.plugins_manager import AirflowPlugin
from flask_admin import BaseView, expose
from flask_admin.base import MenuLink
class TestView(BaseView):
@expose("/")
def test(self):
# in this example, put your test_plugin/test.html template at airflow/plugins/templates/test_plugin/test.html
return self.render("test_plugin/test.html", content="Hello galaxy!")
v = TestView(category="Test Plugin", name="Test View")
ml = MenuLink(category="Test Plugin", name="Test Menu Link", url="https://airflow.dev.org.tw/")
class AirflowTestPlugin(AirflowPlugin):
admin_views = [v]
menu_links = [ml]
將其變更為:
from airflow.plugins_manager import AirflowPlugin
from flask_appbuilder import expose, BaseView as AppBuilderBaseView
class TestAppBuilderBaseView(AppBuilderBaseView):
default_view = "test"
@expose("/")
def test(self):
return self.render_template("test_plugin/test.html", content="Hello galaxy!")
v_appbuilder_view = TestAppBuilderBaseView()
v_appbuilder_package = {
"name": "Test View",
"category": "Test Plugin",
"view": v_appbuilder_view,
}
# Creating a flask appbuilder Menu Item
appbuilder_mitem = {
"name": "Google",
"category": "Search",
"category_icon": "fa-th",
"href": "https://www.google.com",
}
# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
name = "test_plugin"
appbuilder_views = [v_appbuilder_package]
appbuilder_menu_items = [appbuilder_mitem]
extras 名稱的變更¶
all
extra 已縮減為僅包含面向使用者的依賴項。這表示此 extra 不包含開發依賴項。如果您正在使用它並且依賴開發套件,則應使用 devel_all
。
對 Airflow 1.10.x 版本的支援¶
Airflow 1.10.x 已於 2021 年 6 月 17 日終止生命週期。將不再發布新的 Airflow 1.x 版本。
Backport providers 的支援已於 2021 年 3 月 17 日結束。將不再發布新版本的 backport providers。
我們計劃對我們的版本控制和發布流程採取嚴格的語義化版本控制方法。這表示我們不計劃在 2.* 版本中進行任何向後不相容的變更。任何重大變更,包括在 Airflow 2.0 中棄用的功能的移除,都將作為 Airflow 3.0 版本的一部分發生。