Google Cloud SQL Operators¶
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install the API library via pip.
pip install 'apache-airflow[google]'
Detailed information is available in Installation.
CloudSQLCreateInstanceDatabaseOperator¶
Creates a new database inside a Cloud SQL instance.
For parameter definition, take a look at CloudSQLCreateInstanceDatabaseOperator.
Using the operator¶
You can create the operator with or without the project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:
sql_db_create_task = CloudSQLCreateInstanceDatabaseOperator(
body=db_create_body, instance=INSTANCE_NAME, task_id="sql_db_create_task"
)
Example request body:
db_create_body = {"instance": INSTANCE_NAME, "name": DB_NAME, "project": PROJECT_ID}
Templating¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
More information¶
See the Google Cloud SQL API documentation on creating a new database inside the instance.
CloudSQLDeleteInstanceDatabaseOperator¶
Deletes a database from a Cloud SQL instance.
For parameter definition, take a look at CloudSQLDeleteInstanceDatabaseOperator.
Using the operator¶
You can create the operator with or without the project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:
sql_db_delete_task = CloudSQLDeleteInstanceDatabaseOperator(
instance=INSTANCE_NAME,
database=DB_NAME,
task_id="sql_db_delete_task",
trigger_rule=TriggerRule.ALL_DONE,
)
Templating¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"database",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
CloudSQLPatchInstanceDatabaseOperator¶
Updates a resource containing information about a database inside a Cloud SQL instance using patch semantics. See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch
For parameter definition, take a look at CloudSQLPatchInstanceDatabaseOperator.
Using the operator¶
You can create the operator with or without the project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:
sql_db_patch_task = CloudSQLPatchInstanceDatabaseOperator(
body=db_patch_body,
instance=INSTANCE_NAME,
database=DB_NAME,
task_id="sql_db_patch_task",
)
Example request body:
db_patch_body = {"charset": "utf16", "collation": "utf16_general_ci"}
Templating¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"database",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
CloudSQLDeleteInstanceOperator¶
Deletes a Cloud SQL instance in Google Cloud.
It is also used for deleting read and failover replicas.
For parameter definition, take a look at CloudSQLDeleteInstanceOperator.
Using the operator¶
You can create the operator with or without the project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:
sql_instance_delete_task = CloudSQLDeleteInstanceOperator(
instance=INSTANCE_NAME, task_id="sql_instance_delete_task", trigger_rule=TriggerRule.ALL_DONE
)
Templating¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
More information¶
See the Google Cloud SQL API documentation on deleting a SQL instance.
CloudSQLExportInstanceOperator¶
Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump or CSV file.
Note
This operator is idempotent. If executed multiple times with the same export file URI, the export file in GCS will simply be overridden.
For parameter definition, take a look at CloudSQLExportInstanceOperator.
Arguments¶
Example body defining the export operation:
export_body = {
"exportContext": {
"fileType": "sql",
"uri": FILE_URI,
"sqlExportOptions": {"schemaOnly": False},
"offload": True,
}
}
export_body_deferrable = {
"exportContext": {
"fileType": "sql",
"uri": FILE_URI_DEFERRABLE,
"sqlExportOptions": {"schemaOnly": False},
"offload": True,
}
}
Using the operator¶
You can create the operator with or without the project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:
sql_export_task = CloudSQLExportInstanceOperator(
body=export_body, instance=INSTANCE_NAME, task_id="sql_export_task"
)
For all of these actions, you can also use the operator in deferrable mode:
sql_export_def_task = CloudSQLExportInstanceOperator(
body=export_body_deferrable,
instance=INSTANCE_NAME,
task_id="sql_export_def_task",
deferrable=True,
)
Templating¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
Troubleshooting¶
If you receive an "Unauthorized" error in Google Cloud, make sure that the service account of the Cloud SQL instance is authorized to write to the selected GCS bucket.
It is not the service account configured in Airflow that communicates with GCS, but the service account of the particular Cloud SQL instance.
To grant the service account the appropriate WRITE permissions for the GCS bucket, you can use the GCSBucketCreateAclEntryOperator, as shown in the example:
sql_gcp_add_bucket_permission_task = GCSBucketCreateAclEntryOperator(
entity=f"user-{service_account_email}",
role="WRITER",
bucket=file_url_split[1], # netloc (bucket)
task_id="sql_gcp_add_bucket_permission_task",
)
CloudSQLImportInstanceOperator¶
Imports data into a Cloud SQL instance from a SQL dump or CSV file in Cloud Storage.
CSV import:¶
This operator is NOT idempotent for a CSV import. If the same file is imported multiple times, the imported data will be duplicated in the database. Moreover, if there are any unique constraints, the duplicate import may result in an error.
SQL import:¶
This operator is idempotent for a SQL import if it was also exported by Cloud SQL. The exported SQL contains 'DROP TABLE IF EXISTS' statements for all tables to be imported.
If the import file was generated in a different way, idempotence is not guaranteed. It has to be ensured at the SQL file level.
For parameter definition, take a look at CloudSQLImportInstanceOperator.
Using the operator¶
You can create the operator with or without the project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:
sql_import_task = CloudSQLImportInstanceOperator(
body=import_body, instance=INSTANCE_NAME, task_id="sql_import_task"
)
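The import_body used above is not shown in this section. A minimal sketch of what it could look like for a SQL dump import, following the Cloud SQL Admin API importContext shape (the URI value is illustrative, mirroring the FILE_URI variable from the export examples):

```python
# Hypothetical import body for a SQL dump import. In a real DAG, FILE_URI
# would typically point at the file produced by the export operator.
FILE_URI = "gs://example-bucket/exports/dump.sql"

import_body = {
    "importContext": {
        "fileType": "sql",  # "csv" is also supported (with csvImportOptions)
        "uri": FILE_URI,
    }
}
```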
Templating¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
Troubleshooting¶
If you receive an "Unauthorized" error in Google Cloud, make sure that the service account of the Cloud SQL instance is authorized to read from the selected GCS object.
It is not the service account configured in Airflow that communicates with GCS, but the service account of the particular Cloud SQL instance.
To grant the service account the appropriate READ permissions for the GCS object, you can use the GCSObjectCreateAclEntryOperator, as shown in the example:
sql_gcp_add_object_permission_task = GCSObjectCreateAclEntryOperator(
entity=f"user-{service_account_email}",
role="READER",
bucket=file_url_split[1], # netloc (bucket)
object_name=file_url_split[2][1:], # path (strip first '/')
task_id="sql_gcp_add_object_permission_task",
)
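The file_url_split variable in both troubleshooting examples is assumed to be the result of splitting the gs:// file URI with the standard library; a sketch (the URI is illustrative):

```python
from urllib.parse import urlsplit

# Split a gs:// URI into its components: index 1 is the netloc (bucket)
# and index 2 is the object path, which carries a leading '/'.
file_url_split = urlsplit("gs://example-bucket/exports/dump.sql")

bucket = file_url_split[1]           # the bucket name
object_name = file_url_split[2][1:]  # the object path, leading '/' stripped
```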
CloudSQLCreateInstanceOperator¶
Creates a new Cloud SQL instance in Google Cloud.
It is also used for creating read replicas.
For parameter definition, take a look at CloudSQLCreateInstanceOperator.
If an instance with the same name exists, no action will be taken and the operator will succeed.
Arguments¶
Example body defining the instance:
body = {
"name": INSTANCE_NAME,
"settings": {
"tier": "db-n1-standard-1",
"backupConfiguration": {"binaryLogEnabled": True, "enabled": True, "startTime": "05:00"},
"activationPolicy": "ALWAYS",
"dataDiskSizeGb": 30,
"dataDiskType": "PD_SSD",
"databaseFlags": [],
"ipConfiguration": {
"ipv4Enabled": True,
"requireSsl": True,
},
"locationPreference": {"zone": "europe-west4-a"},
"maintenanceWindow": {"hour": 5, "day": 7, "updateTrack": "canary"},
"pricingPlan": "PER_USE",
"storageAutoResize": True,
"storageAutoResizeLimit": 0,
"userLabels": {"my-key": "my-value"},
},
"databaseVersion": "MYSQL_5_7",
"region": "europe-west4",
}
Using the operator¶
You can create the operator with or without the project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:
sql_instance_create_task = CloudSQLCreateInstanceOperator(
body=body, instance=INSTANCE_NAME, task_id="sql_instance_create_task"
)
Templating¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
CloudSQLInstancePatchOperator¶
Updates the settings of a Cloud SQL instance in Google Cloud (partial update).
For parameter definition, take a look at CloudSQLInstancePatchOperator.
This is a partial update, so only the values of the settings specified in the body will be set/updated. The rest of the existing instance's configuration will remain unchanged.
Arguments¶
Example body defining the patch:
patch_body = {
"name": INSTANCE_NAME,
"settings": {
"dataDiskSizeGb": 35,
"maintenanceWindow": {"hour": 3, "day": 6, "updateTrack": "canary"},
"userLabels": {"my-key-patch": "my-value-patch"},
},
}
Using the operator¶
You can create the operator with or without the project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:
sql_instance_patch_task = CloudSQLInstancePatchOperator(
body=patch_body, instance=INSTANCE_NAME, task_id="sql_instance_patch_task"
)
Templating¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
CloudSQLCloneInstanceOperator¶
Clones a Cloud SQL instance.
For parameter definition, take a look at CloudSQLCloneInstanceOperator.
Arguments¶
For the clone_context object attributes, please refer to CloneContext.
Using the operator¶
You can create the operator with or without the project id. If the project id is missing, it will be retrieved from the Google Cloud connection used. Both variants are shown:
sql_instance_clone = CloudSQLCloneInstanceOperator(
instance=INSTANCE_NAME, destination_instance_name=CLONED_INSTANCE_NAME, task_id="sql_instance_clone"
)
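For illustration, a hypothetical clone_context body requesting a point-in-time clone. The field names follow the Cloud SQL Admin API CloneContext resource; the timestamp and instance name are made up:

```python
# Hypothetical clone context: clone the source instance as it existed at
# the given RFC 3339 point in time. destinationInstanceName mirrors the
# operator's destination_instance_name argument.
CLONED_INSTANCE_NAME = "test-cloned-instance"  # assumed name

clone_context = {
    "kind": "sql#cloneContext",
    "destinationInstanceName": CLONED_INSTANCE_NAME,
    "pointInTime": "2024-01-01T10:00:00.000Z",
}
```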
Templating¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"destination_instance_name",
"gcp_conn_id",
"api_version",
)
CloudSQLExecuteQueryOperator¶
Performs DDL or DML SQL queries in a Google Cloud SQL instance. DQL (retrieving data from Google Cloud SQL) is not supported. You can run SELECT queries, but the results of those queries are discarded.
You can specify various connectivity methods to connect to a running instance, ranging from a plain connection over public IP, through public IP with SSL, to both TCP and socket connections via Cloud SQL Proxy. The proxy is downloaded and started/stopped dynamically as needed by the operator.
There is a gcpcloudsql://* connection type that you should use to define which kind of connectivity you want the operator to use. The connection is a "meta" type of connection: it is not used to establish connectivity on its own, but it determines whether the Cloud SQL Proxy should be started by CloudSQLDatabaseHook and which kind of database connection (Postgres or MySQL) should be created dynamically to connect to Cloud SQL via public IP address or via the proxy. The CloudSqlDatabaseHook uses CloudSqlProxyRunner to manage the Cloud SQL Proxy lifecycle (each task has its own Cloud SQL Proxy).
When you build a connection, you should use the connection parameters as described in CloudSQLDatabaseHook. You can see connection examples below for all the possible types of connectivity. Such a connection can be reused between different tasks (instances of CloudSqlQueryOperator). Each task gets its own proxy started, if needed, with its own TCP or UNIX socket.
For parameter definition, take a look at CloudSQLExecuteQueryOperator.
Since the query operator can run arbitrary queries, it cannot be guaranteed to be idempotent. The SQL query designer should design the queries to be idempotent. For example, both Postgres and MySQL support CREATE TABLE IF NOT EXISTS statements, which can be used to create tables in an idempotent way.
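For example, a sequence of statements along these lines stays safe to re-run when a task is retried (the table names are illustrative):

```python
# Idempotent DDL: each statement can run any number of times without
# changing the outcome or raising on a retried task.
SQL = (
    "CREATE TABLE IF NOT EXISTS table_test (i INTEGER)",
    "CREATE TABLE IF NOT EXISTS table_test2 (i INTEGER)",
    "DROP TABLE IF EXISTS table_test_old",
)
```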
Arguments¶
If you define the connection via an AIRFLOW_CONN_{CONN_ID} URL in an environment variable, make sure the components of the URL are URL-encoded. See the examples below for details.
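A sketch of URL-encoding a credential before embedding it in such a URI, using the standard library (the credentials and host are made up):

```python
from urllib.parse import quote_plus

# Passwords and other URI components must be URL-encoded before being
# embedded in the gcpcloudsql:// connection URI, otherwise characters
# like '@', '/' or '#' break URI parsing.
raw_password = "p@ss/word#1"
encoded_password = quote_plus(raw_password)

conn_uri = f"gcpcloudsql://user:{encoded_password}@127.0.0.1:3306/testdb"
```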
Note that in the case of SSL connectivity you need a mechanism to make the certificate/key files available in predefined locations on all the workers on which the operator can run. This can be provided, for example, by mounting NFS-like volumes in the same path on all the workers.
Example connection definitions for all non-SSL connectivity. Note that all the components of the connection URI should be URL-encoded:
# Connect via proxy over TCP
CONNECTION_PROXY_TCP_KWARGS = {
"conn_type": "gcpcloudsql",
"login": CLOUD_SQL_USER,
"password": CLOUD_SQL_PASSWORD,
"host": CLOUD_SQL_IP_ADDRESS,
"port": CLOUD_SQL_PUBLIC_PORT,
"schema": CLOUD_SQL_DATABASE_NAME,
"extra": {
"database_type": DATABASE_TYPE,
"project_id": PROJECT_ID,
"location": REGION,
"instance": CLOUD_SQL_INSTANCE_NAME,
"use_proxy": "True",
"sql_proxy_use_tcp": "True",
},
}
# Connect via proxy over UNIX socket (specific proxy version)
CONNECTION_PROXY_SOCKET_KWARGS = {
"conn_type": "gcpcloudsql",
"login": CLOUD_SQL_USER,
"password": CLOUD_SQL_PASSWORD,
"host": CLOUD_SQL_IP_ADDRESS,
"port": CLOUD_SQL_PUBLIC_PORT,
"schema": CLOUD_SQL_DATABASE_NAME,
"extra": {
"database_type": DATABASE_TYPE,
"project_id": PROJECT_ID,
"location": REGION,
"instance": CLOUD_SQL_INSTANCE_NAME,
"use_proxy": "True",
"sql_proxy_version": "v1.33.9",
"sql_proxy_use_tcp": "False",
},
}
# Connect directly via TCP (non-SSL)
CONNECTION_PUBLIC_TCP_KWARGS = {
"conn_type": "gcpcloudsql",
"login": CLOUD_SQL_USER,
"password": CLOUD_SQL_PASSWORD,
"host": CLOUD_SQL_IP_ADDRESS,
"port": CLOUD_SQL_PUBLIC_PORT,
"schema": CLOUD_SQL_DATABASE_NAME,
"extra": {
"database_type": DATABASE_TYPE,
"project_id": PROJECT_ID,
"location": REGION,
"instance": CLOUD_SQL_INSTANCE_NAME,
"use_proxy": "False",
"use_ssl": "False",
},
}
A similar connection definition for all SSL-enabled connectivity:
# Connect directly via TCP (SSL)
CONNECTION_PUBLIC_TCP_SSL_KWARGS = {
"conn_type": "gcpcloudsql",
"login": CLOUD_SQL_USER,
"password": CLOUD_SQL_PASSWORD,
"host": CLOUD_SQL_IP_ADDRESS,
"port": CLOUD_SQL_PUBLIC_PORT,
"schema": CLOUD_SQL_DATABASE_NAME,
"extra": {
"database_type": DATABASE_TYPE,
"project_id": PROJECT_ID,
"location": REGION,
"instance": CLOUD_SQL_INSTANCE_NAME,
"use_proxy": "False",
"use_ssl": "True",
},
}
It is also possible to configure a connection via environment variables (note that the connection id from the operator matches the AIRFLOW_CONN_{CONN_ID} postfix, uppercase, if you are using the standard AIRFLOW notation for defining connections via environment variables):
# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_* . The connections can also be created in the database
# of AIRFLOW (using command line or UI).
postgres_kwargs = {
"user": "user",
"password": "password",
"public_ip": "public_ip",
"public_port": "public_port",
"database": "database",
"project_id": "project_id",
"location": "location",
"instance": "instance",
"client_cert_file": "client_cert_file",
"client_key_file": "client_key_file",
"server_ca_file": "server_ca_file",
}
# Postgres: connect via proxy over TCP
os.environ["AIRFLOW_CONN_PROXY_POSTGRES_TCP"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=postgres&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=True&"
"sql_proxy_use_tcp=True".format(**postgres_kwargs)
)
# Postgres: connect via proxy over UNIX socket (specific proxy version)
os.environ["AIRFLOW_CONN_PROXY_POSTGRES_SOCKET"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=postgres&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=True&"
"sql_proxy_version=v1.13&"
"sql_proxy_use_tcp=False".format(**postgres_kwargs)
)
# Postgres: connect directly via TCP (non-SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=postgres&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=False".format(**postgres_kwargs)
)
# Postgres: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=postgres&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=True&"
"sslcert={client_cert_file}&"
"sslkey={client_key_file}&"
"sslrootcert={server_ca_file}".format(**postgres_kwargs)
)
mysql_kwargs = {
"user": "user",
"password": "password",
"public_ip": "public_ip",
"public_port": "public_port",
"database": "database",
"project_id": "project_id",
"location": "location",
"instance": "instance",
"client_cert_file": "client_cert_file",
"client_key_file": "client_key_file",
"server_ca_file": "server_ca_file",
}
# MySQL: connect via proxy over TCP (specific proxy version)
os.environ["AIRFLOW_CONN_PROXY_MYSQL_TCP"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=True&"
"sql_proxy_version=v1.13&"
"sql_proxy_use_tcp=True".format(**mysql_kwargs)
)
# MySQL: connect via proxy over UNIX socket using pre-downloaded Cloud Sql Proxy binary
os.environ["AIRFLOW_CONN_PROXY_MYSQL_SOCKET"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=True&"
"sql_proxy_use_tcp=False".format(**mysql_kwargs)
)
# MySQL: connect directly via TCP (non-SSL)
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=False".format(**mysql_kwargs)
)
# MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql Proxy binary path
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=True&"
"sslcert={client_cert_file}&"
"sslkey={client_key_file}&"
"sslrootcert={server_ca_file}".format(**mysql_kwargs)
)
# Special case: MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql
# Proxy binary path AND with missing project_id
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL_NO_PROJECT_ID"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=True&"
"sslcert={client_cert_file}&"
"sslkey={client_key_file}&"
"sslrootcert={server_ca_file}".format(**mysql_kwargs)
)
# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_* . The connections can also be created in the database
# of AIRFLOW (using command line or UI).
postgres_kwargs = {
"user": "user",
"password": "password",
"public_ip": "public_ip",
"public_port": "public_port",
"database": "database",
"project_id": "project_id",
"location": "location",
"instance": "instance",
"client_cert_file": "client_cert_file",
"client_key_file": "client_key_file",
"server_ca_file": "server_ca_file",
}
# Postgres: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=postgres&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=True&"
"sslcert={client_cert_file}&"
"sslkey={client_key_file}&"
"sslrootcert={server_ca_file}".format(**postgres_kwargs)
)
mysql_kwargs = {
"user": "user",
"password": "password",
"public_ip": "public_ip",
"public_port": "public_port",
"database": "database",
"project_id": "project_id",
"location": "location",
"instance": "instance",
"client_cert_file": "client_cert_file",
"client_key_file": "client_key_file",
"server_ca_file": "server_ca_file",
}
# MySQL: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=True&"
"sslcert={client_cert_file}&"
"sslkey={client_key_file}&"
"sslrootcert={server_ca_file}".format(**mysql_kwargs)
)
Using the operator¶
The example operator below uses the previously prepared connection. It might be a connection_id from the Airflow database or a connection configured via environment variable (note that the connection id from the operator matches the AIRFLOW_CONN_{CONN_ID} postfix, uppercase, if you are using the standard AIRFLOW notation for defining connections via environment variables):
query_task = CloudSQLExecuteQueryOperator(
gcp_cloudsql_conn_id=connection_id,
task_id=task_id,
sql=SQL,
)
SSL settings can also be specified at the operator level. In that case, the SSL settings configured in the connection are overridden. One way to do so is to specify the path to each certificate file, as shown below. Note that, for security reasons, these files will be copied into a temporary location with the minimal required permissions.
query_task = CloudSQLExecuteQueryOperator(
gcp_cloudsql_conn_id=conn_id,
task_id=task_id,
sql=SQL,
ssl_client_cert=ssl_cert_path,
ssl_server_cert=ssl_server_cert_path,
ssl_client_key=ssl_key_path,
)
You can also store your SSL certificates in Google Cloud Secret Manager and provide the id of the secret. The secret format is:
{"sslcert": "", "sslkey": "", "sslrootcert": ""}
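A sketch of building such a secret value, assuming the three values are the PEM contents of the certificate/key files (the placeholder strings are illustrative):

```python
import json

# Build the JSON payload expected by ssl_secret_id. In a real setup the
# three values would be the PEM-encoded contents read from the client
# certificate, client key, and server CA files.
secret_value = json.dumps(
    {
        "sslcert": "<client certificate PEM>",
        "sslkey": "<client key PEM>",
        "sslrootcert": "<server CA PEM>",
    }
)
```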
query_task_secret = CloudSQLExecuteQueryOperator(
gcp_cloudsql_conn_id=conn_id,
task_id=task_id,
sql=SQL,
ssl_secret_id=secret_id,
)
Templating¶
template_fields: Sequence[str] = (
"sql",
"gcp_cloudsql_conn_id",
"gcp_conn_id",
"ssl_server_cert",
"ssl_client_cert",
"ssl_client_key",
"ssl_secret_id",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"sql": "sql"}
More information¶
See the Google Cloud SQL documentation for the MySQL and PostgreSQL related proxies.