AzureBlobStorageToTeradataOperator

AzureBlobStorageToTeradataOperator 的目的是定義從 Azure Blob Storage 到 Teradata 資料表傳輸 CSV、JSON 和 Parquet 格式資料的任務。使用 AzureBlobStorageToTeradataOperator 將資料從 Azure Blob Storage 傳輸到 Teradata。此運算子利用 Teradata READ_NOS 功能,將 CSV、JSON 和 Parquet 格式的資料從 Azure Blob Storage 匯入到 Teradata。此運算子直接從物件儲存區存取資料,並使用 READ_NOS 和 CREATE TABLE AS 功能以及以下 SQL 語句在資料庫中產生永久資料表。

CREATE MULTISET TABLE multiset_table_name AS (
  SELECT *
  FROM (
    LOCATION='YOUR-OBJECT-STORE-URI'
    AUTHORIZATION=authorization_object
  ) AS d
) WITH DATA;

它促進從公有和私有物件儲存區載入資料。對於私有物件儲存區,可以透過 Teradata 授權資料庫物件或在 Airflow 中使用 Azure Blob Storage 連線定義的物件儲存區登入和物件儲存區金鑰來授權存取物件儲存區。相反地,對於從公有物件儲存區傳輸資料,則不需要授權或存取憑證。

  • Teradata 授權資料庫物件存取類型可以與 AzureBlobStorageToTeradataOperatorteradata_authorization_name 參數一起使用

  • 物件儲存區存取金鑰 ID 和存取金鑰密碼存取類型可以與 S3ToTeradataOperatorazure_conn_id 參數一起使用

https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Setting-Access-Privileges

注意

如果同時定義了兩種存取類型,則 Teradata 授權資料庫物件優先。

從公有 Azure Blob Storage 傳輸資料到 Teradata

以下是 AzureBlobStorageToTeradataOperator 從公有 Azure Blob Storage 傳輸 CSV 資料格式到 Teradata 資料表的範例用法

tests/system/teradata/example_azure_blob_to_teradata_transfer.py[原始碼]

    transfer_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_csv",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
        public_bucket=True,
        teradata_table="example_blob_teradata_csv",
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )

使用 AWS 連線從私有 Azure Blob Storage 傳輸資料到 Teradata

以下是 AzureBlobStorageToTeradataOperator 使用定義為 AWS 連線的 AWS 憑證從私有 S3 物件儲存區傳輸 CSV 資料格式到 Teradata 的範例用法

tests/system/teradata/example_azure_blob_to_teradata_transfer.py[原始碼]

    transfer_key_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_key_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        azure_conn_id="wasb_default",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )

使用 Teradata 授權物件從私有 Azure Blob Storage 傳輸資料到 Teradata

Teradata 授權資料庫物件用於控制誰可以存取外部物件儲存區。Teradata 授權資料庫物件應存在於 Teradata 資料庫中,以便在從 S3 傳輸資料到 Teradata 時使用。請參閱 Teradata 中外部物件儲存區的身份驗證

以下是 AzureBlobStorageToTeradataOperator 使用在 Teradata 中定義的授權資料庫物件從私有 S3 物件儲存區傳輸 CSV 資料格式到 Teradata 的範例用法。

tests/system/teradata/example_azure_blob_to_teradata_transfer.py[原始碼]

    transfer_auth_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_auth_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        teradata_authorization_name="azure_authorization",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )

從 Azure Blob Storage 傳輸 CSV 格式資料到 Teradata

以下是 AzureBlobStorageToTeradataOperator 從 Azure Blob Storage 傳輸 CSV 資料格式到 Teradata 資料表的範例用法

tests/system/teradata/example_azure_blob_to_teradata_transfer.py[原始碼]

    transfer_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_csv",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
        public_bucket=True,
        teradata_table="example_blob_teradata_csv",
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )

從 Azure Blob Storage 傳輸 JSON 格式資料到 Teradata

以下是 AzureBlobStorageToTeradataOperator 從 Azure Blob Storage 傳輸 JSON 資料格式到 Teradata 資料表的範例用法

tests/system/teradata/example_azure_blob_to_teradata_transfer.py[原始碼]

    transfer_data_json = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_json",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
        teradata_table="example_blob_teradata_json",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )

從 Azure Blob Storage 傳輸 PARQUET 格式資料到 Teradata

以下是 AzureBlobStorageToTeradataOperator 從 Azure Blob Storage 傳輸 PARQUET 資料格式到 Teradata 資料表的範例用法

tests/system/teradata/example_azure_blob_to_teradata_transfer.py[原始碼]

    transfer_data_parquet = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_parquet",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/",
        teradata_table="example_blob_teradata_parquet",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )

完整的 AzureBlobStorageToTeradataOperator 運算子 DAG

當我們將所有內容放在一起時,我們的 DAG 應該看起來像這樣

tests/system/teradata/example_azure_blob_to_teradata_transfer.py[原始碼]


ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_azure_blob_to_teradata_transfer_operator"
CONN_ID = "teradata_default"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    transfer_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_csv",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
        public_bucket=True,
        teradata_table="example_blob_teradata_csv",
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
    read_data_table_csv = TeradataOperator(
        task_id="read_data_table_csv",
        sql="SELECT count(1) from example_blob_teradata_csv;",
    )
    drop_table_csv = TeradataOperator(
        task_id="drop_table_csv",
        sql="DROP TABLE example_blob_teradata_csv;",
    )
    transfer_key_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_key_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        azure_conn_id="wasb_default",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    read_key_data_table_csv = TeradataOperator(
        task_id="read_key_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT count(1) from example_blob_teradata_csv;",
    )
    drop_key_table_csv = TeradataOperator(
        task_id="drop_key_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_blob_teradata_csv;",
    )
    create_azure_authorization = TeradataOperator(
        task_id="create_azure_authorization",
        conn_id=CONN_ID,
        sql="CREATE AUTHORIZATION azure_authorization USER '{{ var.value.get('AZURE_BLOB_ACCOUNTNAME') }}' PASSWORD '{{ var.value.get('AZURE_BLOB_ACCOUNT_SECRET_KEY') }}' ",
    )
    transfer_auth_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_auth_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        teradata_authorization_name="azure_authorization",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    read_auth_data_table_csv = TeradataOperator(
        task_id="read_auth_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT count(1) from example_blob_teradata_csv;",
    )
    drop_auth_table_csv = TeradataOperator(
        task_id="drop_auth_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_blob_teradata_csv;",
    )
    drop_auth = TeradataOperator(
        task_id="drop_auth",
        conn_id=CONN_ID,
        sql="DROP AUTHORIZATION azure_authorization;",
    )
    transfer_data_json = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_json",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
        teradata_table="example_blob_teradata_json",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
    read_data_table_json = TeradataOperator(
        task_id="read_data_table_json",
        sql="SELECT count(1) from example_blob_teradata_json;",
    )
    drop_table_json = TeradataOperator(
        task_id="drop_table_json",
        sql="DROP TABLE example_blob_teradata_json;",
    )
    transfer_data_parquet = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_parquet",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/",
        teradata_table="example_blob_teradata_parquet",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    read_data_table_parquet = TeradataOperator(
        task_id="read_data_table_parquet",
        sql="SELECT count(1) from example_blob_teradata_parquet;",
    )
    drop_table_parquet = TeradataOperator(
        task_id="drop_table_parquet",
        sql="DROP TABLE example_blob_teradata_parquet;",
    )

    (
        transfer_data_csv
        >> transfer_data_json
        >> transfer_data_parquet
        >> read_data_table_csv
        >> read_data_table_json
        >> read_data_table_parquet
        >> drop_table_csv
        >> drop_table_json
        >> drop_table_parquet
        >> transfer_key_data_csv
        >> read_key_data_table_csv
        >> drop_key_table_csv
        >> create_azure_authorization
        >> transfer_auth_data_csv
        >> read_auth_data_table_csv
        >> drop_auth_table_csv
        >> drop_auth
    )

此條目是否有幫助?