DatabricksSqlOperator

使用 DatabricksSqlOperatorDatabricks SQL 倉儲Databricks 叢集 上執行 SQL。

運算子的使用方式

運算子針對已設定的倉儲執行給定的 SQL 查詢。唯一需要的參數是

  • sql - 要執行的 SQL 查詢。有 3 種指定 SQL 查詢的方式

    1. 包含 SQL 陳述式的簡單字串。

    2. 代表 SQL 陳述式的字串列表。

    3. 包含 SQL 查詢的檔案名稱。檔案必須具有 .sql 副檔名。每個查詢應以 ;<new_line> 結尾

  • sql_warehouse_name (要使用的 Databricks SQL 倉儲名稱) 或 http_path (Databricks SQL 倉儲或 Databricks 叢集的 HTTP 路徑) 其中之一。

其他參數是選用的,可以在類別文件中找到。

範例

選取資料

以下是如何使用 DatabricksSqlOperator 從表格中選取資料的範例

tests/system/databricks/example_databricks_sql.py[原始碼]

    # Example of using the Databricks SQL Operator to select data.
    select = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id="select_data",
        sql="select * from default.my_airflow_table",
    )

將資料選取到檔案中

以下是如何使用 DatabricksSqlOperator 從表格中選取資料並儲存到檔案中的範例

tests/system/databricks/example_databricks_sql.py[原始碼]

    # Example of using the Databricks SQL Operator to select data into a file with JSONL format.
    select_into_file = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id="select_data_into_file",
        sql="select * from default.my_airflow_table",
        output_path="/tmp/1.jsonl",
        output_format="jsonl",
    )

執行多個陳述式

以下是如何使用 DatabricksSqlOperator 執行多個 SQL 陳述式的範例

tests/system/databricks/example_databricks_sql.py[原始碼]

    # Example of using the Databricks SQL Operator to perform multiple operations.
    create = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id="create_and_populate_table",
        sql=[
            "drop table if exists default.my_airflow_table",
            "create table default.my_airflow_table(id int, v string)",
            "insert into default.my_airflow_table values (1, 'test 1'), (2, 'test 2')",
        ],
    )

從檔案執行多個陳述式

以下是如何使用 DatabricksSqlOperator 從檔案執行陳述式的範例

tests/system/databricks/example_databricks_sql.py[原始碼]

    # Example of using the Databricks SQL Operator to select data.
    # SQL statements should be in the file with name test.sql
    create_file = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id="create_and_populate_from_file",
        sql="test.sql",
    )

DatabricksSqlSensor

使用 DatabricksSqlSensor 為可透過 Databricks SQL 倉儲或互動式叢集存取的表格執行感測器。

感測器的使用方式

感測器執行使用者提供的 SQL 陳述式。唯一需要的參數是

  • sql - 感測器要執行的 SQL 查詢。

  • sql_warehouse_name (要使用的 Databricks SQL 倉儲名稱) 或 http_path (Databricks SQL 倉儲或 Databricks 叢集的 HTTP 路徑) 其中之一。

其他參數是選用的,可以在類別文件中找到。

範例

設定要與感測器搭配使用的 Databricks 連線。

tests/system/databricks/example_databricks_sensors.py[原始碼]

# Connection string setup for Databricks workspace.
connection_id = "databricks_default"
sql_warehouse_name = "Starter Warehouse"

使用 SQL 陳述式探測特定表格

tests/system/databricks/example_databricks_sensors.py[原始碼]

# Example of using the Databricks SQL Sensor to check existence of data in a table.
sql_sensor = DatabricksSqlSensor(
    databricks_conn_id=connection_id,
    sql_warehouse_name=sql_warehouse_name,
    catalog="hive_metastore",
    task_id="sql_sensor_task",
    sql="select * from hive_metastore.temp.sample_table_3 limit 1",
    timeout=60 * 2,
)

DatabricksPartitionSensor

感測器是一種特殊的運算子,其設計目的只做一件事 - 等待某事發生。它可以是基於時間的,或是等待檔案,或是外部事件,但它們所做的只是等待直到某事發生,然後成功,以便它們的下游任務可以執行。

對於 Databricks 分割區感測器,我們會檢查分割區及其相關值是否存在,如果不存在,它會等待直到分割區值到達。等待時間和檢查間隔可以分別在 timeout 和 poke_interval 參數中設定。

使用 DatabricksPartitionSensor 為可透過 Databricks SQL 倉儲或互動式叢集存取的表格執行感測器。

感測器的使用方式

感測器接受使用者提供的表格名稱和分割區名稱、值,並產生 SQL 查詢來檢查指定的分割區名稱、值是否存在於指定的表格中。

需要的參數是

  • table_name (分割區檢查的表格名稱)。

  • partitions (要檢查的分割區名稱)。

  • partition_operator (分割區的比較運算子,用於值的範圍或限制,例如 partition_name >= partition_value)。支援Databricks 比較運算子

  • sql_warehouse_name (要使用的 Databricks SQL 倉儲名稱) 或 http_path (Databricks SQL 倉儲或 Databricks 叢集的 HTTP 路徑) 其中之一。

其他參數是選用的,可以在類別文件中找到。

範例

設定要與感測器搭配使用的 Databricks 連線。

tests/system/databricks/example_databricks_sensors.py[原始碼]

# Connection string setup for Databricks workspace.
connection_id = "databricks_default"
sql_warehouse_name = "Starter Warehouse"

探測特定表格是否存在資料/分割區

tests/system/databricks/example_databricks_sensors.py[原始碼]

# Example of using the Databricks Partition Sensor to check the presence
# of the specified partition(s) in a table.
partition_sensor = DatabricksPartitionSensor(
    databricks_conn_id=connection_id,
    sql_warehouse_name=sql_warehouse_name,
    catalog="hive_metastore",
    task_id="partition_sensor_task",
    table_name="sample_table_2",
    schema="temp",
    partitions={"date": "2023-01-03", "name": ["abc", "def"]},
    partition_operator="=",
    timeout=60 * 2,
)

此條目是否有幫助?