WebHook broker that accepts notifications from multiple platforms and performs simple actions in response
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

gitea.py 1.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. import os
  2. from jsonref import requests
  3. from ..common import get_hook_url
  4. from .service import Service
  5. class Gitea(Service):
  6. def __init__(self, url, token, install_hooks=False):
  7. self._url = url
  8. self._token = token
  9. self._install_hooks = install_hooks
  10. def get_repos(self):
  11. repos = self._request("get", f"user/repos").json()
  12. for repo in repos:
  13. if self._install_hooks:
  14. self._maybe_install_gitea_hook(repo["full_name"])
  15. yield repo["full_name"], repo["ssh_url"], repo["clone_url"]
  16. def _maybe_install_gitea_hook(self, project):
  17. hook_url = get_hook_url("gitea", project)
  18. path = f"repos/{project}/hooks"
  19. hooks = self._request("get", path).json()
  20. if hook_url not in [hook["config"]["url"] for hook in hooks]:
  21. body = {
  22. "active": True,
  23. "config": {"content_type": "json", "url": hook_url},
  24. "events": [
  25. "create",
  26. "delete",
  27. "fork",
  28. "push",
  29. "issues",
  30. "issue_comment",
  31. "pull_request",
  32. "repository",
  33. "release",
  34. ],
  35. "type": "gitea",
  36. }
  37. self._request("post", path, json=body).json()
  38. def _request(self, method, api_path, **kwargs):
  39. if "params" not in kwargs:
  40. kwargs["params"] = {}
  41. kwargs["params"]["access_token"] = self._token
  42. return requests.request(method, f"{self._url}api/v1/{api_path}", **kwargs)
  43. def gitea_factory():
  44. return Gitea(os.environ["LAS_GITEA_URL"], os.environ["LAS_GITEA_TOKEN"])
  45. Service.add_factory(gitea_factory)