WebHook broker that accepts notifications from multiple platforms and performs simple actions in response
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

gitea.py 1.6KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. from jsonref import requests
  2. from ..common import get_hook_url
  3. from .service import Service
  class Gitea(Service):
      """Service implementation backed by a Gitea server's ``api/v1`` REST API.

      Lists the repositories returned by ``user/repos`` for the configured
      token and, when requested, makes sure each of them has a webhook
      pointing at this broker.
      """

      # Events a newly installed webhook subscribes to.
      _HOOK_EVENTS = (
          "create",
          "delete",
          "fork",
          "push",
          "issues",
          "issue_comment",
          "pull_request",
          "repository",
          "release",
      )

      # Seconds to wait on the server when the caller gives no ``timeout``;
      # without one, ``requests`` would block indefinitely on a stalled peer.
      _DEFAULT_TIMEOUT = 30

      def __init__(self, url, token, install_hooks=False):
          """Store the connection settings.

          Args:
              url: Base URL of the Gitea instance; a trailing slash is optional.
              token: Access token sent with every API request.
              install_hooks: When true, ``get_repos`` installs the broker's
                  webhook on each repository that does not have it yet.
          """
          super().__init__("gitea")
          self._url = url
          self._token = token
          self._install_hooks = install_hooks

      def get_repos(self):
          """Yield ``(full_name, ssh_url, clone_url)`` for each listed repository.

          When ``install_hooks`` was set, the webhook check for a repository
          runs just before that repository is yielded.
          """
          # NOTE(review): only a single ``user/repos`` response is read; if the
          # server paginates this endpoint, later pages are never seen -- confirm.
          repos = self._request("get", "user/repos").json()
          for repo in repos:
              if self._install_hooks:
                  self._maybe_install_gitea_hook(repo["full_name"])
              yield repo["full_name"], repo["ssh_url"], repo["clone_url"]

      def _maybe_install_gitea_hook(self, project):
          """Create the broker webhook on ``project`` unless one already targets it.

          Args:
              project: Repository identifier in ``owner/name`` form, as found
                  in the ``full_name`` field of a repository listing.
          """
          # ``self.type`` is presumably set by ``Service.__init__`` -- confirm.
          hook_url = get_hook_url(self.type, project)
          path = f"repos/{project}/hooks"
          hooks = self._request("get", path).json()
          installed_urls = {hook["config"]["url"] for hook in hooks}
          if hook_url in installed_urls:
              return
          body = {
              "active": True,
              "config": {"content_type": "json", "url": hook_url},
              "events": list(self._HOOK_EVENTS),
              "type": "gitea",
          }
          # The decoded reply is unused; decoding still surfaces a non-JSON body.
          self._request("post", path, json=body).json()

      def _request(self, method, api_path, **kwargs):
          """Send an authenticated request to ``api/v1/<api_path>``.

          Args:
              method: HTTP method name passed to ``requests.request``.
              api_path: Path below ``api/v1/``, without a leading slash.
              **kwargs: Extra keyword arguments forwarded to
                  ``requests.request``. ``params`` is merged with the access
                  token; ``timeout`` defaults to ``_DEFAULT_TIMEOUT``.

          Returns:
              The response object returned by ``requests.request``.
          """
          # Build a fresh dict so a caller-supplied ``params`` is never mutated,
          # and so ``params=None`` is accepted.
          params = dict(kwargs.pop("params", None) or {})
          params["access_token"] = self._token
          kwargs.setdefault("timeout", self._DEFAULT_TIMEOUT)
          # Tolerate base URLs given with or without a trailing slash.
          base_url = self._url.rstrip("/")
          return requests.request(
              method, f"{base_url}/api/v1/{api_path}", params=params, **kwargs
          )