WebHook broker that accepts notifications from multiple platforms and performs simple actions in response
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

main.py 2.3KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import hashlib
  2. import jenkins
  3. import requests
  4. import os
  5. from bs4 import BeautifulSoup
  6. from flask import Flask
  7. BASE_URL = os.environ["LAS_BASE_URL"]
  8. SECRET = os.environ["LAS_SECRET"]
  9. server = jenkins.Jenkins(
  10. os.environ["LAS_JENKINS_URL"],
  11. username=os.environ["LAS_JENKINS_USER"],
  12. password=os.environ["LAS_JENKINS_PASSWORD"],
  13. )
  14. def get_hook_url(service, identifier):
  15. nonce = (service + SECRET + identifier).encode("ascii")
  16. token = hashlib.sha256(nonce).hexdigest()
  17. return f"{BASE_URL}hooks/{service}/{identifier}/{token}"
  18. def get_jenkins_jobs():
  19. for job in server.get_all_jobs():
  20. config = BeautifulSoup(server.get_job_config(job["fullname"]), features="xml")
  21. for git_config in config.find_all("scm", class_="hudson.plugins.git.GitSCM"):
  22. branch_spec = git_config.find("branches").find("name").text
  23. yield job["fullname"], branch_spec, git_config.find("url").text
  24. def maybe_install_gitea_hook(project):
  25. gitea_url = f"{os.environ['LAS_GITEA_URL']}api/v1/repos/{project}/hooks"
  26. hook_url = get_hook_url("gitea", project)
  27. hooks = requests.get(
  28. gitea_url, params={"access_token": os.environ["LAS_GITEA_TOKEN"]}
  29. ).json()
  30. if hook_url not in [hook["config"]["url"] for hook in hooks]:
  31. body = {
  32. "active": True,
  33. "config": {"content_type": "json", "url": hook_url},
  34. "events": [
  35. "create",
  36. "delete",
  37. "fork",
  38. "push",
  39. "issues",
  40. "issue_comment",
  41. "pull_request",
  42. "repository",
  43. "release",
  44. ],
  45. "type": "gitea",
  46. }
  47. requests.post(
  48. gitea_url,
  49. json=body,
  50. params={"access_token": "5b8de94a7201bc923e99813850327caf75b85e70"},
  51. ).json()
  52. def get_gitea_repos():
  53. repos = requests.get(
  54. f"{os.environ['LAS_GITEA_URL']}api/v1/user/repos",
  55. params={"access_token": os.environ["LAS_GITEA_TOKEN"]},
  56. ).json()
  57. for repo in repos:
  58. maybe_install_gitea_hook(repo["full_name"])
  59. yield repo["full_name"], repo["ssh_url"], repo["clone_url"]
  60. app = Flask(__name__)
  61. @app.route("/")
  62. def handle_index():
  63. return app.send_static_file("index.html")
  64. app.run()