Google Cloud SQL Operators

Prerequisite Tasks

To use these operators, you must do a few things first.

CloudSQLCreateInstanceDatabaseOperator

Creates a new database inside a Cloud SQL instance.

For parameter definitions, take a look at CloudSQLCreateInstanceDatabaseOperator.

Using the operator

You can create the operator with or without a project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

sql_db_create_task = CloudSQLCreateInstanceDatabaseOperator(
    body=db_create_body, instance=INSTANCE_NAME, task_id="sql_db_create_task"
)
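
For reference, the variant that passes the project id explicitly might look like the following minimal sketch (the explicit project_id argument and the task id are illustrative and not part of the example DAG excerpt; PROJECT_ID is assumed to be defined as in the request body below):

sql_db_create_task_with_project = CloudSQLCreateInstanceDatabaseOperator(
    project_id=PROJECT_ID,  # illustrative: pass the project id explicitly
    body=db_create_body,
    instance=INSTANCE_NAME,
    task_id="sql_db_create_task_with_project",
)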

Example request body:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

db_create_body = {"instance": INSTANCE_NAME, "name": DB_NAME, "project": PROJECT_ID}

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation for creating a new database inside an instance.

CloudSQLDeleteInstanceDatabaseOperator

Deletes a database from a Cloud SQL instance.

For parameter definitions, take a look at CloudSQLDeleteInstanceDatabaseOperator.

Using the operator

You can create the operator with or without a project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

sql_db_delete_task = CloudSQLDeleteInstanceDatabaseOperator(
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id="sql_db_delete_task",
    trigger_rule=TriggerRule.ALL_DONE,
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "database",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation for deleting a database.

CloudSQLPatchInstanceDatabaseOperator

Updates a resource containing information about a database inside a Cloud SQL instance using patch semantics. See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch

For parameter definitions, take a look at CloudSQLPatchInstanceDatabaseOperator.

Using the operator

You can create the operator with or without a project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

sql_db_patch_task = CloudSQLPatchInstanceDatabaseOperator(
    body=db_patch_body,
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id="sql_db_patch_task",
)

Example request body:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

db_patch_body = {"charset": "utf16", "collation": "utf16_general_ci"}

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "database",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation for updating a database.

CloudSQLDeleteInstanceOperator

Deletes a Cloud SQL instance in Google Cloud.

It is also used for deleting read and failover replicas.

For parameter definitions, take a look at CloudSQLDeleteInstanceOperator.

Using the operator

You can create the operator with or without a project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

sql_instance_delete_task = CloudSQLDeleteInstanceOperator(
    instance=INSTANCE_NAME, task_id="sql_instance_delete_task", trigger_rule=TriggerRule.ALL_DONE
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation for deleting a SQL instance.

CloudSQLExportInstanceOperator

Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump or CSV file.

Note

This operator is idempotent. If executed multiple times with the same export file URI, the export file in GCS will simply be overwritten.

For parameter definitions, take a look at CloudSQLExportInstanceOperator.

Arguments

Example body defining the export operation:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

export_body = {
    "exportContext": {
        "fileType": "sql",
        "uri": FILE_URI,
        "sqlExportOptions": {"schemaOnly": False},
        "offload": True,
    }
}
export_body_deferrable = {
    "exportContext": {
        "fileType": "sql",
        "uri": FILE_URI_DEFERRABLE,
        "sqlExportOptions": {"schemaOnly": False},
        "offload": True,
    }
}
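
If you export to a CSV file instead of a SQL dump, the exportContext uses csvExportOptions with a select query. A minimal sketch, assuming a hypothetical CSV_FILE_URI variable and an illustrative query:

export_body_csv = {
    "exportContext": {
        "fileType": "csv",
        "uri": CSV_FILE_URI,  # hypothetical GCS URI ending in .csv
        "csvExportOptions": {"selectQuery": "SELECT * FROM my_table"},
    }
}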

Using the operator

You can create the operator with or without a project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

sql_export_task = CloudSQLExportInstanceOperator(
    body=export_body, instance=INSTANCE_NAME, task_id="sql_export_task"
)

For all of these actions, you can also use the operator in deferrable mode:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

sql_export_def_task = CloudSQLExportInstanceOperator(
    body=export_body_deferrable,
    instance=INSTANCE_NAME,
    task_id="sql_export_def_task",
    deferrable=True,
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation for exporting data.

Troubleshooting

If you receive an "Unauthorized" error in Google Cloud, make sure that the service account of the Cloud SQL instance is authorized to write to the selected GCS bucket.

It is not the service account configured in Airflow that communicates with GCS, but the service account of the particular Cloud SQL instance.

To grant the service account the appropriate WRITE permissions for the GCS bucket, you can use the GCSBucketCreateAclEntryOperator, as shown in the example:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

sql_gcp_add_bucket_permission_task = GCSBucketCreateAclEntryOperator(
    entity=f"user-{service_account_email}",
    role="WRITER",
    bucket=file_url_split[1],  # netloc (bucket)
    task_id="sql_gcp_add_bucket_permission_task",
)

CloudSQLImportInstanceOperator

Imports data into a Cloud SQL instance from a SQL dump or CSV file in Cloud Storage.

CSV import:

This operator is NOT idempotent for a CSV import. If the same file is imported multiple times, the imported data will be duplicated in the database. Moreover, if there are any unique constraints, the duplicate import may result in an error.

SQL import:

This operator is idempotent for a SQL import if it was also exported by Cloud SQL. The exported SQL contains 'DROP TABLE IF EXISTS' statements for all tables to be imported.

If the import file was generated in a different way, idempotence is not guaranteed. It has to be ensured on the SQL file level.

For parameter definitions, take a look at CloudSQLImportInstanceOperator.

Arguments

Example body defining the import operation:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

import_body = {"importContext": {"fileType": "sql", "uri": FILE_URI}}
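
For a CSV import, the importContext additionally names the target database and table via csvImportOptions. A minimal sketch (CSV_FILE_URI and the table name are illustrative assumptions):

import_body_csv = {
    "importContext": {
        "fileType": "csv",
        "uri": CSV_FILE_URI,  # hypothetical GCS URI of the CSV file
        "database": DB_NAME,
        "csvImportOptions": {"table": "my_table"},
    }
}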

Using the operator

You can create the operator with or without a project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

sql_import_task = CloudSQLImportInstanceOperator(
    body=import_body, instance=INSTANCE_NAME, task_id="sql_import_task"
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation for importing data.

Troubleshooting

If you receive an "Unauthorized" error in Google Cloud, make sure that the service account of the Cloud SQL instance is authorized to read from the selected GCS object.

It is not the service account configured in Airflow that communicates with GCS, but the service account of the particular Cloud SQL instance.

To grant the service account the appropriate READ permissions for the GCS object, you can use the GCSObjectCreateAclEntryOperator, as shown in the example:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

sql_gcp_add_object_permission_task = GCSObjectCreateAclEntryOperator(
    entity=f"user-{service_account_email}",
    role="READER",
    bucket=file_url_split[1],  # netloc (bucket)
    object_name=file_url_split[2][1:],  # path (strip first '/')
    task_id="sql_gcp_add_object_permission_task",
)

CloudSQLCreateInstanceOperator

Creates a new Cloud SQL instance in Google Cloud.

It is also used for creating read replicas.

For parameter definitions, take a look at CloudSQLCreateInstanceOperator.

If an instance with the same name exists, no action will be taken and the operator will succeed.

Arguments

Example body defining the instance with a failover replica:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

body = {
    "name": INSTANCE_NAME,
    "settings": {
        "tier": "db-n1-standard-1",
        "backupConfiguration": {"binaryLogEnabled": True, "enabled": True, "startTime": "05:00"},
        "activationPolicy": "ALWAYS",
        "dataDiskSizeGb": 30,
        "dataDiskType": "PD_SSD",
        "databaseFlags": [],
        "ipConfiguration": {
            "ipv4Enabled": True,
            "requireSsl": True,
        },
        "locationPreference": {"zone": "europe-west4-a"},
        "maintenanceWindow": {"hour": 5, "day": 7, "updateTrack": "canary"},
        "pricingPlan": "PER_USE",
        "storageAutoResize": True,
        "storageAutoResizeLimit": 0,
        "userLabels": {"my-key": "my-value"},
    },
    "databaseVersion": "MYSQL_5_7",
    "region": "europe-west4",
}

Using the operator

You can create the operator with or without a project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

sql_instance_create_task = CloudSQLCreateInstanceOperator(
    body=body, instance=INSTANCE_NAME, task_id="sql_instance_create_task"
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation for creating an instance.

CloudSQLInstancePatchOperator

Updates the settings of a Cloud SQL instance in Google Cloud (partial update).

For parameter definitions, take a look at CloudSQLInstancePatchOperator.

This is a partial update, so only the values for the settings specified in the body will be set / updated. The rest of the existing instance's configuration will remain unchanged.

Arguments

Example body defining the instance:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

patch_body = {
    "name": INSTANCE_NAME,
    "settings": {
        "dataDiskSizeGb": 35,
        "maintenanceWindow": {"hour": 3, "day": 6, "updateTrack": "canary"},
        "userLabels": {"my-key-patch": "my-value-patch"},
    },
}

Using the operator

You can create the operator with or without a project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

sql_instance_patch_task = CloudSQLInstancePatchOperator(
    body=patch_body, instance=INSTANCE_NAME, task_id="sql_instance_patch_task"
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation for patching an instance.

CloudSQLCloneInstanceOperator

Clones a Cloud SQL instance.

For parameter definitions, take a look at CloudSQLCloneInstanceOperator.

Arguments

For the clone_context object attributes, see CloneContext.
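
As a hedged illustration only, a point-in-time clone could pass a clone_context such as the one below (the pointInTime timestamp is an assumed value; consult the CloneContext reference for the fields supported by your API version):

sql_instance_clone_pitr = CloudSQLCloneInstanceOperator(
    instance=INSTANCE_NAME,
    destination_instance_name=CLONED_INSTANCE_NAME,
    clone_context={"pointInTime": "2024-01-01T00:00:00.000Z"},  # illustrative timestamp
    task_id="sql_instance_clone_pitr",
)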

Using the operator

You can create the operator with or without a project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py [source]

sql_instance_clone = CloudSQLCloneInstanceOperator(
    instance=INSTANCE_NAME, destination_instance_name=CLONED_INSTANCE_NAME, task_id="sql_instance_clone"
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "destination_instance_name",
    "gcp_conn_id",
    "api_version",
)

More information

See the Google Cloud SQL API documentation for cloning an instance.

CloudSQLExecuteQueryOperator

Performs DDL or DML SQL queries in a Google Cloud SQL instance. DQL (retrieving data from Google Cloud SQL) is not supported. You might run a SELECT query, but the results of such queries are discarded.

You can specify various connectivity methods to connect to a running instance, starting from a plain connection via public IP, through a public IP with SSL, to TCP and socket connections via Cloud SQL Proxy. The proxy is downloaded and started/stopped dynamically as needed by the operator.

There is a gcpcloudsql://* connection type that you should use to define which connectivity type the operator should use. It is a "meta" type of connection: it is not used to establish connectivity on its own, but it determines whether the CloudSQLDatabaseHook should start the Cloud SQL Proxy, and which kind of database connection (Postgres or MySQL) should be created dynamically to connect to Cloud SQL via public IP address or via the proxy. The CloudSqlDatabaseHook uses CloudSqlProxyRunner to manage the Cloud SQL Proxy lifecycle (each task has its own Cloud SQL Proxy).

When you build a connection, you should use the connection parameters described in CloudSQLDatabaseHook. You can see examples of connections below for all the possible connectivity types. Such a connection can be reused between different tasks (instances of CloudSqlQueryOperator). Each task will get its own proxy started if needed, with its own TCP or UNIX socket.

For parameter definitions, take a look at CloudSQLExecuteQueryOperator.

Since the query operator can run arbitrary queries, it cannot be guaranteed to be idempotent. The SQL query designer should design the queries to be idempotent. For example, both Postgres and MySQL support CREATE TABLE IF NOT EXISTS statements that can be used to create tables in an idempotent way.
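
A minimal sketch of an idempotent query task (the connection id, table, and statements are illustrative and not part of the example DAGs; the INSERT uses Postgres syntax):

idempotent_sql = [
    "CREATE TABLE IF NOT EXISTS orders (order_id INT PRIMARY KEY, amount INT)",
    # Re-running the insert does not duplicate the row thanks to the primary key.
    "INSERT INTO orders VALUES (1, 100) ON CONFLICT (order_id) DO NOTHING",
]

idempotent_query_task = CloudSQLExecuteQueryOperator(
    gcp_cloudsql_conn_id="public_postgres_tcp",  # assumed connection id
    task_id="idempotent_query_task",
    sql=idempotent_sql,
)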

Arguments

If you define the connection via an AIRFLOW_CONN_{CONN_ID} URL defined in an environment variable, make sure the components of the URL are URL-encoded. See the examples below for details.
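
For example, special characters in a password or a path to a certificate file can be encoded with urllib before being placed into the URI (the values below are purely illustrative):

from urllib.parse import quote_plus

password = quote_plus("p@ss/word")                      # -> "p%40ss%2Fword"
sslcert = quote_plus("/etc/ssl/certs/client-cert.pem")  # -> "%2Fetc%2Fssl%2Fcerts%2Fclient-cert.pem"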

Note that in the case of SSL connectivity you need a mechanism to make the certificate/key files available in predefined locations on all the workers on which the operator can run. This can be provided, for example, by mounting NFS-like volumes at the same path on all the workers.

Example connection definitions for all non-SSL connectivity cases. Note that all the components of the connection URI should be URL-encoded:

tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py [source]

# Connect via proxy over TCP
CONNECTION_PROXY_TCP_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "True",
        "sql_proxy_use_tcp": "True",
    },
}

# Connect via proxy over UNIX socket (specific proxy version)
CONNECTION_PROXY_SOCKET_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "True",
        "sql_proxy_version": "v1.33.9",
        "sql_proxy_use_tcp": "False",
    },
}

# Connect directly via TCP (non-SSL)
CONNECTION_PUBLIC_TCP_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "False",
        "use_ssl": "False",
    },
}

Similar connection definitions for all SSL-enabled connectivity cases:

tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py [source]

# Connect directly via TCP (SSL)
CONNECTION_PUBLIC_TCP_SSL_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "False",
        "use_ssl": "True",
    },
}

It is also possible to configure a connection via environment variables (note that the connection id used by the operator matches the uppercase AIRFLOW_CONN_{CONN_ID} postfix if you use the standard AIRFLOW notation for defining connections via environment variables):

tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py [source]


# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_* . The connections can also be created in the database
# of AIRFLOW (using command line or UI).

postgres_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# Postgres: connect via proxy over TCP
os.environ["AIRFLOW_CONN_PROXY_POSTGRES_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_use_tcp=True".format(**postgres_kwargs)
)

# Postgres: connect via proxy over UNIX socket (specific proxy version)
os.environ["AIRFLOW_CONN_PROXY_POSTGRES_SOCKET"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_version=v1.13&"
    "sql_proxy_use_tcp=False".format(**postgres_kwargs)
)

# Postgres: connect directly via TCP (non-SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=False".format(**postgres_kwargs)
)

# Postgres: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**postgres_kwargs)
)

mysql_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# MySQL: connect via proxy over TCP (specific proxy version)
os.environ["AIRFLOW_CONN_PROXY_MYSQL_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_version=v1.13&"
    "sql_proxy_use_tcp=True".format(**mysql_kwargs)
)

# MySQL: connect via proxy over UNIX socket using pre-downloaded Cloud Sql Proxy binary
os.environ["AIRFLOW_CONN_PROXY_MYSQL_SOCKET"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_use_tcp=False".format(**mysql_kwargs)
)

# MySQL: connect directly via TCP (non-SSL)
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=False".format(**mysql_kwargs)
)

# MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql Proxy binary path
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**mysql_kwargs)
)

# Special case: MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql
# Proxy binary path AND with missing project_id
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL_NO_PROJECT_ID"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**mysql_kwargs)
)

tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py [source]


# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_* . The connections can also be created in the database
# of AIRFLOW (using command line or UI).

postgres_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# Postgres: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**postgres_kwargs)
)

mysql_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# MySQL: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**mysql_kwargs)
)

Using the operator

The example operator below uses a previously prepared connection. It may be a connection_id from the Airflow database or a connection configured via environment variable (note that the connection id used by the operator matches the uppercase AIRFLOW_CONN_{CONN_ID} postfix if you use the standard AIRFLOW notation for defining connections via environment variables):

tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py [source]

query_task = CloudSQLExecuteQueryOperator(
    gcp_cloudsql_conn_id=connection_id,
    task_id=task_id,
    sql=SQL,
)

SSL settings can also be specified at the operator level. In that case, the SSL settings configured in the connection are overridden. One way to do this is to specify the path to each certificate file, as shown below. Note that, for security reasons, these files are copied to a temporary location with the minimal required permissions.

tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py [source]

query_task = CloudSQLExecuteQueryOperator(
    gcp_cloudsql_conn_id=conn_id,
    task_id=task_id,
    sql=SQL,
    ssl_client_cert=ssl_cert_path,
    ssl_server_cert=ssl_server_cert_path,
    ssl_client_key=ssl_key_path,
)

You can also save your SSL certificates in Google Cloud Secret Manager and provide a secret id. The secret format is:

{"sslcert": "", "sslkey": "", "sslrootcert": ""}

tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py [source]

query_task_secret = CloudSQLExecuteQueryOperator(
    gcp_cloudsql_conn_id=conn_id,
    task_id=task_id,
    sql=SQL,
    ssl_secret_id=secret_id,
)

Templating

template_fields: Sequence[str] = (
    "sql",
    "gcp_cloudsql_conn_id",
    "gcp_conn_id",
    "ssl_server_cert",
    "ssl_client_cert",
    "ssl_client_key",
    "ssl_secret_id",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"sql": "sql"}

More information

See the Google Cloud SQL documentation for the MySQL and PostgreSQL related proxies.
