Task Scheduler
The task package provides a simple task scheduler that can be used to run tasks periodically.
Note: This is not a replacement for bigger tools like Celery, taskiq, or APScheduler. It is a lightweight, easy-to-use scheduler that, combined with synchronization, is safe for running tasks in a distributed system.
The key features are:
- Fast & Easy: Offers simple decorators to define and schedule tasks effortlessly.
- Interval Task: Allows tasks to run at specified intervals.
- Synchronization: Controls concurrency using synchronization primitives to manage simultaneous task execution (see the sync package).
- Dependency Injection: Uses the FastDepends library to inject dependencies into tasks.
- Error Handling: Catches and logs errors, ensuring that task execution failures do not stop the scheduling.
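
The error-handling behavior in the list above can be pictured with a small standard-library sketch. This is not grelmicro's implementation: `run_periodically`, `flaky_task`, and the `max_runs` cap are hypothetical names introduced only for illustration, and the sketch assumes nothing beyond "catch, log, keep scheduling".

```python
import asyncio
import logging

logger = logging.getLogger("task_sketch")


async def run_periodically(func, interval: float, max_runs: int) -> int:
    """Call `func` up to `max_runs` times, sleeping `interval` seconds after each call.

    Exceptions raised by `func` are logged and swallowed so the loop keeps going.
    Returns the number of calls that failed.
    """
    failures = 0
    for _ in range(max_runs):
        try:
            await func()
        except Exception:
            # Log with traceback and continue: one bad run must not stop the schedule.
            logger.exception("Task %s failed", getattr(func, "__name__", func))
            failures += 1
        await asyncio.sleep(interval)
    return failures


calls = []


async def flaky_task() -> None:
    """Hypothetical task that fails on every second call."""
    calls.append(len(calls))
    if len(calls) % 2 == 0:
        raise RuntimeError("simulated failure")
```

Running `asyncio.run(run_periodically(flaky_task, interval=0, max_runs=4))` calls the task four times even though two of the calls raise. Cancellation is not swallowed, because `asyncio.CancelledError` does not derive from `Exception`.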
Task Manager
The TaskManager class is the main entry point for managing scheduled tasks. To run the scheduled tasks, start the task manager within the application lifespan.

# FastAPI application
from contextlib import asynccontextmanager

from fastapi import FastAPI

from grelmicro.task import TaskManager

task = TaskManager()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Scheduled tasks run for as long as the application is running.
    async with task:
        yield


app = FastAPI(lifespan=lifespan)

# FastStream application
from contextlib import asynccontextmanager

from faststream import ContextRepo, FastStream
from faststream.redis import RedisBroker

from grelmicro.task import TaskManager

task = TaskManager()


@asynccontextmanager
async def lifespan(context: ContextRepo):
    # Scheduled tasks run for as long as the application is running.
    async with task:
        yield


broker = RedisBroker()
app = FastStream(broker, lifespan=lifespan)

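
To make the lifespan pattern above more concrete, here is a minimal sketch of what an async context manager that owns background tasks might look like. `MiniManager`, its `add` method, and `demo` are hypothetical and exist only for this illustration; the real TaskManager may be built differently. The sketch only assumes that entering the context starts the tasks and leaving it stops them.

```python
import asyncio
from contextlib import asynccontextmanager


class MiniManager:
    """Hypothetical stand-in for a task manager; not grelmicro's implementation."""

    def __init__(self) -> None:
        self._funcs = []
        self._running = []

    def add(self, func, interval: float) -> None:
        self._funcs.append((func, interval))

    async def _loop(self, func, interval: float) -> None:
        while True:
            await func()
            await asyncio.sleep(interval)

    async def __aenter__(self) -> "MiniManager":
        # Start one background asyncio task per registered function.
        self._running = [
            asyncio.create_task(self._loop(func, interval))
            for func, interval in self._funcs
        ]
        return self

    async def __aexit__(self, *exc_info) -> None:
        # Cancel the background tasks and wait for them to finish.
        for task in self._running:
            task.cancel()
        await asyncio.gather(*self._running, return_exceptions=True)
        self._running = []


@asynccontextmanager
async def lifespan(manager: MiniManager):
    # Same shape as the lifespan functions above: tasks run while the app runs.
    async with manager:
        yield


async def demo() -> int:
    ticks = 0

    async def tick() -> None:
        nonlocal ticks
        ticks += 1

    manager = MiniManager()
    manager.add(tick, interval=0.01)
    async with lifespan(manager):
        await asyncio.sleep(0.05)  # the application would serve requests here
    stopped_at = ticks
    await asyncio.sleep(0.03)
    # After the lifespan exits, no further ticks happen.
    assert ticks == stopped_at
    return ticks
```

Tying the scheduler to the lifespan means tasks cannot outlive the application: shutdown cancels them instead of leaving orphaned loops behind.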
Interval Task
To create an IntervalTask, use the interval decorator method of the TaskManager instance. This decorator allows tasks to run at specified intervals.
Note: The interval specifies the waiting time between task executions. If a task must meet a deadline, take its own execution duration into account when choosing the interval.

# Using a TaskManager
from grelmicro.task import TaskManager

task = TaskManager()


@task.interval(seconds=5)
async def my_task():
    print("Hello, World!")

# Using a TaskRouter
from grelmicro.task import TaskRouter

task = TaskRouter()


@task.interval(seconds=5)
async def my_task():
    print("Hello, World!")

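
The note about execution duration can be worked through with a little arithmetic. The sketch below assumes the scheduler waits the full interval after each run finishes and that the first run starts at t=0; both are assumptions for illustration, and `effective_period` and `runs_within` are hypothetical helpers, not part of grelmicro.

```python
import math


def effective_period(interval: float, duration: float) -> float:
    """Start-to-start time when the scheduler sleeps `interval` after each run."""
    return interval + duration


def runs_within(window: float, interval: float, duration: float) -> int:
    """Number of runs that start within `window` seconds, the first one at t=0."""
    period = effective_period(interval, duration)
    # Runs start at t = 0, period, 2 * period, ... strictly before `window`.
    return math.ceil(window / period)
```

Under these assumptions, a task with a 5-second interval that takes 1 second to run starts every 6 seconds, so it runs 10 times per minute instead of 12. A task that needs a start-to-start period of 5 seconds would need an interval of roughly 4 seconds.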
Synchronization
A task can be synchronized using a synchronization primitive to control concurrency and manage simultaneous task execution.

# Using a Lock
from grelmicro.sync import Lock
from grelmicro.task import TaskManager

lock = Lock("my_task")
task = TaskManager()


@task.interval(seconds=5, sync=lock)
async def my_task():
    async with lock:
        print("Hello, World!")

# Using a LeaderElection
from grelmicro.sync import LeaderElection
from grelmicro.task import TaskManager

leader = LeaderElection("my_task")
task = TaskManager()
task.add_task(leader)


@task.interval(seconds=5, sync=leader)
async def my_task():
    async with leader:
        print("Hello, World!")

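
The effect of guarding a task body with a lock can be sketched with the standard library alone. Here `asyncio.Lock` stands in for a synchronization primitive; it only coordinates coroutines inside one process, whereas the primitives in the sync package are meant for distributed setups. `worker` and `max_concurrency` are hypothetical names used for illustration.

```python
import asyncio


async def worker(lock: asyncio.Lock, state: dict, runs: int) -> None:
    """Run a guarded body `runs` times, recording how many bodies overlap."""
    for _ in range(runs):
        async with lock:
            state["active"] += 1
            state["max_active"] = max(state["max_active"], state["active"])
            await asyncio.sleep(0)  # yield control while still holding the lock
            state["active"] -= 1
        await asyncio.sleep(0)  # give other workers a chance between runs


async def max_concurrency(workers: int, runs: int) -> int:
    """Return the highest number of workers ever inside the guarded body at once."""
    lock = asyncio.Lock()
    state = {"active": 0, "max_active": 0}
    await asyncio.gather(*(worker(lock, state, runs) for _ in range(workers)))
    return state["max_active"]
```

With the lock in place, `asyncio.run(max_concurrency(3, 5))` reports that at most one worker was ever inside the guarded body, which is the property a synchronized task relies on when several instances share the same schedule.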
Task Router
For bigger applications, you can use the TaskRouter class to manage tasks in different modules.

from grelmicro.task import TaskRouter

router = TaskRouter()


@router.interval(seconds=5)
async def my_task():
    print("Hello, World!")

Then you can include the TaskRouter in the TaskManager or in other routers using the include_router method.
from grelmicro.task.manager import TaskManager
task = TaskManager()
task.include_router(router)
Tip: The TaskRouter follows the same philosophy as the APIRouter in FastAPI or the Router in FastStream.
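
The router pattern itself is small enough to sketch in plain Python. `MiniRouter` below is a hypothetical registry that mimics only the shape of the pattern (a decorator that records tasks, and an include_router method that merges them); it is not how grelmicro implements TaskRouter, and the `billing` and `reports` routers are made-up examples.

```python
from typing import Callable


class MiniRouter:
    """Hypothetical registry that mimics the router pattern; not grelmicro's code."""

    def __init__(self) -> None:
        self.tasks: list[tuple[str, float, Callable]] = []

    def interval(self, *, seconds: float):
        def decorator(func: Callable) -> Callable:
            # Record the task; the function itself is returned unchanged.
            self.tasks.append((func.__name__, seconds, func))
            return func

        return decorator

    def include_router(self, router: "MiniRouter") -> None:
        # Merge the tasks declared on another router into this one.
        self.tasks.extend(router.tasks)


# Routers that would normally live in separate modules.
billing = MiniRouter()
reports = MiniRouter()


@billing.interval(seconds=5)
def charge_cards() -> None: ...


@reports.interval(seconds=60)
def build_report() -> None: ...


root = MiniRouter()
root.include_router(billing)
root.include_router(reports)
```

Each module declares its tasks against its own router without importing the central manager, which keeps imports one-directional: only the module that assembles the application needs to know about every router.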