Synchronization Primitives

The sync package provides synchronization primitives for distributed systems.

The primitives are technology agnostic and support multiple backends (see the Backend section).

The available primitives are:

  • Leader Election: Elects a single worker as the leader, so that a task runs only once across the cluster.
  • Lock: A distributed lock that can be used to synchronize access to shared resources.

The synchronization primitives can be used in combination with the TaskManager and TaskRouter to control task execution in a distributed system (see more in Task Scheduler).

Backend

You must load a synchronization backend before using synchronization primitives.

Note

Although Grelmicro uses AnyIO for concurrency, the backends generally depend on asyncio, so Trio is not supported.

You can initialize a backend like this:

# Option 1: Redis backend
from grelmicro.sync.redis import RedisSyncBackend

backend = RedisSyncBackend("redis://localhost:6379/0")

# Option 2: Postgres backend
from grelmicro.sync.postgres import PostgresSyncBackend

backend = PostgresSyncBackend("postgresql://user:password@localhost:5432/db")

# Option 3: in-memory backend
from grelmicro.sync.memory import MemorySyncBackend

backend = MemorySyncBackend()

Warning

Store connection URLs securely, for example in environment variables, rather than hard-coding them as in the example above.
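
One way to keep the connection URL out of the source code is to read it from an environment variable. The sketch below uses only the standard library. The helper name connection_url_from_env and the variable name REDIS_URL are assumptions made for illustration and are not part of Grelmicro.

```python
import os


def connection_url_from_env(var_name: str) -> str:
    """Return the connection URL stored in the given environment variable."""
    url = os.environ.get(var_name)
    if not url:
        # Fail early with a clear message instead of passing None to a backend.
        raise RuntimeError(f"Environment variable {var_name} is not set")
    return url


# Usage, assuming REDIS_URL (a hypothetical name) is exported in the environment:
#   from grelmicro.sync.redis import RedisSyncBackend
#   backend = RedisSyncBackend(connection_url_from_env("REDIS_URL"))
```

Failing at startup when the variable is missing tends to be easier to diagnose than a connection error raised later by the backend.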

Tip

Feel free to create your own backend and contribute it. In the sync.abc module, you can find the protocol for creating new backends.

Leader Election

Leader election uses a distributed lock to ensure that only one worker in the cluster is designated as the leader at any given time.

The leader election service is responsible for acquiring and renewing the distributed lock. It runs as an AnyIO Task that can be easily started with the Task Manager. This service operates in the background, automatically renewing the lock to prevent other workers from acquiring it. The lock is released automatically when the task is cancelled or during shutdown.

# Option 1: run the leader election with the Task Manager
from grelmicro.sync import LeaderElection
from grelmicro.task import TaskManager

leader = LeaderElection("cluster_group")
task = TaskManager()
task.add_task(leader)

# Option 2: run the leader election in an AnyIO task group
from anyio import create_task_group, sleep_forever

from grelmicro.sync.leaderelection import LeaderElection

leader = LeaderElection("cluster_group")


async def main():
    async with create_task_group() as tg:
        await tg.start(leader)
        await sleep_forever()
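
To illustrate the acquire-and-renew behaviour described in this section, here is a minimal sketch of a leader election loop. It is not Grelmicro's implementation. It assumes a toy in-memory lease in place of a real backend and uses plain asyncio so that it runs with the standard library only. The names InMemoryLease, Worker and demo are hypothetical.

```python
import asyncio
import time
import uuid


class InMemoryLease:
    """Toy single-process stand-in for a distributed lock with a lease."""

    def __init__(self):
        self.holder = None      # token of the current leader, if any
        self.expires_at = 0.0   # monotonic time at which the lease lapses

    def acquire_or_renew(self, token, ttl):
        now = time.monotonic()
        # Grant the lease if it is free, already ours, or has expired.
        if self.holder in (None, token) or now >= self.expires_at:
            self.holder = token
            self.expires_at = now + ttl
            return True
        return False

    def release(self, token):
        if self.holder == token:
            self.holder = None


class Worker:
    def __init__(self, lease, ttl=0.3, interval=0.1):
        self.lease = lease
        self.token = uuid.uuid4().hex  # unique identity of this worker
        self.ttl = ttl
        self.interval = interval
        self.is_leader = False

    async def run(self):
        try:
            while True:
                # Renew well before the lease expires (interval < ttl).
                self.is_leader = self.lease.acquire_or_renew(self.token, self.ttl)
                await asyncio.sleep(self.interval)
        finally:
            # Release on cancellation or shutdown so another worker can take over.
            self.is_leader = False
            self.lease.release(self.token)


async def demo():
    lease = InMemoryLease()
    workers = [Worker(lease) for _ in range(3)]
    tasks = [asyncio.create_task(w.run()) for w in workers]
    await asyncio.sleep(0.25)
    flags = [w.is_leader for w in workers]  # snapshot while all workers run
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return flags
```

Calling asyncio.run(demo()) returns one flag per worker, and exactly one of them should be True. Keeping the renewal interval shorter than the lease duration is what prevents the other workers from acquiring the lease while the leader is alive.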

Lock

The lock is a distributed lock that can be used to synchronize access to shared resources.

The lock supports the following features:

  • Async: The lock is acquired and released asynchronously.
  • Distributed: The lock is shared across multiple workers.
  • Reentrant: The same token can acquire the lock multiple times to extend the lease.
  • Expiring: The lock has a timeout and is released automatically after an interval to prevent deadlocks.
  • Non-blocking: Lock operations do not block the async event loop.
  • Vendor-agnostic: The lock supports multiple backends (Redis, Postgres, ConfigMap, etc.).

from grelmicro.sync import Lock

lock = Lock("resource_name")


async def main():
    async with lock:
        print("Protected resource accessed")

Warning

The lock is designed for use within an async event loop and is not thread-safe or process-safe.
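
The reentrant and expiring features listed above can be illustrated with a small in-memory model. This is a sketch of the semantics only, not Grelmicro's Lock API. The class LeaseLock, its methods and the explicit now parameter are hypothetical and were chosen so that the example is deterministic.

```python
class LeaseLock:
    """In-memory illustration of reentrant, expiring lock semantics."""

    def __init__(self, lease_duration):
        self.lease_duration = lease_duration
        self.token = None       # token of the current holder, if any
        self.expires_at = 0.0   # time at which the current lease lapses

    def acquire(self, token, now):
        expired = now >= self.expires_at
        if self.token is None or expired or self.token == token:
            # Free, lapsed, or re-acquired by the holder: (re)start the lease.
            self.token = token
            self.expires_at = now + self.lease_duration
            return True
        return False

    def release(self, token, now):
        # Only the current holder of a live lease may release it.
        if self.token == token and now < self.expires_at:
            self.token = None
            return True
        return False


lock = LeaseLock(lease_duration=10.0)
results = [
    lock.acquire("worker-a", now=0.0),   # free lock: acquired
    lock.acquire("worker-b", now=5.0),   # held by worker-a: refused
    lock.acquire("worker-a", now=8.0),   # same token: lease extended to 18.0
    lock.acquire("worker-b", now=12.0),  # still held thanks to the extension
    lock.acquire("worker-b", now=18.0),  # lease lapsed: worker-b takes over
]
print(results)  # [True, False, True, False, True]
```

The last call shows why expiry prevents deadlocks: if worker-a stops renewing, for example after a crash, the lock becomes available again once the lease lapses.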