運算子¶
使用 GithubOperator
以在 GitHub 中執行操作。
您可以透過使用 GithubOperator
並從頂層 PyGithub 方法傳遞 github_method
和 github_method_args
來建立您自己的運算子。
您可以進一步使用您喜歡的 result_processor
Callable 處理結果。
列出使用者擁有的所有儲存庫的範例,client.get_user().get_repos() 可以如下實作
github_list_repos = GithubOperator(
task_id="github_list_repos",
github_method="get_user",
result_processor=lambda user: logger.info(list(user.get_repos())),
)
列出儲存庫中標籤的範例,client.get_repo(full_name_or_id=’apache/airflow’).get_tags() 可以如下實作
list_repo_tags = GithubOperator(
task_id="list_repo_tags",
github_method="get_repo",
github_method_args={"full_name_or_id": "apache/airflow"},
result_processor=lambda repo: logger.info(list(repo.get_tags())),
)
感測器¶
您可以使用 GithubSensor
建立您自己的感測器,
您也可以使用 BaseGithubRepositorySensor
在儲存庫上實作您自己的感測器,GithubTagSensor
就是一個範例
使用 GithubTagSensor
以等待 GitHub 中標籤的建立。
標籤 v1.0 的範例
tag_sensor = GithubTagSensor(
task_id="example_tag_sensor",
tag_name="v1.0",
repository_name="apache/airflow",
timeout=60,
poke_interval=10,
)
類似的功能可以透過直接使用 GithubSensor
來達成。
def tag_checker(repo: Any, tag_name: str) -> bool | None:
result = None
try:
if repo is not None and tag_name is not None:
all_tags = [x.name for x in repo.get_tags()]
result = tag_name in all_tags
except GithubException as github_error: # type: ignore[misc]
raise AirflowException(f"Failed to execute GithubSensor, error: {github_error}")
except Exception as e:
raise AirflowException(f"GitHub operator error: {e}")
return result
github_sensor = GithubSensor(
task_id="example_sensor",
method_name="get_repo",
method_params={"full_name_or_id": "apache/airflow"},
result_processor=lambda repo: tag_checker(repo, "v1.0"),
timeout=60,
poke_interval=10,
)