import functools
import random
import threading
import time
import grpc
import grpc._channel
import queue
from . import (
etcdrpc,
exceptions,
leases,
locks,
members,
transactions,
utils,
watch,
)
# Maps a gRPC status code to the client exception class that
# MultiEndpointEtcd3Client._manage_grpc_errors raises in its place.
# Codes absent from this mapping are re-raised unchanged.
_EXCEPTIONS_BY_CODE = {
grpc.StatusCode.INTERNAL: exceptions.InternalServerError,
grpc.StatusCode.UNAVAILABLE: exceptions.ConnectionFailedError,
grpc.StatusCode.DEADLINE_EXCEEDED: exceptions.ConnectionTimeoutError,
grpc.StatusCode.FAILED_PRECONDITION: exceptions.PreconditionFailedError,
}
# gRPC status codes that make _manage_grpc_errors mark the endpoint
# currently in use as failed.
_FAILED_EP_CODES = [
grpc.StatusCode.UNAVAILABLE,
grpc.StatusCode.DEADLINE_EXCEEDED,
grpc.StatusCode.INTERNAL
]
class Transactions(object):
    """Convenience namespace aliasing the classes of ``transactions``.

    Instances expose the comparison targets and the operations of the
    ``transactions`` module under short lowercase attribute names.
    """

    def __init__(self):
        # Comparison targets.
        self.value = transactions.Value
        self.version = transactions.Version
        self.create = transactions.Create
        self.mod = transactions.Mod

        # Operations.
        self.put = transactions.Put
        self.get = transactions.Get
        self.delete = transactions.Delete
        self.txn = transactions.Txn
class KVMetadata(object):
    """Metadata describing a key/value pair.

    :param keyvalue: object providing ``key``, ``create_revision``,
                     ``mod_revision``, ``version`` and ``lease`` attributes
    :param header: response header, stored as ``response_header``
    """

    def __init__(self, keyvalue, header):
        # These attributes are copied over under the same name.
        for name in ('key', 'create_revision', 'mod_revision', 'version'):
            setattr(self, name, getattr(keyvalue, name))
        # The lease is exposed as ``lease_id`` rather than ``lease``.
        self.lease_id = keyvalue.lease
        self.response_header = header
class Status(object):
    """Plain container for the values reported by a status request.

    :param version: stored as ``version``
    :param db_size: stored as ``db_size``
    :param leader: stored as ``leader``
    :param raft_index: stored as ``raft_index``
    :param raft_term: stored as ``raft_term``
    """

    def __init__(self, version, db_size, leader, raft_index, raft_term):
        self.version, self.db_size = version, db_size
        self.leader = leader
        self.raft_index, self.raft_term = raft_index, raft_term
class Alarm(object):
    """Plain container pairing an alarm type with a member id.

    :param alarm_type: stored as ``alarm_type``
    :param member_id: stored as ``member_id``
    """

    def __init__(self, alarm_type, member_id):
        self.alarm_type, self.member_id = alarm_type, member_id
class Endpoint(object):
    """Represents an etcd cluster endpoint.

    :param str host: Endpoint host
    :param int port: Endpoint port
    :param bool secure: Use secure channel, default True
    :param creds: Credentials to use for secure channel, required if
                  secure=True
    :type creds: grpc.ChannelCredentials, optional
    :param time_retry: Seconds to wait before retrying this endpoint after
                       failure, default 300.0
    :type time_retry: int or float
    :param opts: Additional gRPC options
    :type opts: dict, optional
    :raises ValueError: if ``secure`` is set but no credentials are given
    """

    def __init__(self, host, port, secure=True, creds=None, time_retry=300.0,
                 opts=None):
        # Reject an unusable configuration before any state is set up.
        if secure and creds is None:
            raise ValueError(
                'Please set TLS credentials for secure connections')
        self.host = host
        self.netloc = "{}:{}".format(host, port)
        self.secure = secure
        if secure:
            self.protocol = 'https'
        else:
            self.protocol = 'http'
        self.credentials = creds
        self.time_retry = time_retry
        self.in_use = False
        # Timestamp of the last failure; 0 means "never failed".
        self.last_failed = 0
        self.channel = self._mkchannel(opts)

    def close(self):
        """Close the underlying gRPC channel."""
        self.channel.close()

    def fail(self):
        """Transition the endpoint to a failed state."""
        self.last_failed = time.time()
        self.in_use = False

    def use(self):
        """Transition the endpoint to an active state.

        :returns: the endpoint's channel
        :raises ValueError: if the endpoint is still considered failed
        """
        if self.is_failed():
            raise ValueError('Trying to use a failed node')
        self.last_failed = 0
        self.in_use = True
        return self.channel

    def __str__(self):
        return "Endpoint({}://{})".format(self.protocol, self.netloc)

    def is_failed(self):
        """Check if the current endpoint is failed.

        The endpoint counts as failed while fewer than ``time_retry``
        seconds have elapsed since ``last_failed``.
        """
        elapsed = time.time() - self.last_failed
        return elapsed < self.time_retry

    def _mkchannel(self, opts):
        """Create the secure or insecure gRPC channel for this endpoint."""
        if not self.secure:
            return grpc.insecure_channel(self.netloc, options=opts)
        return grpc.secure_channel(self.netloc, self.credentials,
                                   options=opts)
class EtcdTokenCallCredentials(grpc.AuthMetadataPlugin):
    """Metadata wrapper for raw access token credentials.

    :param access_token: token sent as the ``token`` metadata entry
    """

    def __init__(self, access_token):
        self._access_token = access_token

    def __call__(self, context, callback):
        # Hand the token to gRPC as a single metadata pair, with no error.
        token_entry = ('token', self._access_token)
        callback((token_entry,), None)
[docs]class MultiEndpointEtcd3Client(object):
"""
etcd v3 API client with multiple endpoints.
When failover is enabled, requests still will not be auto-retried.
Instead, the application may retry the request, and the ``Etcd3Client``
will then attempt to send it to a different endpoint that has not recently
failed. If all configured endpoints have failed and are not ready to be
retried, an ``exceptions.NoServerAvailableError`` will be raised.
:param endpoints: Endpoints to use in lieu of host and port
:type endpoints: Iterable(Endpoint), optional
:param timeout: Timeout for all RPC in seconds
:type timeout: int or float, optional
:param user: Username for authentication
:type user: str, optional
:param password: Password for authentication
:type password: str, optional
:param bool failover: Failover between endpoints, default False
"""
def __init__(self, endpoints=None, timeout=None, user=None, password=None,
             failover=False):
    """Set up the endpoints and, if credentials are given, authenticate.

    :param endpoints: iterable of endpoint objects exposing ``netloc``
    :param timeout: timeout passed to every RPC
    :param user: username for authentication
    :param password: password for authentication
    :param bool failover: fail over between endpoints, default False
    :raises Exception: if only one of ``user`` and ``password`` is given
    """
    self.metadata = None
    self.failover = failover

    # Cache of gRPC stubs, keyed by stub name.
    self._stubs = {}

    # Step 1: index the endpoints by netloc and pick one at random.
    self.endpoints = {ep.netloc: ep for ep in endpoints}
    self._current_endpoint_label = random.choice(list(self.endpoints))

    # Step 2: authenticate when both credentials are supplied.
    self.timeout = timeout
    self.call_credentials = None
    has_user = user is not None
    has_password = password is not None
    if has_user and has_password:
        auth_request = etcdrpc.AuthenticateRequest(
            name=user,
            password=password
        )
        auth_response = self.authstub.Authenticate(auth_request,
                                                   self.timeout)
        token = auth_response.token
        self.metadata = (('token', token),)
        self.call_credentials = grpc.metadata_call_credentials(
            EtcdTokenCallCredentials(token))
    elif has_user or has_password:
        raise Exception(
            'if using authentication credentials both user and password '
            'must be specified.'
        )

    self.transactions = Transactions()
def _create_stub_property(name, stub_class):
    """Build a read-only property that lazily creates a gRPC stub.

    On first access the stub is instantiated from ``stub_class`` with the
    client's current channel and cached in ``self._stubs`` under ``name``;
    later reads return the cached instance.
    """
    def get_stub(self):
        cached = self._stubs.get(name)
        if cached is not None:
            return cached
        created = stub_class(self.channel)
        self._stubs[name] = created
        return created

    return property(get_stub)

# One lazily created stub per gRPC service used by the client.
authstub = _create_stub_property("authstub", etcdrpc.AuthStub)
kvstub = _create_stub_property("kvstub", etcdrpc.KVStub)
clusterstub = _create_stub_property("clusterstub", etcdrpc.ClusterStub)
leasestub = _create_stub_property("leasestub", etcdrpc.LeaseStub)
maintenancestub = _create_stub_property(
    "maintenancestub", etcdrpc.MaintenanceStub
)
def get_watcher(self):
    """Create a new ``watch.Watcher`` on the current channel.

    The watcher receives the client's timeout, call credentials and
    metadata.
    """
    stub = etcdrpc.WatchStub(self.channel)
    watcher_options = {
        'timeout': self.timeout,
        'call_credentials': self.call_credentials,
        'metadata': self.metadata,
    }
    return watch.Watcher(stub, **watcher_options)
@property
def watcher(self):
    """Watcher cached alongside the stubs, created on first access."""
    current = self._stubs.get("watcher")
    if current is None:
        current = self.get_watcher()
        self._stubs["watcher"] = current
    return current

@watcher.setter
def watcher(self, value):
    # Stored in the stub cache so that it is dropped with the stubs.
    self._stubs["watcher"] = value
def _clear_old_stubs(self):
    """Drop every cached stub and close the cached watcher, if any."""
    # Grab the watcher before the cache is emptied so it can be closed.
    stale_watcher = self._stubs.get("watcher")
    self._stubs.clear()
    if not stale_watcher:
        return
    stale_watcher.close()
@property
def _current_endpoint_label(self):
    """Label (key of ``self.endpoints``) of the endpoint currently in use."""
    return self._current_ep_label

@_current_endpoint_label.setter
def _current_endpoint_label(self, value):
    # Compare by value, not identity: an equal label held in a different
    # string object must not discard the cached stubs (which would also
    # close the cached watcher).
    if getattr(self, "_current_ep_label", None) != value:
        self._clear_old_stubs()
    self._current_ep_label = value
@property
def endpoint_in_use(self):
    """Get the current endpoint in use, or None if no label is set."""
    label = self._current_endpoint_label
    return None if label is None else self.endpoints[label]
@property
def channel(self):
    """
    Get an available channel on the first node that's not failed.

    :raises ValueError: if the current endpoint is failed and failover
                        is disabled
    :raises exceptions.NoServerAvailableError: if failover is enabled and
                                               every endpoint is failed
    """
    try:
        return self.endpoint_in_use.use()
    except ValueError:
        if not self.failover:
            raise
        # Failing over: switch to the first endpoint that is not failed
        # and resolve the channel again through this same property.
        for candidate_label, candidate in self.endpoints.items():
            if not candidate.is_failed():
                self._current_endpoint_label = candidate_label
                return self.channel
        raise exceptions.NoServerAvailableError(
            "No endpoint available and not failed")
def close(self):
    """Call the GRPC channel close semantics.

    Closes the cached watcher, if any, then every endpoint.
    """
    cached_watcher = self._stubs.get("watcher")
    if cached_watcher:
        cached_watcher.close()
    for ep in self.endpoints.values():
        ep.close()
def __enter__(self):
    """Enter the context manager; the client itself is the context value."""
    return self
def __exit__(self, *args):
    """Close the client when leaving the context manager."""
    self.close()
@staticmethod
def get_secure_creds(ca_cert, cert_key=None, cert_cert=None):
    """Build SSL channel credentials from certificate files.

    :param ca_cert: path of the CA certificate file
    :param cert_key: optional path of the key file
    :param cert_cert: optional path of the certificate file
    :returns: result of ``grpc.ssl_channel_credentials``
    """
    def read_bytes(path):
        with open(path, 'rb') as handle:
            return handle.read()

    ca_cert_data = read_bytes(ca_cert)
    cert_key_data = None if cert_key is None else read_bytes(cert_key)
    cert_cert_data = None if cert_cert is None else read_bytes(cert_cert)
    return grpc.ssl_channel_credentials(
        ca_cert_data,
        cert_key_data,
        cert_cert_data
    )
def _manage_grpc_errors(self, exc):
    """Translate a gRPC error into a client exception and raise it.

    Must be called while ``exc`` is being handled: status codes without
    an entry in ``_EXCEPTIONS_BY_CODE`` are re-raised with a bare
    ``raise``.

    :param exc: the caught ``grpc.RpcError``
    """
    status_code = exc.code()
    if status_code in _FAILED_EP_CODES:
        # Mark the current node as failed. If others are available,
        # they will be used on subsequent requests.
        self.endpoint_in_use.fail()
        self._clear_old_stubs()
    mapped_exception = _EXCEPTIONS_BY_CODE.get(status_code)
    if mapped_exception is not None:
        raise mapped_exception()
    raise
def _handle_errors(payload):
    """Decorator routing ``grpc.RpcError`` to ``_manage_grpc_errors``."""
    @functools.wraps(payload)
    def handler(self, *args, **kwargs):
        try:
            result = payload(self, *args, **kwargs)
        except grpc.RpcError as rpc_error:
            self._manage_grpc_errors(rpc_error)
        else:
            return result
    return handler
def _handle_generator_errors(payload):
    """Generator counterpart of ``_handle_errors``.

    The wrapped generator is iterated inside the ``try`` block, so a
    ``grpc.RpcError`` raised while iterating is also routed to
    ``_manage_grpc_errors``.
    """
    @functools.wraps(payload)
    def handler(self, *args, **kwargs):
        try:
            for element in payload(self, *args, **kwargs):
                yield element
        except grpc.RpcError as rpc_error:
            self._manage_grpc_errors(rpc_error)
    return handler
def _build_get_range_request(self, key,
                             range_end=None,
                             limit=None,
                             revision=None,
                             sort_order=None,
                             sort_target='key',
                             serializable=False,
                             keys_only=False,
                             count_only=False,
                             min_mod_revision=None,
                             max_mod_revision=None,
                             min_create_revision=None,
                             max_create_revision=None):
    """Build an ``etcdrpc.RangeRequest`` from keyword options.

    Optional arguments left as ``None`` are not set on the request.

    :param key: key (or first key of the range), converted to bytes
    :param range_end: end of the range, converted to bytes
    :param sort_order: ``None``, ``'ascend'`` or ``'descend'``
    :param sort_target: ``None``, ``'key'``, ``'version'``, ``'create'``,
                        ``'mod'`` or ``'value'``
    :returns: the populated range request
    :raises ValueError: on an unknown ``sort_order`` or ``sort_target``
    """
    range_request = etcdrpc.RangeRequest()
    range_request.key = utils.to_bytes(key)
    range_request.keys_only = keys_only
    range_request.count_only = count_only
    range_request.serializable = serializable

    if range_end is not None:
        range_request.range_end = utils.to_bytes(range_end)

    # Each optional value goes to the request field of the same name.
    # The create-revision bounds were previously written to
    # ``min_mod_revision`` by mistake.
    optional_fields = (
        ('limit', limit),
        ('revision', revision),
        ('min_mod_revision', min_mod_revision),
        ('max_mod_revision', max_mod_revision),
        ('min_create_revision', min_create_revision),
        ('max_create_revision', max_create_revision),
    )
    for field_name, field_value in optional_fields:
        if field_value is not None:
            setattr(range_request, field_name, field_value)

    sort_orders = {
        None: etcdrpc.RangeRequest.NONE,
        'ascend': etcdrpc.RangeRequest.ASCEND,
        'descend': etcdrpc.RangeRequest.DESCEND,
    }
    request_sort_order = sort_orders.get(sort_order)
    if request_sort_order is None:
        raise ValueError('unknown sort order: "{}"'.format(sort_order))
    range_request.sort_order = request_sort_order

    sort_targets = {
        None: etcdrpc.RangeRequest.KEY,
        'key': etcdrpc.RangeRequest.KEY,
        'version': etcdrpc.RangeRequest.VERSION,
        'create': etcdrpc.RangeRequest.CREATE,
        'mod': etcdrpc.RangeRequest.MOD,
        'value': etcdrpc.RangeRequest.VALUE,
    }
    request_sort_target = sort_targets.get(sort_target)
    if request_sort_target is None:
        raise ValueError('sort_target must be one of "key", '
                         '"version", "create", "mod" or "value"')
    range_request.sort_target = request_sort_target

    return range_request
@_handle_errors
def get_response(self, key, **kwargs):
    """Get the value of a key from etcd.

    :param key: key in etcd to get
    :param kwargs: forwarded to ``_build_get_range_request``
    :returns: the raw response of the ``Range`` RPC
    """
    request = self._build_get_range_request(key, **kwargs)
    return self.kvstub.Range(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )
def get(self, key, **kwargs):
    """
    Get the value of a key from etcd.

    example usage:

    .. code-block:: python

        >>> import pyetcd
        >>> etcd = pyetcd.client()
        >>> value, metadata = etcd.get('/thing/key')

    :param key: key in etcd to get
    :returns: value of key and metadata, or ``(None, None)`` when the
              response count is below one
    :rtype: bytes, ``KVMetadata``
    """
    response = self.get_response(key, **kwargs)
    if response.count >= 1:
        entry = response.kvs.pop()
        return entry.value, KVMetadata(entry, response.header)
    return None, None
@_handle_errors
def get_prefix_response(self, key_prefix, **kwargs):
    """Get a range of keys with a prefix.

    :param key_prefix: prefix shared by the keys to fetch
    :param kwargs: forwarded to ``_build_get_range_request``; must not
                   contain ``key`` or ``range_end``
    :returns: the raw response of the ``Range`` RPC
    :raises TypeError: if ``key`` or ``range_end`` is passed
    """
    for forbidden in ("key", "range_end"):
        if forbidden in kwargs:
            raise TypeError("Don't use key or range_end with prefix")

    prefix_end = utils.prefix_range_end(utils.to_bytes(key_prefix))
    request = self._build_get_range_request(
        key=key_prefix,
        range_end=prefix_end,
        **kwargs
    )
    return self.kvstub.Range(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )
def get_prefix(self, key_prefix, **kwargs):
    """
    Get a range of keys with a prefix.

    The request is sent immediately; the returned generator only walks
    the key/values of the response.

    :param key_prefix: first key in range
    :param keys_only: if True, retrieve only the keys, not the values
    :returns: sequence of (value, metadata) tuples
    """
    response = self.get_prefix_response(key_prefix, **kwargs)
    return (
        (entry.value, KVMetadata(entry, response.header))
        for entry in response.kvs
    )
@_handle_errors
def get_range_response(self, range_start, range_end, sort_order=None,
                       sort_target='key', **kwargs):
    """Get a range of keys.

    :param range_start: first key in range
    :param range_end: end of the range
    :param sort_order: forwarded to ``_build_get_range_request``
    :param sort_target: forwarded to ``_build_get_range_request``
    :returns: the raw response of the ``Range`` RPC
    """
    request = self._build_get_range_request(
        key=range_start,
        range_end=range_end,
        sort_order=sort_order,
        sort_target=sort_target,
        **kwargs
    )
    return self.kvstub.Range(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )
def get_range(self, range_start, range_end, **kwargs):
    """
    Get a range of keys.

    This is a generator: the request is only sent once iteration starts.

    :param range_start: first key in range
    :param range_end: last key in range
    :returns: sequence of (value, metadata) tuples
    """
    response = self.get_range_response(range_start, range_end, **kwargs)
    for entry in response.kvs:
        metadata = KVMetadata(entry, response.header)
        yield (entry.value, metadata)
@_handle_errors
def get_all_response(self, sort_order=None, sort_target='key',
                     keys_only=False):
    """Get all keys currently stored in etcd.

    The request uses ``b'\\0'`` for both the key and the range end.

    :returns: the raw response of the ``Range`` RPC
    """
    whole_keyspace = b'\0'
    request = self._build_get_range_request(
        key=whole_keyspace,
        range_end=whole_keyspace,
        sort_order=sort_order,
        sort_target=sort_target,
        keys_only=keys_only,
    )
    return self.kvstub.Range(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )
def get_all(self, **kwargs):
    """
    Get all keys currently stored in etcd.

    This is a generator: the request is only sent once iteration starts.

    :param keys_only: if True, retrieve only the keys, not the values
    :returns: sequence of (value, metadata) tuples
    """
    response = self.get_all_response(**kwargs)
    for entry in response.kvs:
        metadata = KVMetadata(entry, response.header)
        yield (entry.value, metadata)
def _build_put_request(self, key, value, lease=None, prev_kv=False):
    """Build an ``etcdrpc.PutRequest``.

    ``key`` and ``value`` go through ``utils.to_bytes`` and ``lease``
    through ``utils.lease_to_id``.
    """
    request = etcdrpc.PutRequest()
    request.key = utils.to_bytes(key)
    request.value = utils.to_bytes(value)
    request.lease = utils.lease_to_id(lease)
    request.prev_kv = prev_kv
    return request
@_handle_errors
def put(self, key, value, lease=None, prev_kv=False):
    """
    Save a value to etcd.

    Example usage:

    .. code-block:: python

        >>> import pyetcd
        >>> etcd = pyetcd.client()
        >>> etcd.put('/thing/key', 'hello world')

    :param key: key in etcd to set
    :param value: value to set key to
    :type value: bytes
    :param lease: Lease to associate with this key.
    :type lease: either :class:`.Lease`, or int (ID of lease)
    :param prev_kv: return the previous key-value pair
    :type prev_kv: bool
    :returns: a response containing a header and the prev_kv
    :rtype: :class:`.rpc_pb2.PutResponse`
    """
    request = self._build_put_request(
        key, value, lease=lease, prev_kv=prev_kv)
    return self.kvstub.Put(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )
@_handle_errors
def put_if_not_exists(self, key, value, lease=None):
    """
    Atomically puts a value only if the key previously had no value.

    This is the etcdv3 equivalent to setting a key with the etcdv2
    parameter prevExist=false.

    :param key: key in etcd to put
    :param value: value to be written to key
    :type value: bytes
    :param lease: Lease to associate with this key.
    :type lease: either :class:`.Lease`, or int (ID of lease)
    :returns: state of transaction, ``True`` if the put was successful,
              ``False`` otherwise
    :rtype: bool
    """
    txn = self.transactions
    # The put only runs when the create comparison against '0' holds.
    key_is_absent = txn.create(key) == '0'
    succeeded, _ = self.transaction(
        compare=[key_is_absent],
        success=[txn.put(key, value, lease=lease)],
        failure=[],
    )
    return succeeded
@_handle_errors
def replace(self, key, initial_value, new_value):
    """
    Atomically replace the value of a key with a new value.

    This compares the current value of a key, then replaces it with a new
    value if it is equal to a specified value. This operation takes place
    in a transaction.

    :param key: key in etcd to replace
    :param initial_value: old value to replace
    :type initial_value: bytes
    :param new_value: new value of the key
    :type new_value: bytes
    :returns: status of transaction, ``True`` if the replace was
              successful, ``False`` otherwise
    :rtype: bool
    """
    txn = self.transactions
    value_matches = txn.value(key) == initial_value
    succeeded, _ = self.transaction(
        compare=[value_matches],
        success=[txn.put(key, new_value)],
        failure=[],
    )
    return succeeded
def _build_delete_request(self, key,
                          range_end=None,
                          prev_kv=False):
    """Build an ``etcdrpc.DeleteRangeRequest``.

    ``range_end`` is only set on the request when it is not ``None``.
    """
    request = etcdrpc.DeleteRangeRequest()
    request.key = utils.to_bytes(key)
    request.prev_kv = prev_kv
    if range_end is None:
        return request
    request.range_end = utils.to_bytes(range_end)
    return request
@_handle_errors
def delete(self, key, prev_kv=False, return_response=False):
    """
    Delete a single key in etcd.

    :param key: key in etcd to delete
    :param prev_kv: return the deleted key-value pair
    :type prev_kv: bool
    :param return_response: return the full response
    :type return_response: bool
    :returns: True if the key has been deleted when
              ``return_response`` is False and a response containing
              a header, the number of deleted keys and prev_kvs when
              ``return_response`` is True
    """
    request = self._build_delete_request(key, prev_kv=prev_kv)
    response = self.kvstub.DeleteRange(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )
    if not return_response:
        return response.deleted >= 1
    return response
@_handle_errors
def delete_prefix(self, prefix):
    """Delete a range of keys with a prefix in etcd.

    :param prefix: prefix shared by the keys to delete
    :returns: the raw response of the ``DeleteRange`` RPC
    """
    prefix_end = utils.prefix_range_end(utils.to_bytes(prefix))
    request = self._build_delete_request(prefix, range_end=prefix_end)
    return self.kvstub.DeleteRange(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )
@_handle_errors
def status(self):
    """Get the status of the responding member.

    The leader is resolved by matching the leader id of the response
    against ``self.members``; it is ``None`` when no member matches.

    :returns: a :class:`Status` instance
    """
    response = self.maintenancestub.Status(
        etcdrpc.StatusRequest(),
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )

    # First member whose id matches the leader id, else None.
    leader = next(
        (member for member in self.members
         if member.id == response.leader),
        None,
    )

    return Status(response.version,
                  response.dbSize,
                  leader,
                  response.raftIndex,
                  response.raftTerm)
@_handle_errors
def add_watch_callback(self, *args, **kwargs):
    """
    Watch a key or range of keys and call a callback on every response.

    If timeout was declared during the client initialization and
    the watch cannot be created during that time the method raises
    a ``WatchTimedOut`` exception.

    :param key: key to watch
    :param callback: callback function
    :returns: watch_id. Later it could be used for cancelling watch.
    """
    try:
        watch_id = self.watcher.add_callback(*args, **kwargs)
    except queue.Empty:
        raise exceptions.WatchTimedOut()
    return watch_id
@_handle_errors
def add_watch_prefix_callback(self, key_prefix, callback, **kwargs):
    """
    Watch a prefix and call a callback on every response.

    If timeout was declared during the client initialization and
    the watch cannot be created during that time the method raises
    a ``WatchTimedOut`` exception.

    :param key_prefix: prefix to watch
    :param callback: callback function
    :returns: watch_id. Later it could be used for cancelling watch.
    """
    prefix_end = utils.prefix_range_end(utils.to_bytes(key_prefix))
    kwargs['range_end'] = prefix_end
    return self.add_watch_callback(key_prefix, callback, **kwargs)
@_handle_errors
def watch_response(self, key, **kwargs):
    """
    Watch a key.

    Example usage:

    .. code-block:: python

        responses_iterator, cancel = etcd.watch_response('/doot/key')
        for response in responses_iterator:
            print(response)

    :param key: key to watch
    :returns: tuple of ``responses_iterator`` and ``cancel``.
              Use ``responses_iterator`` to get the watch responses,
              each of which contains a header and a list of events.
              Use ``cancel`` to cancel the watch request.
    """
    pending = queue.Queue()
    canceled = threading.Event()

    def enqueue(response):
        pending.put(response)

    watch_id = self.add_watch_callback(key, enqueue, **kwargs)

    def cancel():
        canceled.set()
        # Wake up an iterator blocked on the queue.
        pending.put(None)
        self.cancel_watch(watch_id)

    def iterator():
        try:
            while not canceled.is_set():
                item = pending.get()
                if item is None:
                    # Sentinel pushed by cancel().
                    canceled.set()
                elif isinstance(item, Exception):
                    canceled.set()
                    raise item
                if not canceled.is_set():
                    yield item
        except grpc.RpcError as rpc_error:
            self._manage_grpc_errors(rpc_error)

    return iterator(), cancel
def watch(self, key, **kwargs):
    """
    Watch a key.

    Example usage:

    .. code-block:: python

        events_iterator, cancel = etcd.watch('/doot/key')
        for event in events_iterator:
            print(event)

    :param key: key to watch
    :returns: tuple of ``events_iterator`` and ``cancel``.
              Use ``events_iterator`` to get the events of key changes
              and ``cancel`` to cancel the watch request.
    """
    responses, cancel = self.watch_response(key, **kwargs)
    events = utils.response_to_event_iterator(responses)
    return events, cancel
def watch_prefix_response(self, key_prefix, **kwargs):
    """
    Watch a range of keys with a prefix.

    :param key_prefix: prefix to watch
    :returns: tuple of ``responses_iterator`` and ``cancel``.
    """
    prefix_end = utils.prefix_range_end(utils.to_bytes(key_prefix))
    kwargs['range_end'] = prefix_end
    return self.watch_response(key_prefix, **kwargs)
def watch_prefix(self, key_prefix, **kwargs):
    """
    Watch a range of keys with a prefix.

    :param key_prefix: prefix to watch
    :returns: tuple of ``events_iterator`` and ``cancel``.
    """
    prefix_end = utils.prefix_range_end(utils.to_bytes(key_prefix))
    kwargs['range_end'] = prefix_end
    return self.watch(key_prefix, **kwargs)
@_handle_errors
def watch_once_response(self, key, timeout=None, **kwargs):
    """
    Watch a key and stop after the first response.

    If the timeout was specified and response didn't arrive method
    will raise ``WatchTimedOut`` exception.

    :param key: key to watch
    :param timeout: (optional) timeout in seconds.
    :returns: ``WatchResponse``
    """
    pending = queue.Queue()

    def enqueue(response):
        pending.put(response)

    watch_id = self.add_watch_callback(key, enqueue, **kwargs)

    try:
        first_response = pending.get(timeout=timeout)
        return first_response
    except queue.Empty:
        raise exceptions.WatchTimedOut()
    finally:
        # The watch is cancelled whether or not a response arrived.
        self.cancel_watch(watch_id)
def watch_once(self, key, timeout=None, **kwargs):
    """
    Watch a key and stop after the first event.

    If the timeout was specified and event didn't arrive method
    will raise ``WatchTimedOut`` exception.

    :param key: key to watch
    :param timeout: (optional) timeout in seconds.
    :returns: ``Event``
    """
    first_response = self.watch_once_response(
        key, timeout=timeout, **kwargs)
    return first_response.events[0]
def watch_prefix_once_response(self, key_prefix, timeout=None, **kwargs):
    """
    Watch a range of keys with a prefix and stop after the first response.

    If the timeout was specified and response didn't arrive method
    will raise ``WatchTimedOut`` exception.

    :param key_prefix: prefix to watch
    :param timeout: (optional) timeout in seconds.
    """
    prefix_end = utils.prefix_range_end(utils.to_bytes(key_prefix))
    kwargs['range_end'] = prefix_end
    return self.watch_once_response(key_prefix, timeout=timeout, **kwargs)
def watch_prefix_once(self, key_prefix, timeout=None, **kwargs):
    """
    Watch a range of keys with a prefix and stop after the first event.

    If the timeout was specified and event didn't arrive method
    will raise ``WatchTimedOut`` exception.

    :param key_prefix: prefix to watch
    :param timeout: (optional) timeout in seconds.
    """
    prefix_end = utils.prefix_range_end(utils.to_bytes(key_prefix))
    kwargs['range_end'] = prefix_end
    return self.watch_once(key_prefix, timeout=timeout, **kwargs)
@_handle_errors
def cancel_watch(self, watch_id):
    """
    Stop watching a key or range of keys.

    :param watch_id: watch_id returned by ``add_watch_callback`` method
    """
    active_watcher = self.watcher
    active_watcher.cancel(watch_id)
def _ops_to_requests(self, ops):
    """
    Return a list of grpc requests.

    Returns list from an input list of pyetcd.transactions.{Put, Get,
    Delete, Txn} objects.

    :param ops: iterable of operations, or ``None`` for no operation
    :returns: list of ``etcdrpc.RequestOp``
    :raises Exception: if an operation is of an unknown class
    """
    # ``transaction`` declares ``success=None`` / ``failure=None``; treat
    # a missing list as empty instead of failing on ``for op in None``.
    if ops is None:
        return []

    request_ops = []
    for op in ops:
        if isinstance(op, transactions.Put):
            request = self._build_put_request(op.key, op.value,
                                              op.lease, op.prev_kv)
            request_op = etcdrpc.RequestOp(request_put=request)
        elif isinstance(op, transactions.Get):
            request = self._build_get_range_request(op.key, op.range_end)
            request_op = etcdrpc.RequestOp(request_range=request)
        elif isinstance(op, transactions.Delete):
            request = self._build_delete_request(op.key, op.range_end,
                                                 op.prev_kv)
            request_op = etcdrpc.RequestOp(request_delete_range=request)
        elif isinstance(op, transactions.Txn):
            # Nested transaction: convert its branches recursively.
            compare = [c.build_message() for c in op.compare]
            success_ops = self._ops_to_requests(op.success)
            failure_ops = self._ops_to_requests(op.failure)
            request = etcdrpc.TxnRequest(compare=compare,
                                         success=success_ops,
                                         failure=failure_ops)
            request_op = etcdrpc.RequestOp(request_txn=request)
        else:
            raise Exception(
                'Unknown request class {}'.format(op.__class__))
        request_ops.append(request_op)
    return request_ops
@_handle_errors
def transaction(self, compare, success=None, failure=None):
    """
    Perform a transaction.

    Example usage:

    .. code-block:: python

        etcd.transaction(
            compare=[
                etcd.transactions.value('/doot/testing') == 'doot',
                etcd.transactions.version('/doot/testing') > 0,
            ],
            success=[
                etcd.transactions.put('/doot/testing', 'success'),
            ],
            failure=[
                etcd.transactions.put('/doot/testing', 'failure'),
            ]
        )

    :param compare: A list of comparisons to make
    :param success: A list of operations to perform if all the comparisons
                    are true
    :param failure: A list of operations to perform if any of the
                    comparisons are false
    :return: A tuple of (operation status, responses)
    """
    compare_messages = [c.build_message() for c in compare]
    on_success = self._ops_to_requests(success)
    on_failure = self._ops_to_requests(failure)

    request = etcdrpc.TxnRequest(compare=compare_messages,
                                 success=on_success,
                                 failure=on_failure)
    txn_response = self.kvstub.Txn(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )

    # Put/delete/txn responses are passed through as they are; range
    # responses are unpacked into (value, metadata) tuples.
    passthrough_types = ('response_put', 'response_delete_range',
                         'response_txn')
    responses = []
    for op_response in txn_response.responses:
        kind = op_response.WhichOneof('response')
        if kind in passthrough_types:
            responses.append(op_response)
        elif kind == 'response_range':
            responses.append([
                (kv.value, KVMetadata(kv, txn_response.header))
                for kv in op_response.response_range.kvs
            ])

    return txn_response.succeeded, responses
@_handle_errors
def lease(self, ttl, lease_id=None):
    """
    Create a new lease.

    All keys attached to this lease will be expired and deleted if the
    lease expires. A lease can be sent keep alive messages to refresh the
    ttl.

    :param ttl: Requested time to live
    :param lease_id: Requested ID for the lease
    :returns: new lease
    :rtype: :class:`.Lease`
    """
    request = etcdrpc.LeaseGrantRequest(TTL=ttl, ID=lease_id)
    granted = self.leasestub.LeaseGrant(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )
    # The lease is built from the ID and TTL carried by the response.
    return leases.Lease(lease_id=granted.ID,
                        ttl=granted.TTL,
                        etcd_client=self)
@_handle_errors
def revoke_lease(self, lease_id):
    """
    Revoke a lease.

    :param lease_id: ID of the lease to revoke.
    """
    request = etcdrpc.LeaseRevokeRequest(ID=lease_id)
    self.leasestub.LeaseRevoke(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )
@_handle_generator_errors
def refresh_lease(self, lease_id):
    """Send one keep-alive request for a lease and yield the responses.

    :param lease_id: ID of the lease to refresh
    """
    request = etcdrpc.LeaseKeepAliveRequest(ID=lease_id)
    # LeaseKeepAlive takes an iterator of requests; only one is sent.
    responses = self.leasestub.LeaseKeepAlive(
        iter([request]),
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata)
    for keep_alive_response in responses:
        yield keep_alive_response
@_handle_errors
def get_lease_info(self, lease_id):
    """Query a lease's time to live, with ``keys=True`` on the request.

    :param lease_id: ID of the lease to query
    :returns: the raw response of the ``LeaseTimeToLive`` RPC
    """
    # only available in etcd v3.1.0 and later
    request = etcdrpc.LeaseTimeToLiveRequest(ID=lease_id, keys=True)
    return self.leasestub.LeaseTimeToLive(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )
@_handle_errors
def lock(self, name, ttl=60):
    """
    Create a new lock.

    :param name: name of the lock
    :type name: string or bytes
    :param ttl: length of time for the lock to live for in seconds. The
                lock will be released after this time elapses, unless
                refreshed
    :type ttl: int
    :returns: new lock
    :rtype: :class:`.Lock`
    """
    new_lock = locks.Lock(name, ttl=ttl, etcd_client=self)
    return new_lock
@_handle_errors
def add_member(self, urls):
    """
    Add a member into the cluster.

    :param urls: peer URLs sent as ``peerURLs`` in the request
    :returns: new member
    :rtype: :class:`.Member`
    """
    request = etcdrpc.MemberAddRequest(peerURLs=urls)
    response = self.clusterstub.MemberAdd(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )
    added = response.member
    return members.Member(added.ID,
                          added.name,
                          added.peerURLs,
                          added.clientURLs,
                          etcd_client=self)
@_handle_errors
def remove_member(self, member_id):
    """
    Remove an existing member from the cluster.

    :param member_id: ID of the member to remove
    """
    request = etcdrpc.MemberRemoveRequest(ID=member_id)
    self.clusterstub.MemberRemove(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )
@_handle_errors
def update_member(self, member_id, peer_urls):
    """
    Update the configuration of an existing member in the cluster.

    :param member_id: ID of the member to update
    :param peer_urls: new list of peer urls the member will use to
                      communicate with the cluster
    """
    request = etcdrpc.MemberUpdateRequest(ID=member_id,
                                          peerURLs=peer_urls)
    self.clusterstub.MemberUpdate(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )
@property
def members(self):
    """
    List of all members associated with the cluster.

    This is a generator: the ``MemberList`` request is only sent once
    iteration starts.

    :type: sequence of :class:`.Member`
    """
    request = etcdrpc.MemberListRequest()
    response = self.clusterstub.MemberList(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )

    for entry in response.members:
        yield members.Member(entry.ID,
                             entry.name,
                             entry.peerURLs,
                             entry.clientURLs,
                             etcd_client=self)
@_handle_errors
def compact(self, revision, physical=False):
    """
    Compact the event history in etcd up to a given revision.

    All superseded keys with a revision less than the compaction revision
    will be removed.

    :param revision: revision for the compaction operation
    :param physical: if set to True, the request will wait until the
                     compaction is physically applied to the local database
                     such that compacted entries are totally removed from
                     the backend database
    """
    request = etcdrpc.CompactionRequest(revision=revision,
                                        physical=physical)
    self.kvstub.Compact(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )
@_handle_errors
def defragment(self):
    """Defragment a member's backend database to recover storage space."""
    request = etcdrpc.DefragmentRequest()
    self.maintenancestub.Defragment(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )
@_handle_errors
def hash(self):
    """
    Return the hash of the local KV state.

    :returns: kv state hash
    :rtype: int
    """
    hash_request = etcdrpc.HashRequest()
    # Pass the timeout, call credentials and metadata like every other
    # RPC of this client; they were previously omitted here.
    hash_response = self.maintenancestub.Hash(
        hash_request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )
    return hash_response.hash
def _build_alarm_request(self, alarm_action, member_id, alarm_type):
    """Build an ``etcdrpc.AlarmRequest``.

    :param alarm_action: ``'get'``, ``'activate'`` or ``'deactivate'``
    :param member_id: value stored in the request's ``memberID``
    :param alarm_type: ``'none'`` or ``'no space'``
    :returns: the populated alarm request
    :raises ValueError: on an unknown action or alarm type
    """
    alarm_request = etcdrpc.AlarmRequest()

    if alarm_action == 'get':
        action = etcdrpc.AlarmRequest.GET
    elif alarm_action == 'activate':
        action = etcdrpc.AlarmRequest.ACTIVATE
    elif alarm_action == 'deactivate':
        action = etcdrpc.AlarmRequest.DEACTIVATE
    else:
        raise ValueError('Unknown alarm action: {}'.format(alarm_action))
    alarm_request.action = action

    alarm_request.memberID = member_id

    if alarm_type == 'none':
        alarm = etcdrpc.NONE
    elif alarm_type == 'no space':
        alarm = etcdrpc.NOSPACE
    else:
        raise ValueError('Unknown alarm type: {}'.format(alarm_type))
    alarm_request.alarm = alarm

    return alarm_request
[docs] @_handle_errors
def create_alarm(self, member_id=0):
    """Create an alarm.

    When no member id is given, the alarm is activated for all the
    members of the cluster. Only the `no space` alarm can be raised.

    :param member_id: The cluster member id to create an alarm to.
                      If 0, the alarm is created for all the members
                      of the cluster.
    :returns: list of :class:`.Alarm`
    """
    request = self._build_alarm_request('activate', member_id, 'no space')
    response = self.maintenancestub.Alarm(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata,
    )
    return [
        Alarm(entry.alarm, entry.memberID)
        for entry in response.alarms
    ]
[docs] @_handle_errors
def list_alarms(self, member_id=0, alarm_type='none'):
    """List the activated alarms.

    This is a generator: the request is only sent once iteration
    starts.

    :param member_id: The cluster member id placed in the alarm
                      request, 0 by default.
    :param alarm_type: The alarm type placed in the request, either
                       ``'none'`` (the default) or ``'no space'``.
    :returns: sequence of :class:`.Alarm`
    :raises ValueError: if ``alarm_type`` is not one of the supported
                        names
    """
    alarm_request = self._build_alarm_request('get',
                                              member_id,
                                              alarm_type)
    alarm_response = self.maintenancestub.Alarm(
        alarm_request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata
    )
    for entry in alarm_response.alarms:
        yield Alarm(entry.alarm, entry.memberID)
[docs] @_handle_errors
def disarm_alarm(self, member_id=0):
    """Cancel an alarm.

    Sends a ``deactivate`` request for the `no space` alarm.

    :param member_id: The cluster member id to cancel an alarm.
                      If 0, the alarm is canceled for all the members
                      of the cluster.
    :returns: List of :class:`.Alarm`
    """
    request = self._build_alarm_request('deactivate', member_id, 'no space')
    response = self.maintenancestub.Alarm(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata,
    )
    return [
        Alarm(entry.alarm, entry.memberID)
        for entry in response.alarms
    ]
[docs] @_handle_errors
def snapshot(self, file_obj):
    """Take a snapshot of the database.

    The snapshot is received as a stream of responses; the ``blob`` of
    each one is written to ``file_obj`` in the order received.

    :param file_obj: A file-like object to write the database contents in.
    """
    request = etcdrpc.SnapshotRequest()
    response_stream = self.maintenancestub.Snapshot(
        request,
        self.timeout,
        credentials=self.call_credentials,
        metadata=self.metadata,
    )
    for chunk in response_stream:
        file_obj.write(chunk.blob)
class Etcd3Client(MultiEndpointEtcd3Client):
    """
    etcd v3 API client bound to a single endpoint.

    A secure channel is used only when ``ca_cert`` is given; in that case
    ``cert_key`` and ``cert_cert`` must be given together or not at all.

    :param host: Host to connect to, 'localhost' if not specified
    :type host: str, optional
    :param port: Port to connect to on host, 2379 if not specified
    :type port: int, optional
    :param ca_cert: Filesystem path of etcd CA certificate
    :type ca_cert: str or os.PathLike, optional
    :param cert_key: Filesystem path of client key
    :type cert_key: str or os.PathLike, optional
    :param cert_cert: Filesystem path of client certificate
    :type cert_cert: str or os.PathLike, optional
    :param timeout: Timeout for all RPC in seconds
    :type timeout: int or float, optional
    :param user: Username for authentication
    :type user: str, optional
    :param password: Password for authentication
    :type password: str, optional
    :param grpc_options: Additional gRPC options
    :type grpc_options: dict, optional
    :raises ValueError: if ``ca_cert`` is given together with only one of
                        ``cert_key`` and ``cert_cert``
    """

    def __init__(self, host='localhost', port=2379, ca_cert=None,
                 cert_key=None, cert_cert=None, timeout=None, user=None,
                 password=None, grpc_options=None):
        has_client_cert = cert_cert is not None
        has_client_key = cert_key is not None

        if ca_cert is None:
            # Without a CA certificate the channel is insecure.
            self.uses_secure_channel = False
            channel_creds = None
        else:
            if has_client_cert != has_client_key:
                # Exactly one of the client cert parameters is set.
                raise ValueError(
                    'to use a secure channel ca_cert is required by itself, '
                    'or cert_cert and cert_key must both be specified.')
            # Either both client parameters are set, or both are None
            # (CA-only); a single call covers the two cases.
            channel_creds = self.get_secure_creds(
                ca_cert,
                cert_key,
                cert_cert,
            )
            self.uses_secure_channel = True

        # Wrap the single host/port pair in an Endpoint for the base class.
        endpoint = Endpoint(
            host,
            port,
            secure=self.uses_secure_channel,
            creds=channel_creds,
            opts=grpc_options,
        )
        super(Etcd3Client, self).__init__(
            endpoints=[endpoint],
            timeout=timeout,
            user=user,
            password=password,
        )
def client(host='localhost', port=2379,
           ca_cert=None, cert_key=None, cert_cert=None, timeout=None,
           user=None, password=None, grpc_options=None):
    """
    Return an instance of an Etcd3Client.

    Every argument is forwarded unchanged, by keyword, to the
    ``Etcd3Client`` constructor.
    """
    return Etcd3Client(
        host=host, port=port,
        ca_cert=ca_cert, cert_key=cert_key, cert_cert=cert_cert,
        timeout=timeout,
        user=user, password=password,
        grpc_options=grpc_options,
    )