建立執行中的管線

讓我們看看另一個範例:我們需要從線上託管的檔案取得一些資料,並將其插入到我們的本機資料庫中。我們還需要查看在插入時刪除重複的列。

請注意: 本教學中使用的 operator 已棄用。其建議的後繼者 SQLExecuteQueryOperator 的運作方式類似。您可能會發現本指南很有幫助。

初始設定

我們需要安裝 Docker,因為我們將在本範例中使用在 Docker 中執行 Airflow 程序。以下步驟應已足夠,但完整說明請參閱快速入門文件。

# Download the docker-compose.yaml file
curl -LfO 'https://airflow.dev.org.tw/docs/apache-airflow/stable/docker-compose.yaml'

# Make expected directories and set an expected environment variable
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env

# Initialize the database
docker compose up airflow-init

# Start up all services
docker compose up

在所有服務啟動後,網頁 UI 將可在以下網址使用:https://127.0.0.1:8080。預設帳戶的使用者名稱為 airflow,密碼為 airflow

我們還需要建立與 postgres 資料庫的連線。若要透過網頁 UI 建立連線,請從「Admin」選單中選取「Connections」,然後按一下加號以「Add a new record」到連線清單。

填寫如下所示的欄位。請注意「Connection Id」值,我們將其作為 postgres_conn_id kwarg 的參數傳遞。

  • 連線 ID:tutorial_pg_conn

  • 連線類型:postgres

  • 主機:postgres

  • 結構描述:airflow

  • 登入:airflow

  • 密碼:airflow

  • 連接埠:5432

測試您的連線,如果測試成功,請儲存您的連線。

建立表格任務

我們可以使用 PostgresOperator 來定義在我們的 postgres 資料庫中建立表格的任務。

我們將建立一個表格以方便資料清理步驟 (employees_temp),以及另一個表格來儲存我們清理後的資料 (employees)。

from airflow.providers.postgres.operators.postgres import PostgresOperator

create_employees_table = PostgresOperator(
    task_id="create_employees_table",
    postgres_conn_id="tutorial_pg_conn",
    sql="""
        CREATE TABLE IF NOT EXISTS employees (
            "Serial Number" NUMERIC PRIMARY KEY,
            "Company Name" TEXT,
            "Employee Markme" TEXT,
            "Description" TEXT,
            "Leave" INTEGER
        );""",
)

create_employees_temp_table = PostgresOperator(
    task_id="create_employees_temp_table",
    postgres_conn_id="tutorial_pg_conn",
    sql="""
        DROP TABLE IF EXISTS employees_temp;
        CREATE TABLE employees_temp (
            "Serial Number" NUMERIC PRIMARY KEY,
            "Company Name" TEXT,
            "Employee Markme" TEXT,
            "Description" TEXT,
            "Leave" INTEGER
        );""",
)

選項:從檔案使用 SQL

如果您想從您的 DAG 中抽象化這些 sql 陳述式,您可以將這些 sql 檔案移動到 dags/ 目錄中的某個位置,並將 sql 檔案路徑(相對於 dags/)傳遞給 sql kwarg。例如,對於 employees,在 dags/ 中建立一個 sql 目錄,將 employees DDL 放在 dags/sql/employees_schema.sql 中,並修改 PostgresOperator() 為

create_employees_table = PostgresOperator(
    task_id="create_employees_table",
    postgres_conn_id="tutorial_pg_conn",
    sql="sql/employees_schema.sql",
)

並對 employees_temp 表格重複執行。

資料檢索任務

在這裡,我們檢索資料,將其儲存到 Airflow 實例上的檔案中,然後從該檔案將資料載入到中繼表格中,我們可以在其中執行資料清理步驟。

import os
import requests
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def get_data():
    # NOTE: configure this as appropriate for your airflow environment
    data_path = "/opt/airflow/dags/files/employees.csv"
    os.makedirs(os.path.dirname(data_path), exist_ok=True)

    url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv"

    response = requests.request("GET", url)

    with open(data_path, "w") as file:
        file.write(response.text)

    postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
    conn = postgres_hook.get_conn()
    cur = conn.cursor()
    with open(data_path, "r") as file:
        cur.copy_expert(
            "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
            file,
        )
    conn.commit()

資料合併任務

在這裡,我們從檢索到的資料中選取完全唯一的記錄,然後我們檢查是否有任何員工 Serial Numbers 已經在資料庫中(如果有的話,我們會使用新資料更新這些記錄)。

from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def merge_data():
    query = """
        INSERT INTO employees
        SELECT *
        FROM (
            SELECT DISTINCT *
            FROM employees_temp
        ) t
        ON CONFLICT ("Serial Number") DO UPDATE
        SET
              "Employee Markme" = excluded."Employee Markme",
              "Description" = excluded."Description",
              "Leave" = excluded."Leave";
    """
    try:
        postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
        conn = postgres_hook.get_conn()
        cur = conn.cursor()
        cur.execute(query)
        conn.commit()
        return 0
    except Exception as e:
        return 1

完成我們的 DAG

我們已經開發了我們的任務,現在我們需要將它們包裝在 DAG 中,這使我們能夠定義任務應何時以及如何執行,並聲明任務對其他任務的任何依賴關係。以下 DAG 配置為

  • 每天午夜執行一次,從 2021 年 1 月 1 日開始,

  • 僅在錯過天數的情況下執行一次,以及

  • 60 分鐘後逾時

process_employees DAG 定義的最後一行,我們看到

[create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()
  • merge_data() 任務依賴於 get_data() 任務,

  • get_data() 依賴於 create_employees_tablecreate_employees_temp_table 任務,以及

  • create_employees_tablecreate_employees_temp_table 任務可以獨立執行。

將所有部分放在一起,我們就完成了 DAG。

import datetime
import pendulum
import os

import requests
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator


@dag(
    dag_id="process_employees",
    schedule_interval="0 0 * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
)
def ProcessEmployees():
    create_employees_table = PostgresOperator(
        task_id="create_employees_table",
        postgres_conn_id="tutorial_pg_conn",
        sql="""
            CREATE TABLE IF NOT EXISTS employees (
                "Serial Number" NUMERIC PRIMARY KEY,
                "Company Name" TEXT,
                "Employee Markme" TEXT,
                "Description" TEXT,
                "Leave" INTEGER
            );""",
    )

    create_employees_temp_table = PostgresOperator(
        task_id="create_employees_temp_table",
        postgres_conn_id="tutorial_pg_conn",
        sql="""
            DROP TABLE IF EXISTS employees_temp;
            CREATE TABLE employees_temp (
                "Serial Number" NUMERIC PRIMARY KEY,
                "Company Name" TEXT,
                "Employee Markme" TEXT,
                "Description" TEXT,
                "Leave" INTEGER
            );""",
    )

    @task
    def get_data():
        # NOTE: configure this as appropriate for your airflow environment
        data_path = "/opt/airflow/dags/files/employees.csv"
        os.makedirs(os.path.dirname(data_path), exist_ok=True)

        url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv"

        response = requests.request("GET", url)

        with open(data_path, "w") as file:
            file.write(response.text)

        postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
        conn = postgres_hook.get_conn()
        cur = conn.cursor()
        with open(data_path, "r") as file:
            cur.copy_expert(
                "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
                file,
            )
        conn.commit()

    @task
    def merge_data():
        query = """
            INSERT INTO employees
            SELECT *
            FROM (
                SELECT DISTINCT *
                FROM employees_temp
            ) t
            ON CONFLICT ("Serial Number") DO UPDATE
            SET
              "Employee Markme" = excluded."Employee Markme",
              "Description" = excluded."Description",
              "Leave" = excluded."Leave";
        """
        try:
            postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
            conn = postgres_hook.get_conn()
            cur = conn.cursor()
            cur.execute(query)
            conn.commit()
            return 0
        except Exception as e:
            return 1

    [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()


dag = ProcessEmployees()

將此程式碼儲存到 /dags 資料夾中的 python 檔案中(例如 dags/process_employees.py),並(在短暫延遲後),process_employees DAG 將包含在網頁 UI 上可用的 DAG 清單中。

../_images/tutorial-pipeline-1.png

您可以透過取消暫停 process_employees DAG(透過左端的滑桿)並執行它(透過動作下的「執行」按鈕)來觸發它。

../_images/tutorial-pipeline-2.png

process_employees DAG 的網格視圖中,我們看到所有任務在所有執行的執行中都成功執行。成功!

接下來呢?

您現在有一個在 Airflow 內部使用 Docker Compose 執行的管線。以下是您接下來可能想做的一些事情

另請參閱

這個條目有幫助嗎?