HTTP 操作器

以下程式碼範例使用 http_default 連線,這表示請求會傳送到 httpbin 網站以執行基本的 HTTP 操作。

HttpSensor

使用 HttpSensor 來輪詢,直到 response_check 可調用物件評估為 true 為止。

在這裡,我們輪詢直到 httpbin 給我們包含 httpbin 的回應文字。

tests/system/http/example_http.py[原始碼]

task_http_sensor_check = HttpSensor(
    task_id="http_sensor_check",
    http_conn_id="http_default",
    endpoint="",
    request_params={},
    response_check=lambda response: "httpbin" in response.text,
    poke_interval=5,
    dag=dag,
)

此感測器也可以在可延遲模式中使用

tests/system/http/example_http.py[原始碼]

task_http_sensor_check_async = HttpSensor(
    task_id="http_sensor_check_async",
    http_conn_id="http_default",
    endpoint="",
    deferrable=True,
    request_params={},
    poke_interval=5,
    dag=dag,
)

HttpOperator

使用 HttpOperator 來呼叫 HTTP 請求並取得回應文字。

警告

透過 HttpOperator 設定 https 是違反直覺的

由於歷史原因,透過 HTTP 操作器設定 HTTPS 連線能力,嗯,是困難且違反直覺的。操作器預設為 http 協定,您可以透過 scheme 連線屬性來變更操作器使用的協定。然而,此欄位最初是為了資料庫類型的 URI 而新增到連線中,在資料庫類型的 URI 中,資料庫協定傳統上設定為 URI path 的第一個組件。因此,如果您想透過 URI 設定為 https 連線,您需要將 https 協定傳遞給 HttpOperator。儘管看起來很愚蠢,但您的連線 URI 會看起來像這樣:http://your_host:443/https。然後,如果您想在 HttpOperator 中使用不同的 URL 路徑,您應該在執行任務時將您的路徑作為 endpoint 參數傳遞。例如,要對 https://your_host:443/my_endpoint 執行查詢,您需要將 endpoint 參數設定為 my_endpoint。或者,如果您願意,您也可以對主機進行百分比編碼,包括 https:// 前綴,只要它包含 :// (百分比編碼 %3a%2f%2f),路徑的第一個組件將不會被用作協定。您的 URI 定義可能會看起來像 http://https%3a%2f%2fyour_host:443/。然而,在這種情況下,path 將完全不會被使用 - 如果您希望使用特定路徑發出請求,您仍然需要在任務中使用 endpoint 參數。儘管這違反直覺,但這是歷史上操作器/Hook 的運作方式,而且在不破壞向後相容性的情況下很難更改,因為還有其他操作器建立在 HttpOperator 之上,依賴該功能,而且已經有很多用戶在使用它了。

在第一個範例中,我們呼叫一個帶有 json 資料的 POST 請求,並且當我們收到相同的 json 資料時成功,否則任務將會失敗。

tests/system/http/example_http.py[原始碼]

task_post_op = HttpOperator(
    task_id="post_op",
    endpoint="post",
    data=json.dumps({"priority": 5}),
    headers={"Content-Type": "application/json"},
    response_check=lambda response: response.json()["json"]["priority"] == 5,
    dag=dag,
)

在這裡,我們呼叫一個 GET 請求並傳遞參數給它。無論回應文字為何,任務都會成功。

tests/system/http/example_http.py[原始碼]

task_get_op = HttpOperator(
    task_id="get_op",
    method="GET",
    endpoint="get",
    data={"param1": "value1", "param2": "value2"},
    headers={},
    dag=dag,
)

HttpOperator 預設會將回應主體以文字形式傳回。如果您想在將回應傳遞到下一個下游任務之前修改它,請使用 response_filter。這在以下情況下很有用:

  • 您正在使用的 API 傳回大型 JSON 酬載,而您只對資料的子集感興趣

  • API 傳回 xml 或 csv 格式的資料,而您想將其轉換為 JSON

  • 您對回應的標頭感興趣,而不是主體

以下是從 REST API 檢索資料並僅傳回巢狀屬性而不是完整回應主體的範例。

tests/system/http/example_http.py[原始碼]

task_get_op_response_filter = HttpOperator(
    task_id="get_op_response_filter",
    method="GET",
    endpoint="get",
    response_filter=lambda response: response.json()["nested"]["property"],
    dag=dag,
)

在第三個範例中,我們執行 PUT 操作以根據提供給請求的資料來放置/設定資料。

tests/system/http/example_http.py[原始碼]

task_put_op = HttpOperator(
    task_id="put_op",
    method="PUT",
    endpoint="put",
    data=json.dumps({"priority": 5}),
    headers={"Content-Type": "application/json"},
    dag=dag,
)

在此範例中,我們呼叫 DELETE 操作到 delete 端點。這次我們將表單資料傳遞給請求。

tests/system/http/example_http.py[原始碼]

task_del_op = HttpOperator(
    task_id="del_op",
    method="DELETE",
    endpoint="delete",
    data="some=data",
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    dag=dag,
)

在這裡,我們將表單資料傳遞給 POST 操作,這相當於一般的表單提交。

tests/system/http/example_http.py[原始碼]

task_post_op_formenc = HttpOperator(
    task_id="post_op_formenc",
    endpoint="post",
    data="name=Joe",
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    dag=dag,
)

HttpOperator 也允許重複呼叫 API 端點,通常用於循環遍歷其頁面。所有 API 回應都由 Operator 儲存在記憶體中,並以單一結果傳回。因此,與非分頁呼叫相比,它可能更耗費記憶體和 CPU。

預設情況下,HttpOperator 的結果將會變成 Response.text 的列表(而不是單個 Response.text 物件)。

範例 - 假設您的 API 傳回包含游標的 JSON 主體:您可以編寫一個 pagination_function,它將接收請求的原始 request.Response 物件,並根據此游標產生新的請求參數(作為 dict)。HttpOperator 將重複呼叫 API,直到該函數停止傳回任何內容。

tests/system/http/example_http.py[原始碼]



def get_next_page_cursor(response) -> dict | None:
    """
    Take the raw `request.Response` object, and check for a cursor.
    If a cursor exists, this function creates and return parameters to call
    the next page of result.
    """
    next_cursor = response.json().get("cursor")
    if next_cursor:
        return dict(data={"cursor": next_cursor})
    return None


task_get_paginated = HttpOperator(
    task_id="get_paginated",
    method="GET",
    endpoint="get",
    data={"cursor": ""},
    pagination_function=get_next_page_cursor,
    dag=dag,
)

這個條目對您有幫助嗎?