Library for interacting with etcd-based service reporting. Intended to be used as a base image.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

etcdlib.py 4.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. #!/usr/bin/env python3
  2. """Library to connect to etcd and manage docker service information."""
  3. import time
  4. import etcd
  5. class Connection:
  6. """A high-level connection to etcd.
  7. Manages a connection to etcd, and provides high-level methods for
  8. interacting with service records and related meta-data.
  9. Args:
  10. host (str): Hostname to connect to etcd on.
  11. port (int): Port to connect to etcd on.
  12. prefix (str): Etcd node under which the service information is stored.
  13. """
  14. def __init__(self, host, port, prefix):
  15. self._client = etcd.Client(host=host, port=port)
  16. self._prefix = prefix
  17. def _read(self, key, **kwargs):
  18. try:
  19. node = self._client.read(self._prefix + key, **kwargs)
  20. return node.value if node else None
  21. except etcd.EtcdKeyNotFound:
  22. return None
  23. def _read_recursive(self, key):
  24. try:
  25. return self._client.read(self._prefix + key, recursive=True)
  26. except etcd.EtcdKeyNotFound:
  27. return None
  28. def _read_map(self, key):
  29. node = self._read_recursive(key)
  30. if node:
  31. return {child.key.split('/')[-1]: child.value for child in node.children}
  32. else:
  33. return {}
  34. def _write(self, key, value):
  35. self._client.write(self._prefix + key, value)
  36. def _write_obj(self, prefix, obj):
  37. for key, value in obj.items():
  38. new_prefix = "%s/%s" % (prefix, key)
  39. if isinstance(value, dict):
  40. self._write_obj(new_prefix, value)
  41. else:
  42. self._write(new_prefix, str(value))
  43. def _delete(self, key):
  44. try:
  45. self._client.delete(self._prefix + key, recursive=True)
  46. except etcd.EtcdKeyNotFound:
  47. pass
  48. def wipe(self):
  49. """Deletes all service entries and related structures in etcd."""
  50. self._delete('')
  51. def add_containers(self, new_containers):
  52. """Writes the new containers' information to etcd."""
  53. for container in new_containers:
  54. name = container['name']
  55. print('Adding container %s' % name)
  56. self._write_obj('/containers/%s' % name, container)
  57. for label, value in container['labels'].items():
  58. self._write('/labels/%s/%s' % (label, name), value)
  59. for net, addr in container['net']['addr'].items():
  60. self._write('/networks/%s/%s' % (net, name), addr)
  61. self._write('/hosts/%s/%s' % (container['host'], name), name)
  62. self._notify_update()
  63. def remove_containers(self, old_containers):
  64. """Deletes the containers' entries from etcd."""
  65. for container in old_containers:
  66. name = container['name']
  67. print('Removing container %s' % name)
  68. self._delete('/containers/%s' % name)
  69. for label, _ in container['labels'].items():
  70. self._delete('/labels/%s/%s' % (label, name))
  71. for net, _ in container['net']['addr'].items():
  72. self._delete('/networks/%s/%s' % (net, name))
  73. self._delete('/hosts/%s/%s' % (container['host'], name))
  74. self._notify_update()
  75. def get_label(self, label):
  76. """Gets a map of container names to values for the given label."""
  77. return self._read_map('/labels/%s' % label)
  78. def get_networks(self, container):
  79. """Gets a map of network names to the specified container's IP on that network."""
  80. return self._read_map('/containers/%s/net/addr' % container)
  81. def _notify_update(self):
  82. print('Update completed', flush=True)
  83. self._write('/_updated', time.time())
  84. def wait_for_update(self):
  85. """Waits for an update to occur.
  86. When writing entries to etcd, a special _updated key is set to the
  87. current unix timestamp. This method watches that key until it is
  88. changed, blocking execution.
  89. """
  90. original_time = self._read('/_updated')
  91. new_time = original_time
  92. while new_time == original_time:
  93. try:
  94. new_time = self._read('/_updated', wait=True)
  95. except etcd.EtcdWatchTimedOut:
  96. new_time = self._read('/_updated')
  97. time.sleep(10)