Operators

Operator 在概念上是預定義的 任務 的模板,您可以直接在 DAG 內宣告式地定義它

with DAG("my-dag") as dag:
    ping = HttpOperator(endpoint="http://example.com/update/")
    email = EmailOperator(to="admin@example.com", subject="Update complete")

    ping >> email

Airflow 擁有非常廣泛的 operators 集合,其中一些內建於核心或預先安裝的 providers 中。來自核心的一些熱門 operators 包括

  • BashOperator - 執行 bash 命令

  • PythonOperator - 呼叫任意 Python 函數

  • EmailOperator - 發送電子郵件

  • 使用 @task 裝飾器來執行任意 Python 函數。它不支援渲染作為參數傳遞的 jinja 模板。

注意

建議使用 @task 裝飾器來取代經典的 PythonOperator,以執行參數中不需模板渲染的 Python callable。

如需所有核心 operators 的列表,請參閱:核心 Operators 與 Hooks 參考

如果您需要的 operator 沒有隨 Airflow 預設安裝,您可能可以在我們龐大的社群 provider packages 中找到它。來自這裡的一些熱門 operators 包括

但還有更多更多 - 您可以在我們的 providers packages 文件中查看所有社群管理的 operators、hooks、sensors 和 transfers 的完整列表。

注意

在 Airflow 的程式碼中,我們經常混用 任務 和 Operators 的概念,它們在很大程度上是可以互換的。但是,當我們談論任務時,我們指的是 DAG 的通用「執行單元」;當我們談論 Operator 時,我們指的是可重複使用、預先製作的任務模板,其邏輯都已為您完成,只需要一些參數即可。

Jinja 模板

Airflow 利用 Jinja 模板 的強大功能,這可以與 macros 結合使用,成為一個強大的工具。

例如,假設您想使用 BashOperator 將資料間隔的開始時間作為環境變數傳遞給 Bash 腳本

# The start of the data interval as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
    task_id="test_env",
    bash_command="/tmp/test.sh ",
    dag=dag,
    env={"DATA_INTERVAL_START": date},
)

在這裡,{{ ds }} 是一個模板變數,並且由於 BashOperatorenv 參數使用 Jinja 進行模板化,因此資料間隔的開始日期將作為名為 DATA_INTERVAL_START 的環境變數在您的 Bash 腳本中可用。

當 Python 比 Jinja 模板更易讀時,您也可以傳遞 callable。callable 必須接受兩個具名參數 contextjinja_env

def build_complex_command(context, jinja_env):
    with open("file.csv") as f:
        return do_complex_things(f)


t = BashOperator(
    task_id="complex_templated_echo",
    bash_command=build_complex_command,
    dag=dag,
)

由於每個模板欄位僅渲染一次,因此 callable 的回傳值不會再次經過渲染。因此,callable 必須手動渲染任何模板。這可以透過在當前任務上呼叫 render_template() 來完成,如下所示

def build_complex_command(context, jinja_env):
    with open("file.csv") as f:
        data = do_complex_things(f)
    return context["task"].render_template(data, context, jinja_env)

您可以將模板化用於文件中標記為「templated」的每個參數。模板替換發生在 operator 的 pre_execute 函數被呼叫之前。

您也可以將模板化用於巢狀欄位,只要這些巢狀欄位在它們所屬的結構中被標記為模板化即可:在 template_fields 屬性中註冊的欄位將提交給模板替換,就像下面範例中的 path 欄位一樣

class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
    dag=dag,
)

注意

template_fields 屬性是一個類別變數,保證是 Sequence[str] 類型(即字串的列表或元組)。

深度巢狀欄位也可以被替換,只要所有中間欄位都被標記為模板欄位即可

class MyDataTransformer:
    template_fields: Sequence[str] = ("reader",)

    def __init__(self, my_reader):
        self.reader = my_reader

    # [additional code here...]


class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
    dag=dag,
)

您可以在建立 DAG 時將自訂選項傳遞給 Jinja Environment。一種常見的用法是避免 Jinja 從模板字串中刪除尾隨換行符

my_dag = DAG(
    dag_id="my-dag",
    jinja_environment_kwargs={
        "keep_trailing_newline": True,
        # some other jinja2 Environment options here
    },
)

請參閱 Jinja 文件 以查找所有可用選項。

某些 operators 也會將以特定後綴(在 template_ext 中定義)結尾的字串視為在渲染欄位時對檔案的引用。這對於直接從檔案載入腳本或查詢,而不是將它們包含到 DAG 程式碼中非常有用。

例如,考慮一個執行多行 bash 腳本的 BashOperator,這將載入 script.sh 中的檔案,並使用其內容作為 bash_command 的值

run_script = BashOperator(
    task_id="run_script",
    bash_command="script.sh",
)

預設情況下,以這種方式提供的路徑應相對於 DAG 的資料夾(因為這是預設的 Jinja 模板搜尋路徑),但可以透過設定 DAG 上的 template_searchpath 參數來新增其他路徑。

在某些情況下,您可能希望將字串排除在模板化之外並直接使用它。考慮以下任務

print_script = BashOperator(
    task_id="print_script",
    bash_command="cat script.sh",
)

這將失敗並顯示 TemplateNotFound: cat script.sh,因為 Airflow 會將該字串視為檔案的路徑,而不是命令。我們可以透過將其包裝在 literal() 中來防止 airflow 將此值視為對檔案的引用。這種方法禁用巨集和檔案的渲染,並且可以應用於選定的巢狀欄位,同時保留其餘內容的預設模板化規則。

from airflow.utils.template import literal


fixed_print_script = BashOperator(
    task_id="fixed_print_script",
    bash_command=literal("cat script.sh"),
)

版本 2.8 新增:新增了 literal()

或者,如果您想防止 Airflow 將值視為對檔案的引用,您可以覆寫 template_ext

fixed_print_script = BashOperator(
    task_id="fixed_print_script",
    bash_command="cat script.sh",
)
fixed_print_script.template_ext = ()

將欄位渲染為原生 Python 物件

預設情況下,template_fields 中的所有 Jinja 模板都渲染為字串。然而,這並不總是期望的。例如,假設一個 extract 任務將字典 {"1001": 301.27, "1002": 433.21, "1003": 502.22} 推送到 XCom

@task(task_id="extract")
def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)

如果一個任務依賴於 extract,則 order_data 參數會被傳遞一個字串 "{'1001': 301.27, '1002': 433.21, '1003': 502.22}"

def transform(order_data):
    total_order_value = sum(order_data.values())  # Fails because order_data is a str :(
    return {"total_order_value": total_order_value}


transform = PythonOperator(
    task_id="transform",
    op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
    python_callable=transform,
)

extract() >> transform

如果我們想要取得實際的 dict 而不是字串,則有兩種解決方案。第一種是使用 callable

def render_transform_op_kwargs(context, jinja_env):
    order_data = context["ti"].xcom_pull("extract")
    return {"order_data": order_data}


transform = PythonOperator(
    task_id="transform",
    op_kwargs=render_transform_op_kwargs,
    python_callable=transform,
)

或者,也可以指示 Jinja 渲染原生 Python 物件。這可以透過將 render_template_as_native_obj=True 傳遞給 DAG 來完成。這會使 Airflow 使用 NativeEnvironment 而不是預設的 SandboxedEnvironment

with DAG(
    dag_id="example_template_as_python_object",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    render_template_as_native_obj=True,
):
    transform = PythonOperator(
        task_id="transform",
        op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
        python_callable=transform,
    )

保留字 params

在 Apache Airflow 2.2.0 中,params 變數在 DAG 序列化期間使用。請不要在第三方 operators 中使用該名稱。如果您升級環境並收到以下錯誤

AttributeError: 'str' object has no attribute '__module__'

請在您的 operators 中更改名稱,不要使用 params

這個條目是否有幫助?