SFTP 感測器

在伺服器中使用 SFTP 協定尋找特定檔案或具有特定模式的檔案。若要取得更多關於此感測器的資訊,請造訪 SFTPSensor

tests/system/sftp/example_sftp_sensor.py[原始碼]

sftp_with_operator = SFTPSensor(task_id="sftp_operator", path=FULL_FILE_PATH, poke_interval=10)

我們也可以使用 Taskflow API。它接受與 SFTPSensor 相同的參數以及 -

op_args (選填)

在呼叫您的可呼叫物件時將解壓縮的位置引數列表 (樣板化)

op_kwargs (選填)

將在您的函式中解壓縮的關鍵字引數字典 (樣板化)

Python 可呼叫物件傳回的任何內容都會放入 XCom。

tests/system/sftp/example_sftp_sensor.py[原始碼]

@task.sftp_sensor(  # type: ignore[attr-defined]
    task_id="sftp_sensor",  # type: ignore[attr-defined]
    path=FULL_FILE_PATH,
    poke_interval=10,
)
def sftp_sensor_decorator():
    print("Files were successfully found!")
    # add your logic
    return "done"

檢查 SFTP 伺服器上檔案是否存在(在可延遲模式下)

tests/system/sftp/example_sftp_sensor.py[原始碼]

sftp_sensor_with_async = SFTPSensor(
    task_id="sftp_operator_async", path=FULL_FILE_PATH, poke_interval=10, deferrable=True
)

這個條目是否有幫助?