Synchronization Primitives
The sync
package provides synchronization primitives for distributed systems.
The primitives are technology agnostic, supporting multiple backends (see more in the Backends section).
The available primitives are:
- Leader Election: A single worker is elected as the leader for performing tasks only once in a cluster.
- Lock: A distributed lock that can be used to synchronize access to shared resources.
The synchronization primitives can be used in combination with the TaskManager
and TaskRouter
to control task execution in a distributed system (see more in Task Scheduler).
Backend
You must load a synchronization backend before using synchronization primitives.
Note
Although Grelmicro use AnyIO for concurrency, the backends generally depend on asyncio
, therefore Trio is not supported.
You can initialize a backend like this:
from grelmicro.sync.redis import RedisSyncBackend
backend = RedisSyncBackend("redis://localhost:6379/0")
from grelmicro.sync.postgres import PostgresSyncBackend
backend = PostgresSyncBackend("postgresql://user:password@localhost:5432/db")
from grelmicro.sync.memory import MemorySyncBackend
backend = MemorySyncBackend()
Warning
Please make sure to use a proper way to store connection url, such as environment variables (not like the example above).
Tip
Feel free to create your own backend and contribute it. In the sync.abc
module, you can find the protocol for creating new backends.
Leader Election
Leader election ensures that only one worker in the cluster is designated as the leader at any given time using a distributed lock.
The leader election service is responsible for acquiring and renewing the distributed lock. It runs as an AnyIO Task that can be easily started with the Task Manager. This service operates in the background, automatically renewing the lock to prevent other workers from acquiring it. The lock is released automatically when the task is cancelled or during shutdown.
from grelmicro.sync import LeaderElection
from grelmicro.task import TaskManager
leader = LeaderElection("cluster_group")
task = TaskManager()
task.add_task(leader)
from anyio import create_task_group, sleep_forever
from grelmicro.sync.leaderelection import LeaderElection
leader = LeaderElection("cluster_group")
async def main():
async with create_task_group() as tg:
await tg.start(leader)
await sleep_forever()
Lock
The lock is a distributed lock that can be used to synchronize access to shared resources.
The lock supports the following features:
- Async: The lock must be acquired and released asynchronously.
- Distributed: The lock must be distributed across multiple workers.
- Reentrant: The lock must allow the same token to acquire it multiple times to extend the lease.
- Expiring: The lock must have a timeout to auto-release after an interval to prevent deadlocks.
- Non-blocking: Lock operations must not block the async event loop.
- Vendor-agnostic: Must support multiple backends (Redis, Postgres, ConfigMap, etc.).
from grelmicro.sync import Lock
lock = Lock("resource_name")
async def main():
async with lock:
print("Protected resource accessed")
Warning
The lock is designed for use within an async event loop and is not thread-safe or process-safe.