Operators¶
Operator 在概念上是預定義的 任務 的模板,您可以直接在 DAG 內宣告式地定義它
with DAG("my-dag") as dag:
ping = HttpOperator(endpoint="http://example.com/update/")
email = EmailOperator(to="admin@example.com", subject="Update complete")
ping >> email
Airflow 擁有非常廣泛的 operators 集合,其中一些內建於核心或預先安裝的 providers 中。來自核心的一些熱門 operators 包括
BashOperator
- 執行 bash 命令PythonOperator
- 呼叫任意 Python 函數EmailOperator
- 發送電子郵件使用
@task
裝飾器來執行任意 Python 函數。它不支援渲染作為參數傳遞的 jinja 模板。
注意
建議使用 @task
裝飾器來取代經典的 PythonOperator
,以執行參數中不需模板渲染的 Python callable。
如需所有核心 operators 的列表,請參閱:核心 Operators 與 Hooks 參考。
如果您需要的 operator 沒有隨 Airflow 預設安裝,您可能可以在我們龐大的社群 provider packages 中找到它。來自這裡的一些熱門 operators 包括
MySqlOperator
PostgresOperator
MsSqlOperator
OracleOperator
JdbcOperator
但還有更多更多 - 您可以在我們的 providers packages 文件中查看所有社群管理的 operators、hooks、sensors 和 transfers 的完整列表。
注意
在 Airflow 的程式碼中,我們經常混用 任務 和 Operators 的概念,它們在很大程度上是可以互換的。但是,當我們談論任務時,我們指的是 DAG 的通用「執行單元」;當我們談論 Operator 時,我們指的是可重複使用、預先製作的任務模板,其邏輯都已為您完成,只需要一些參數即可。
Jinja 模板¶
Airflow 利用 Jinja 模板 的強大功能,這可以與 macros 結合使用,成為一個強大的工具。
例如,假設您想使用 BashOperator
將資料間隔的開始時間作為環境變數傳遞給 Bash 腳本
# The start of the data interval as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
task_id="test_env",
bash_command="/tmp/test.sh ",
dag=dag,
env={"DATA_INTERVAL_START": date},
)
在這裡,{{ ds }}
是一個模板變數,並且由於 BashOperator
的 env
參數使用 Jinja 進行模板化,因此資料間隔的開始日期將作為名為 DATA_INTERVAL_START
的環境變數在您的 Bash 腳本中可用。
當 Python 比 Jinja 模板更易讀時,您也可以傳遞 callable。callable 必須接受兩個具名參數 context
和 jinja_env
def build_complex_command(context, jinja_env):
with open("file.csv") as f:
return do_complex_things(f)
t = BashOperator(
task_id="complex_templated_echo",
bash_command=build_complex_command,
dag=dag,
)
由於每個模板欄位僅渲染一次,因此 callable 的回傳值不會再次經過渲染。因此,callable 必須手動渲染任何模板。這可以透過在當前任務上呼叫 render_template()
來完成,如下所示
def build_complex_command(context, jinja_env):
with open("file.csv") as f:
data = do_complex_things(f)
return context["task"].render_template(data, context, jinja_env)
您可以將模板化用於文件中標記為「templated」的每個參數。模板替換發生在 operator 的 pre_execute
函數被呼叫之前。
您也可以將模板化用於巢狀欄位,只要這些巢狀欄位在它們所屬的結構中被標記為模板化即可:在 template_fields
屬性中註冊的欄位將提交給模板替換,就像下面範例中的 path
欄位一樣
class MyDataReader:
template_fields: Sequence[str] = ("path",)
def __init__(self, my_path):
self.path = my_path
# [additional code here...]
t = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
dag=dag,
)
注意
template_fields
屬性是一個類別變數,保證是 Sequence[str]
類型(即字串的列表或元組)。
深度巢狀欄位也可以被替換,只要所有中間欄位都被標記為模板欄位即可
class MyDataTransformer:
template_fields: Sequence[str] = ("reader",)
def __init__(self, my_reader):
self.reader = my_reader
# [additional code here...]
class MyDataReader:
template_fields: Sequence[str] = ("path",)
def __init__(self, my_path):
self.path = my_path
# [additional code here...]
t = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
dag=dag,
)
您可以在建立 DAG 時將自訂選項傳遞給 Jinja Environment
。一種常見的用法是避免 Jinja 從模板字串中刪除尾隨換行符
my_dag = DAG(
dag_id="my-dag",
jinja_environment_kwargs={
"keep_trailing_newline": True,
# some other jinja2 Environment options here
},
)
請參閱 Jinja 文件 以查找所有可用選項。
某些 operators 也會將以特定後綴(在 template_ext
中定義)結尾的字串視為在渲染欄位時對檔案的引用。這對於直接從檔案載入腳本或查詢,而不是將它們包含到 DAG 程式碼中非常有用。
例如,考慮一個執行多行 bash 腳本的 BashOperator,這將載入 script.sh
中的檔案,並使用其內容作為 bash_command
的值
run_script = BashOperator(
task_id="run_script",
bash_command="script.sh",
)
預設情況下,以這種方式提供的路徑應相對於 DAG 的資料夾(因為這是預設的 Jinja 模板搜尋路徑),但可以透過設定 DAG 上的 template_searchpath
參數來新增其他路徑。
在某些情況下,您可能希望將字串排除在模板化之外並直接使用它。考慮以下任務
print_script = BashOperator(
task_id="print_script",
bash_command="cat script.sh",
)
這將失敗並顯示 TemplateNotFound: cat script.sh
,因為 Airflow 會將該字串視為檔案的路徑,而不是命令。我們可以透過將其包裝在 literal()
中來防止 airflow 將此值視為對檔案的引用。這種方法禁用巨集和檔案的渲染,並且可以應用於選定的巢狀欄位,同時保留其餘內容的預設模板化規則。
from airflow.utils.template import literal
fixed_print_script = BashOperator(
task_id="fixed_print_script",
bash_command=literal("cat script.sh"),
)
版本 2.8 新增:新增了 literal()
。
或者,如果您想防止 Airflow 將值視為對檔案的引用,您可以覆寫 template_ext
fixed_print_script = BashOperator(
task_id="fixed_print_script",
bash_command="cat script.sh",
)
fixed_print_script.template_ext = ()
將欄位渲染為原生 Python 物件¶
預設情況下,template_fields
中的所有 Jinja 模板都渲染為字串。然而,這並不總是期望的。例如,假設一個 extract
任務將字典 {"1001": 301.27, "1002": 433.21, "1003": 502.22}
推送到 XCom
@task(task_id="extract")
def extract():
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
return json.loads(data_string)
如果一個任務依賴於 extract
,則 order_data
參數會被傳遞一個字串 "{'1001': 301.27, '1002': 433.21, '1003': 502.22}"
def transform(order_data):
total_order_value = sum(order_data.values()) # Fails because order_data is a str :(
return {"total_order_value": total_order_value}
transform = PythonOperator(
task_id="transform",
op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
python_callable=transform,
)
extract() >> transform
如果我們想要取得實際的 dict 而不是字串,則有兩種解決方案。第一種是使用 callable
def render_transform_op_kwargs(context, jinja_env):
order_data = context["ti"].xcom_pull("extract")
return {"order_data": order_data}
transform = PythonOperator(
task_id="transform",
op_kwargs=render_transform_op_kwargs,
python_callable=transform,
)
或者,也可以指示 Jinja 渲染原生 Python 物件。這可以透過將 render_template_as_native_obj=True
傳遞給 DAG 來完成。這會使 Airflow 使用 NativeEnvironment 而不是預設的 SandboxedEnvironment
with DAG(
dag_id="example_template_as_python_object",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
render_template_as_native_obj=True,
):
transform = PythonOperator(
task_id="transform",
op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
python_callable=transform,
)
保留字 params¶
在 Apache Airflow 2.2.0 中,params
變數在 DAG 序列化期間使用。請不要在第三方 operators 中使用該名稱。如果您升級環境並收到以下錯誤
AttributeError: 'str' object has no attribute '__module__'
請在您的 operators 中更改名稱,不要使用 params
。