XComs¶
XComs(“跨通訊”的縮寫)是一種機制,讓任務彼此溝通,因為預設情況下,任務是完全隔離的,並且可能在完全不同的機器上運行。
XCom 由一個 key
(本質上是它的名稱)以及它來自的 task_id
和 dag_id
識別。它們可以具有任何可序列化的值(包括用 @dataclass
或 @attr.define
修飾的物件,請參閱 TaskFlow 參數:),但它們僅設計用於少量資料;請勿使用它們傳遞大量值,例如資料框架。
XCom 使用 Task Instances 上的 xcom_push
和 xcom_pull
方法顯式地“推送”和“拉取”到/從其儲存。
要在名為 “task-1” 的任務中推送一個值,該值將被另一個任務使用
# pushes data in any_serializable_value into xcom with key "identifier as string"
task_instance.xcom_push(key="identifier as a string", value=any_serializable_value)
要拉取上面程式碼中推送的值在另一個不同的任務中
# pulls the xcom variable with key "identifier as string" that was pushed from within task-1
task_instance.xcom_pull(key="identifier as string", task_ids="task-1")
許多運算子會將其結果自動推送到名為 return_value
的 XCom 鍵中,如果 do_xcom_push
參數設定為 True
(預設情況下是這樣),並且 @task
函數也會這樣做。xcom_pull
預設使用 return_value
作為鍵,如果沒有鍵傳遞給它,這表示可以編寫像這樣的程式碼
# Pulls the return_value XCOM from "pushing_task"
value = task_instance.xcom_pull(task_ids='pushing_task')
您也可以在 範本 中使用 XComs
SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}
XComs 是 變數 的相關概念,主要區別在於 XComs 是每個任務實例的,專為 DAG 運行內的通訊而設計,而變數是全局的,專為整體組態和值共享而設計。
如果您想一次推送多個 XCom 或重新命名推送的 XCom 鍵,您可以使用設定 do_xcom_push
和 multiple_outputs
參數為 True
,然後返回一個值字典。
注意
如果首次任務運行未成功,則在每次重試任務時,XComs 將被清除,以使任務運行具有冪等性。
物件儲存 XCom 後端¶
預設的 XCom 後端是 BaseXCom
類別,它將 XComs 儲存在 Airflow 資料庫中。這對於小值來說很好,但對於大值或大量 XComs 來說可能會出現問題。
要啟用將 XComs 儲存在物件儲存中,您可以將 xcom_backend
組態選項設定為 airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
。您還需要將 xcom_objectstorage_path
設定為所需的位置。連線 ID 從您將提供的 URL 的使用者部分取得,例如 xcom_objectstorage_path = s3://conn_id@mybucket/key
。此外,xcom_objectstorage_threshold
需要設定為大於 -1 的值。任何小於閾值(以位元組為單位)的物件都將儲存在資料庫中,而任何更大的物件都將放入物件儲存中。這將允許混合設定。如果 XCom 儲存在物件儲存上,則參考將保存在資料庫中。最後,您可以將 xcom_objectstorage_compression
設定為 fsspec 支援的壓縮方法,例如 zip
或 snappy
,以在將資料儲存到物件儲存之前對其進行壓縮。
因此,例如,以下組態將在 S3 中儲存任何大於 1MB 的內容,並使用 gzip 對其進行壓縮
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
[common.io]
xcom_objectstorage_path = s3://conn_id@mybucket/key
xcom_objectstorage_threshold = 1048576
xcom_objectstorage_compression = gzip
注意
壓縮需要您的 Python 環境中安裝了對其的支援。例如,要使用 snappy
壓縮,您需要安裝 python-snappy
。Zip、gzip 和 bz2 可以直接使用。
自訂 XCom 後端¶
XCom 系統具有可互換的後端,您可以透過 xcom_backend
組態選項設定正在使用的後端。
如果您想實作自己的後端,您應該繼承 BaseXCom
,並覆寫 serialize_value
和 deserialize_value
方法。
還有一個 orm_deserialize_value
方法,每當為 UI 或報告目的呈現 XCom 物件時都會調用它;如果您的 XCom 中有大型或難以檢索的值,您應該覆寫此方法以避免調用該程式碼(而是返回更輕量、不完整的表示),以便 UI 保持響應。
您也可以覆寫 clear
方法,並在清除給定 DAG 和任務的結果時使用它。這允許自訂 XCom 後端更輕鬆地處理資料生命週期。
在容器中驗證自訂 XCom 後端的使用¶
根據 Airflow 的部署位置(即,本地、Docker、K8s 等),確保自訂 XCom 後端實際上正在初始化可能很有用。例如,容器環境的複雜性可能會使您更難確定您的後端是否在容器部署期間正確載入。幸運的是,以下指南可用於幫助您建立對自訂 XCom 實作的信心。
如果您可以 exec 進入 Airflow 容器中的終端機,則可以印出正在使用的實際 XCom 類別
from airflow.models.xcom import XCom
print(XCom.__name__)