S3ToTeradataOperator¶
The purpose of S3ToTeradataOperator is to define tasks that transfer data in CSV, JSON, and Parquet formats from AWS Simple Storage Service (S3) to Teradata tables. The operator relies on the Teradata READ_NOS feature to move the data. READ_NOS is a table operator in Teradata Vantage that lets users list and read external files at a specified location. For more details, see the READ_NOS documentation.
Using the S3ToTeradataOperator¶
This operator transfers data from S3 to Teradata, using the Teradata READ_NOS feature to import data in CSV, JSON, and Parquet formats. It accesses the data directly in the object store and creates a permanent table in the database by combining READ_NOS with a CREATE TABLE AS statement.
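To make the READ_NOS plus CREATE TABLE AS mechanism concrete, the sketch below composes a statement of roughly that shape. The actual SQL is generated internally by the provider and may differ; the function name, table name, and location here are illustrative placeholders only:

```python
def build_read_nos_ctas(table_name: str, s3_location: str) -> str:
    """Illustrative sketch of a CREATE TABLE AS ... READ_NOS statement.

    This mirrors the general shape of the SQL the operator issues; the
    real statement is built internally by the provider and may differ
    (e.g. extra USING options for authorization or file format).
    """
    return (
        f"CREATE MULTISET TABLE {table_name} AS (\n"
        f"    SELECT * FROM READ_NOS (\n"
        f"        USING LOCATION('{s3_location}')\n"
        f"    ) AS d\n"
        f") WITH DATA;"
    )


print(build_read_nos_ctas(
    "example_s3_teradata_csv",
    "/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/",
))
```

The `/s3/<bucket>.s3.amazonaws.com/<prefix>/` location format is the same one passed to the operator's `s3_source_key` parameter in the examples below.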
Note
The current version of S3ToTeradataOperator does not support accessing AWS S3 with Security Token Service (STS) temporary credentials; it supports access with long-term credentials only.
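Long-term credentials are typically supplied through the Airflow connection referenced by `aws_conn_id`. One way to define such a connection is via a connection URI exported in an environment variable; the helper below builds one with the standard library (the key values shown are placeholders, not real credentials):

```python
from urllib.parse import quote


def aws_conn_uri(access_key_id: str, secret_access_key: str) -> str:
    """Build an Airflow connection URI for an AWS connection that uses
    long-term credentials (access key ID as login, secret key as password).

    Credentials are percent-encoded so characters such as '/' survive
    the URI round-trip.
    """
    return (
        f"aws://{quote(access_key_id, safe='')}:"
        f"{quote(secret_access_key, safe='')}@"
    )


# Export the result as AIRFLOW_CONN_AWS_DEFAULT to define the
# aws_default connection used in the examples below.
print(aws_conn_uri("AKIAEXAMPLE", "secret/with/slashes"))
```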
Transferring data in CSV format from S3 to Teradata¶
Below is an example of using the S3ToTeradataOperator to transfer data in CSV format from S3 to a Teradata table:
transfer_data_csv = S3ToTeradataOperator(
    task_id="transfer_data_s3_to_teradata_csv",
    s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/",
    public_bucket=True,
    teradata_table="example_s3_teradata_csv",
    aws_conn_id="aws_default",
    trigger_rule="all_done",
)
Transferring data in JSON format from S3 to Teradata¶
Below is an example of using the S3ToTeradataOperator to transfer data in JSON format from S3 to a Teradata table:
transfer_data_json = S3ToTeradataOperator(
    task_id="transfer_data_s3_to_teradata_json",
    s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/JSONDATA/09394500/2018/06/",
    public_bucket=True,
    teradata_table="example_s3_teradata_json",
    aws_conn_id="aws_default",
    trigger_rule="all_done",
)
Transferring data in Parquet format from S3 to Teradata¶
Below is an example of using the S3ToTeradataOperator to transfer data in Parquet format from S3 to a Teradata table:
transfer_data_parquet = S3ToTeradataOperator(
    task_id="transfer_data_s3_to_teradata_parquet",
    s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/PARQUETDATA/09394500/2018/06/",
    public_bucket=True,
    teradata_table="example_s3_teradata_parquet",
    aws_conn_id="aws_default",
    trigger_rule="all_done",
)
The complete S3ToTeradataOperator DAG¶
When we put everything together, the DAG looks like this:
import datetime
import os

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator
from airflow.providers.teradata.transfers.s3_to_teradata import S3ToTeradataOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_s3_to_teradata_transfer_operator"
CONN_ID = "teradata_default"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv]
    transfer_data_csv = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_csv",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_csv",
        aws_conn_id="aws_default",
        trigger_rule="all_done",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    read_data_table_csv = TeradataOperator(
        task_id="read_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT * from example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    drop_table_csv = TeradataOperator(
        task_id="drop_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_access_s3_to_teradata_csv]
    transfer_key_data_csv = S3ToTeradataOperator(
        task_id="transfer_key_data_s3_to_teradata_key_csv",
        s3_source_key="/s3/airflowteradatatest.s3.ap-southeast-2.amazonaws.com/",
        teradata_table="example_s3_teradata_csv",
        aws_conn_id="aws_default",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_access_s3_to_teradata_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    read_key_data_table_csv = TeradataOperator(
        task_id="read_key_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT * from example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    drop_key_table_csv = TeradataOperator(
        task_id="drop_key_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_create_authorization]
    create_aws_authorization = TeradataOperator(
        task_id="create_aws_authorization",
        conn_id=CONN_ID,
        sql="CREATE AUTHORIZATION aws_authorization USER '{{ var.value.get('AWS_ACCESS_KEY_ID') }}' PASSWORD '{{ var.value.get('AWS_SECRET_ACCESS_KEY') }}' ",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_create_authorization]
    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_s3_to_teradata_csv]
    transfer_auth_data_csv = S3ToTeradataOperator(
        task_id="transfer_auth_data_s3_to_teradata_auth_csv",
        s3_source_key="/s3/teradata-download.s3.us-east-1.amazonaws.com/DevTools/csv/",
        teradata_table="example_s3_teradata_csv",
        teradata_authorization_name="aws_authorization",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_s3_to_teradata_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    read_auth_data_table_csv = TeradataOperator(
        task_id="read_auth_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT * from example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    drop_auth_table_csv = TeradataOperator(
        task_id="drop_auth_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_authorization]
    drop_auth = TeradataOperator(
        task_id="drop_auth",
        conn_id=CONN_ID,
        sql="DROP AUTHORIZATION aws_authorization;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_drop_authorization]
    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json]
    transfer_data_json = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_json",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/JSONDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_json",
        aws_conn_id="aws_default",
        trigger_rule="all_done",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json]
    # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_json]
    read_data_table_json = TeradataOperator(
        task_id="read_data_table_json",
        sql="SELECT * from example_s3_teradata_json;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_json]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_json]
    drop_table_json = TeradataOperator(
        task_id="drop_table_json",
        sql="DROP TABLE example_s3_teradata_json;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_json]
    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet]
    transfer_data_parquet = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_parquet",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/PARQUETDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_parquet",
        aws_conn_id="aws_default",
        trigger_rule="all_done",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet]
    # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet]
    read_data_table_parquet = TeradataOperator(
        task_id="read_data_table_parquet",
        sql="SELECT * from example_s3_teradata_parquet;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_table]
    drop_table_parquet = TeradataOperator(
        task_id="drop_table_parquet",
        sql="DROP TABLE example_s3_teradata_parquet;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_drop_table]
    (
        transfer_data_csv
        >> transfer_data_json
        >> transfer_data_parquet
        >> read_data_table_csv
        >> read_data_table_json
        >> read_data_table_parquet
        >> drop_table_csv
        >> drop_table_json
        >> drop_table_parquet
        >> transfer_key_data_csv
        >> read_key_data_table_csv
        >> drop_key_table_csv
        >> create_aws_authorization
        >> transfer_auth_data_csv
        >> read_auth_data_table_csv
        >> drop_auth_table_csv
        >> drop_auth
    )