Distributed Locks with Heartbeats using Redis


I was recently tasked with building a high-scale data processing service in Python and Kubernetes. The service creates n worker jobs, each of which creates one pod. For simplicity and scalability, I wanted each pod to be identical - I didn't want any kind of "master" pod keeping track of the various phases of the data processing.

This means that the worker pods need to coordinate among themselves so that tasks like the "initialization phase" only run once, and do not overlap. Redis is a very good tool for this: it is extremely efficient and reliable, and because it executes commands on a single thread, each individual command is atomic by default.
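
A distributed lock is built directly on that atomicity. Just to illustrate the idea (a bare-bones sketch, not how redis-py implements it and not something to copy into production), a lock is essentially an atomic SET with the NX and EX options:

from redis.asyncio import Redis

redis = Redis(...)

# SET my-lock <token> NX EX 5: create the key only if it does not already
# exist, with a 5 second expiry. The command is atomic, so only one caller
# can win the key.
acquired = await redis.set("my-lock", "some-unique-token", nx=True, ex=5)

if acquired:
    try:
        ...  # critical section
    finally:
        # A safe release should verify the token before deleting (redis-py
        # does this with a Lua script); a plain DELETE keeps the sketch short.
        await redis.delete("my-lock")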

Redis-py, the official Redis Python client, provides a really easy-to-use locking mechanism through its Lock object:

from redis.asyncio import Redis
from redis.asyncio.lock import Lock

redis = Redis(...)
lock = Lock(redis, name="my-lock")

# The Lock object provides a context manager like so:
async with lock:
    await do_something_cool()

# Or you can manually acquire and release a lock:
if await lock.acquire():
    await do_something_cool()
    await lock.release()

# You can even do a non-blocking lock that will return True or False
# immediately based on whether it acquired the lock
if await lock.acquire(blocking=False):
    await do_something_cool()
    await lock.release()
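
The client can also construct the lock for you: the Redis client exposes a lock() helper that returns the same Lock object, already bound to the connection:

# Equivalent to constructing Lock(redis, name="my-lock") directly
lock = redis.lock("my-lock")

async with lock:
    await do_something_cool()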

This lock works great out of the box. Using a Lock, I can have each worker do something like:

  1. Attempt to acquire the lock
  2. If successful, run an initialization step and release the lock (sketched below)
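
In code, each worker can run something like this on startup. The sketch assumes the non-blocking acquire from above, so workers that lose the race skip the step instead of repeating it, and init_shared_state is a hypothetical coroutine standing in for the real initialization:

lock = Lock(redis, name="init-phase")

# Every pod runs this, but only the pod that wins the lock performs the
# initialization; the others get False back immediately and move on.
if await lock.acquire(blocking=False):
    try:
        await init_shared_state()
    finally:
        await lock.release()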

This ensures that only one worker will do the initialization. It does have one issue, however: if you don't specify a timeout and the process holding the lock dies, the lock will never be released. To avoid this, you need to specify a timeout:

lock = Lock(redis, name="my-lock", timeout=5)

async with lock:
    await do_something_cool()

If your task is something simple and short, then a fixed 5 second timeout will work fine. But what if your task takes longer than 5 seconds? The lock will expire while you are still working and another worker can grab it, so you have to set the timeout to the longest time you can imagine your task taking. And if that time varies significantly, your other processes will waste a lot of time waiting for the lock to expire whenever the process holding it dies.

What I ended up doing was making a very simple wrapper that creates a heartbeat function that resets the lock timeout regularly:

import asyncio
from typing import Awaitable, TypeVar

from redis.asyncio.lock import Lock

T = TypeVar("T")

async def map_lock(lock: Lock, coro: Awaitable[T], ttl: int, blocking: bool = True) -> T | None:
    if await lock.acquire(blocking=blocking):
        async def _heartbeat() -> None:
            # Reset the TTL to its full value, then sleep for half of it,
            # so the lock can never expire while we still hold it
            while True:
                await lock.extend(ttl, replace_ttl=True)
                await asyncio.sleep(ttl / 2)

        # Start heartbeat task in the background
        heartbeat_task = asyncio.create_task(_heartbeat())

        try:
            return await coro
        finally:
            # Cancel the heartbeat first so it can't try to extend a lock
            # we no longer own, then release the lock
            heartbeat_task.cancel()
            await lock.release()
    else:
        return None

Then, you can call map_lock with a Lock and the coroutine you want to run:

lock = Lock(redis, name="my-lock", timeout=5)
await map_lock(lock, do_something_cool(), ttl=5)

This will use asyncio to create a background task that resets the lock TTL back to 5 seconds every 2.5 seconds. Now your tasks can take an arbitrarily long time.

If the process exits normally, it will release the lock and stop the heartbeat task. But if the process dies, for example due to a power failure or being killed by the kernel for using too much memory (pretty darn common!), the lock's TTL will expire within 5 seconds and the lock will automatically be released.
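
The same wrapper works for the once-only initialization phase from earlier. As a sketch (init_shared_state is still a hypothetical coroutine), a worker that does not win the lock simply gets None back, while the winner holds the lock, with heartbeats, for as long as the initialization takes:

lock = Lock(redis, name="init-phase", timeout=5)

# Only the worker that acquires the lock runs the initialization; the
# heartbeat keeps the lock alive for however long it takes.
result = await map_lock(lock, init_shared_state(), ttl=5, blocking=False)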