Amazon Redshift Data

Amazon Redshift 管理設定、操作和擴展資料倉儲的所有工作:配置容量、監控和備份叢集,以及將修補程式和升級應用於 Amazon Redshift 引擎。您可以專注於使用您的資料來為您的業務和客戶獲取新的見解。

先決條件任務

要使用這些運算子,您必須完成以下幾件事

通用參數

aws_conn_id

參考 Amazon Web Services 連線 ID。如果此參數設定為 None,則會使用預設的 boto3 行為,而不會查詢連線。否則,請使用儲存在連線中的憑證。預設值:aws_default

region_name

AWS 區域名稱。如果此參數設定為 None 或省略,則將使用 AWS 連線額外參數 中的 region_name。否則,請使用指定的值來取代連線值。預設值:None

verify

是否驗證 SSL 憑證。

  • False - 不驗證 SSL 憑證。

  • path/to/cert/bundle.pem - 要使用的 CA 憑證套件的檔案名稱。如果您想使用與 botocore 使用的 CA 憑證套件不同的套件,您可以指定此引數。

如果此參數設定為 None 或省略,則將使用 AWS 連線額外參數 中的 verify。否則,請使用指定的值來取代連線值。預設值:None

botocore_config

提供的字典用於建構 botocore.config.Config。此設定可用於設定 避免節流例外、逾時等。

範例,有關參數的更多詳細資訊,請查看 botocore.config.Config
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

如果此參數設定為 None 或省略,則將使用 AWS 連線額外參數 中的 config_kwargs。否則,請使用指定的值來取代連線值。預設值:None

注意

指定空字典 {} 將覆寫 botocore.config.Config 的連線設定

運算子

在 Amazon Redshift 叢集上執行陳述式

使用 RedshiftDataOperator 以針對 Amazon Redshift 叢集執行陳述式。

這與 RedshiftSQLOperator 的不同之處在於,它允許使用者透過 AWS API 查詢和檢索資料,並避免需要 Postgres 連線。

tests/system/amazon/aws/example_redshift.py

create_table_redshift_data = RedshiftDataOperator(
    task_id="create_table_redshift_data",
    cluster_identifier=redshift_cluster_identifier,
    database=DB_NAME,
    db_user=DB_LOGIN,
    sql=[
        """
        CREATE TABLE IF NOT EXISTS fruit (
        fruit_id INTEGER,
        name VARCHAR NOT NULL,
        color VARCHAR NOT NULL
        );
    """
    ],
    poll_interval=POLL_INTERVAL,
    wait_for_completion=True,
)

在執行多個陳述式時重複使用會話

在上游任務中指定 session_keep_alive_seconds 參數。在下游任務中,從 XCom 取得會話 ID 並將其傳遞給 session_id 參數。當您使用暫存表時,這非常有用。

tests/system/amazon/aws/example_redshift.py

create_tmp_table_data_api = RedshiftDataOperator(
    task_id="create_tmp_table_data_api",
    cluster_identifier=redshift_cluster_identifier,
    database=DB_NAME,
    db_user=DB_LOGIN,
    sql=[
        """
        CREATE TEMPORARY TABLE tmp_people (
        id INTEGER,
        first_name VARCHAR(100),
        age INTEGER
        );
    """
    ],
    poll_interval=POLL_INTERVAL,
    wait_for_completion=True,
    session_keep_alive_seconds=600,
)

insert_data_reuse_session = RedshiftDataOperator(
    task_id="insert_data_reuse_session",
    sql=[
        "INSERT INTO tmp_people VALUES ( 1, 'Bob', 30);",
        "INSERT INTO tmp_people VALUES ( 2, 'Alice', 35);",
        "INSERT INTO tmp_people VALUES ( 3, 'Charlie', 40);",
    ],
    poll_interval=POLL_INTERVAL,
    wait_for_completion=True,
    session_id="{{ task_instance.xcom_pull(task_ids='create_tmp_table_data_api', key='session_id') }}",
)

此條目是否有幫助?