Library for interacting with etcd-based service reporting. Intended to be used as a base image.
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

etcdlib.py 3.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. #!/usr/bin/env python3
  2. """Library to connect to etcd and manage docker service information."""
  3. import time
  4. import etcd
  5. class Connection:
  6. """A high-level connection to etcd.
  7. Manages a connection to etcd, and provides high-level methods for
  8. interacting with service records and related meta-data.
  9. Args:
  10. host (str): Hostname to connect to etcd on.
  11. port (int): Port to connect to etcd on.
  12. prefix (str): Etcd node under which the service information is stored.
  13. """
  14. def __init__(self, host, port, prefix):
  15. self._client = etcd.Client(host=host, port=port)
  16. self._prefix = prefix
  17. def _read(self, key, **kwargs):
  18. try:
  19. node = self._client.read(self._prefix + key, **kwargs)
  20. return node.value if node else None
  21. except etcd.EtcdKeyNotFound:
  22. return None
  23. def _read_recursive(self, key):
  24. try:
  25. return self._client.read(self._prefix + key, recursive=True)
  26. except etcd.EtcdKeyNotFound:
  27. return None
  28. def _write(self, key, value):
  29. self._client.write(self._prefix + key, value)
  30. def _write_obj(self, prefix, obj):
  31. for key, value in obj.items():
  32. new_prefix = "%s/%s" % (prefix, key)
  33. if isinstance(value, dict):
  34. self._write_obj(new_prefix, value)
  35. else:
  36. self._write(new_prefix, str(value))
  37. def _delete(self, key):
  38. try:
  39. self._client.delete(self._prefix + key, recursive=True)
  40. except etcd.EtcdKeyNotFound:
  41. pass
  42. def wipe(self):
  43. """Deletes all service entries and related structures in etcd."""
  44. self._delete('')
  45. def add_containers(self, new_containers):
  46. """Writes the new containers' information to etcd."""
  47. for container in new_containers:
  48. name = container['name']
  49. print('Adding container %s' % name)
  50. self._write_obj('/containers/%s' % name, container)
  51. for label, value in container['labels'].items():
  52. self._write('/labels/%s/%s' % (label, name), value)
  53. for net, addr in container['net']['addr'].items():
  54. self._write('/networks/%s/%s' % (net, name), addr)
  55. self._write('/hosts/%s/%s' % (container['host'], name), name)
  56. self._notify_update()
  57. def remove_containers(self, old_containers):
  58. """Deletes the containers' entries from etcd."""
  59. for container in old_containers:
  60. name = container['name']
  61. print('Removing container %s' % name)
  62. self._delete('/containers/%s' % name)
  63. for label, _ in container['labels'].items():
  64. self._delete('/labels/%s/%s' % (label, name))
  65. for net, _ in container['net']['addr'].items():
  66. self._delete('/networks/%s/%s' % (net, name))
  67. self._delete('/hosts/%s/%s' % (container['host'], name))
  68. self._notify_update()
  69. def get_label(self, label):
  70. """Gets a map of container names to values for the given label."""
  71. node = self._read_recursive('/labels/%s' % label)
  72. if node:
  73. return {child.key.split('/')[-1]: child.value for child in node.children}
  74. else:
  75. return {}
  76. def _notify_update(self):
  77. print('Update completed', flush=True)
  78. self._write('/_updated', time.time())
  79. def wait_for_update(self):
  80. """Waits for an update to occur.
  81. When writing entries to etcd, a special _updated key is set to the
  82. current unix timestamp. This method watches that key until it is
  83. changed, blocking execution.
  84. """
  85. original_time = self._read('/_updated')
  86. new_time = original_time
  87. while new_time == original_time:
  88. try:
  89. new_time = self._read('/_updated', wait=True)
  90. except etcd.EtcdWatchTimedOut:
  91. new_time = self._read('/_updated')
  92. time.sleep(10)