Amazon Redshift Data¶
Amazon Redshift 管理設定、操作和擴展資料倉儲的所有工作:配置容量、監控和備份叢集,以及將修補程式和升級應用於 Amazon Redshift 引擎。您可以專注於使用您的資料來為您的業務和客戶獲取新的見解。
先決條件任務¶
要使用這些運算子,您必須完成以下幾件事
使用 AWS Console 或 AWS CLI 建立必要的資源。
透過 pip 安裝 API 函式庫。
pip install 'apache-airflow[amazon]'詳細資訊請參閱 Airflow® 安裝
設定連線.
通用參數¶
- aws_conn_id
參考 Amazon Web Services 連線 ID。如果此參數設定為
None
,則會使用預設的 boto3 行為,而不會查詢連線。否則,請使用儲存在連線中的憑證。預設值:aws_default
- region_name
AWS 區域名稱。如果此參數設定為
None
或省略,則將使用 AWS 連線額外參數 中的 region_name。否則,請使用指定的值來取代連線值。預設值:None
- verify
是否驗證 SSL 憑證。
False
- 不驗證 SSL 憑證。path/to/cert/bundle.pem - 要使用的 CA 憑證套件的檔案名稱。如果您想使用與 botocore 使用的 CA 憑證套件不同的套件,您可以指定此引數。
如果此參數設定為
None
或省略,則將使用 AWS 連線額外參數 中的 verify。否則,請使用指定的值來取代連線值。預設值:None
- botocore_config
提供的字典用於建構 botocore.config.Config。此設定可用於設定 避免節流例外、逾時等。
範例,有關參數的更多詳細資訊,請查看 botocore.config.Config¶{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此參數設定為
None
或省略,則將使用 AWS 連線額外參數 中的 config_kwargs。否則,請使用指定的值來取代連線值。預設值:None
注意
指定空字典
{}
將覆寫 botocore.config.Config 的連線設定
運算子¶
在 Amazon Redshift 叢集上執行陳述式¶
使用 RedshiftDataOperator
以針對 Amazon Redshift 叢集執行陳述式。
這與 RedshiftSQLOperator
的不同之處在於,它允許使用者透過 AWS API 查詢和檢索資料,並避免需要 Postgres 連線。
tests/system/amazon/aws/example_redshift.py
create_table_redshift_data = RedshiftDataOperator(
task_id="create_table_redshift_data",
cluster_identifier=redshift_cluster_identifier,
database=DB_NAME,
db_user=DB_LOGIN,
sql=[
"""
CREATE TABLE IF NOT EXISTS fruit (
fruit_id INTEGER,
name VARCHAR NOT NULL,
color VARCHAR NOT NULL
);
"""
],
poll_interval=POLL_INTERVAL,
wait_for_completion=True,
)
在執行多個陳述式時重複使用會話¶
在上游任務中指定 session_keep_alive_seconds
參數。在下游任務中,從 XCom 取得會話 ID 並將其傳遞給 session_id
參數。當您使用暫存表時,這非常有用。
tests/system/amazon/aws/example_redshift.py
create_tmp_table_data_api = RedshiftDataOperator(
task_id="create_tmp_table_data_api",
cluster_identifier=redshift_cluster_identifier,
database=DB_NAME,
db_user=DB_LOGIN,
sql=[
"""
CREATE TEMPORARY TABLE tmp_people (
id INTEGER,
first_name VARCHAR(100),
age INTEGER
);
"""
],
poll_interval=POLL_INTERVAL,
wait_for_completion=True,
session_keep_alive_seconds=600,
)
insert_data_reuse_session = RedshiftDataOperator(
task_id="insert_data_reuse_session",
sql=[
"INSERT INTO tmp_people VALUES ( 1, 'Bob', 30);",
"INSERT INTO tmp_people VALUES ( 2, 'Alice', 35);",
"INSERT INTO tmp_people VALUES ( 3, 'Charlie', 40);",
],
poll_interval=POLL_INTERVAL,
wait_for_completion=True,
session_id="{{ task_instance.xcom_pull(task_ids='create_tmp_table_data_api', key='session_id') }}",
)