Source code for pyetcd.locks
import threading
import time
import uuid
from . import events, exceptions
class Lock(object):
    """
    A distributed lock backed by a single etcd key.

    The lock is represented by the key ``lock_prefix + name``. Acquiring the
    lock writes this instance's unique id under that key, attached to a lease
    of ``ttl`` seconds; releasing it deletes the key again, but only if the
    key still holds this instance's id.

    This can be used as a context manager:

    .. code-block:: python

        etcd = pyetcd.client()
        # create a lock that expires after 20 seconds
        with etcd.lock('toot', ttl=20) as lock:
            # do something that requires the lock
            print(lock.is_acquired())
            # refresh the timeout on the lease
            lock.refresh()

    .. note::
        Entering the context calls :meth:`acquire` with its default timeout
        and ignores the result, so the body of the ``with`` statement also
        runs when the lock could not be obtained in time. Check
        :meth:`is_acquired` when that matters.

    :param name: name of the lock
    :type name: string or bytes
    :param ttl: length of time for the lock to live for in seconds. The lock
                will be released after this time elapses, unless refreshed
    :type ttl: int
    :param etcd_client: client used for every request. When ``None`` the
                        attribute is left untouched, so ``etcd_client`` must
                        then be provided some other way (for example as a
                        class attribute) before the lock is used.
    """

    lock_prefix = '/locks/'

    def __init__(self, name, ttl=60,
                 etcd_client=None):
        self.name = name
        self.ttl = ttl
        if etcd_client is not None:
            self.etcd_client = etcd_client

        # A bytes name cannot be concatenated with a text prefix on Python 3,
        # so bring the prefix to the same type as the name first.
        prefix = self.lock_prefix
        if isinstance(name, bytes) and not isinstance(prefix, bytes):
            prefix = prefix.encode('utf-8')
        self.key = prefix + name

        self.lease = None
        # Revision recorded by the most recent acquisition attempt; the watch
        # for the holder's delete event starts just after it.
        self.revision = None
        # store uuid as bytes, since it avoids having to decode each time we
        # need to compare
        self.uuid = uuid.uuid1().bytes

    def acquire(self, timeout=10):
        """Acquire the lock.

        Repeatedly tries to create the lock key. After a failed attempt the
        call blocks until a delete event is seen on the key (or the remaining
        time is used up) and then tries again. Every attempt requests a new
        lease of ``ttl`` seconds.

        :param timeout: Maximum time to wait before returning. `None` means
                        forever, any other value equal or greater than 0 is
                        the number of seconds.
        :returns: True if the lock has been acquired, False otherwise.
        """
        # Measure the deadline on a monotonic clock when the interpreter has
        # one, so wall-clock adjustments cannot stretch or cut the timeout.
        clock = getattr(time, 'monotonic', time.time)
        deadline = None if timeout is None else clock() + timeout

        while True:
            if self._try_acquire():
                return True

            if deadline is None:
                remaining_timeout = None
            else:
                remaining_timeout = max(deadline - clock(), 0)
                if remaining_timeout == 0:
                    return False

            self._wait_delete_event(remaining_timeout)

    def _try_acquire(self):
        """Make one attempt to create the lock key; return True on success.

        A single transaction puts this instance's uuid under the key when the
        ``create(key) == 0`` comparison holds (presumably: the key does not
        exist yet), and otherwise reads the current key. Either way
        ``self.revision`` is updated from the response.
        """
        self.lease = self.etcd_client.lease(self.ttl)

        success, metadata = self.etcd_client.transaction(
            compare=[
                self.etcd_client.transactions.create(self.key) == 0
            ],
            success=[
                self.etcd_client.transactions.put(self.key, self.uuid,
                                                  lease=self.lease)
            ],
            failure=[
                self.etcd_client.transactions.get(self.key)
            ]
        )
        if success is True:
            self.revision = metadata[0].response_put.header.revision
            return True

        # Someone else holds the key: remember the revision of its last
        # modification so the watch can start right after it.
        self.revision = metadata[0][0][1].mod_revision
        # NOTE(review): the lease requested above is dropped here without
        # being revoked; presumably it lingers until its TTL lapses. Confirm
        # whether it should be revoked explicitly.
        self.lease = None
        return False

    def _wait_delete_event(self, timeout):
        """Block until the lock key is deleted or ``timeout`` seconds pass.

        Returns immediately when setting up the watch raises
        ``WatchTimedOut``. With ``timeout=None`` the wait is unbounded.
        """
        try:
            event_iter, cancel = self.etcd_client.watch(
                self.key, start_revision=self.revision + 1)
        except exceptions.WatchTimedOut:
            return

        timer = None
        if timeout is not None:
            # Cancelling the watch from a timer thread bounds the wait: the
            # loop below ends once the watch stops yielding events.
            timer = threading.Timer(timeout, cancel)
            timer.start()

        for event in event_iter:
            if isinstance(event, events.DeleteEvent):
                if timer is not None:
                    timer.cancel()

                cancel()
                break

    def release(self):
        """Release the lock.

        The key is deleted only if it still holds this instance's uuid, so
        releasing a lock owned by someone else leaves it in place.

        :returns: the success flag reported by the transaction.
        """
        success, _ = self.etcd_client.transaction(
            compare=[
                self.etcd_client.transactions.value(self.key) == self.uuid
            ],
            success=[self.etcd_client.transactions.delete(self.key)],
            failure=[]
        )
        return success

    def refresh(self):
        """Refresh the time to live on this lock.

        :returns: whatever the lease's ``refresh()`` returns.
        :raises ValueError: if no lease is associated with this lock.
        """
        if self.lease is not None:
            return self.lease.refresh()
        else:
            raise ValueError('No lease associated with this lock - have you '
                             'acquired the lock yet?')

    def is_acquired(self):
        """Check if this lock is currently acquired.

        :returns: True when the lock key exists and holds this instance's
                  uuid, False otherwise.
        """
        # Named ``stored`` rather than ``uuid`` so the imported uuid module
        # is not shadowed.
        stored, _ = self.etcd_client.get(self.key)
        return stored is not None and stored == self.uuid

    def __enter__(self):
        # The result of acquire() is deliberately left unchecked here to keep
        # the existing contract; see the note in the class docstring.
        self.acquire()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        # release() only deletes the key if this instance owns it, so this is
        # safe even when the lock was never obtained.
        self.release()