Skip to content

Task Scheduler

The task package provides a simple task scheduler that can be used to run tasks periodically.

Note: This is not a replacement for bigger tools like Celery, taskiq, or APScheduler. It is just lightweight, easy to use, and safe for running tasks in a distributed system with synchronization.

The key features are:

  • Fast & Easy: Offers simple decorators to define and schedule tasks effortlessly.
  • Interval Task: Allows tasks to run at specified intervals.
  • Synchronization: Controls concurrency using synchronization primitives to manage simultaneous task execution (see the sync package).
  • Dependency Injection: Use FastDepends library to inject dependencies into tasks.
  • Error Handling: Catches and logs errors, ensuring that task execution failures do not stop the scheduling.

Task Manager

The TaskManager class is the main entry point to manage scheduled tasks. You need to start the task manager to run the scheduled tasks using the application lifespan.

from contextlib import asynccontextmanager

from fastapi import FastAPI

from grelmicro.task import TaskManager

task = TaskManager()


@asynccontextmanager
async def lifespan(app: FastAPI):
    async with task:
        yield


app = FastAPI(lifespan=lifespan)
from contextlib import asynccontextmanager

from faststream import ContextRepo, FastStream
from faststream.redis import RedisBroker

from grelmicro.task import TaskManager

task = TaskManager()


@asynccontextmanager
async def lifespan(context: ContextRepo):
    async with task:
        yield


broker = RedisBroker()
app = FastStream(broker, lifespan=lifespan)

Interval Task

To create an IntervalTask, use the interval decorator method of the TaskManager instance. This decorator allows tasks to run at specified intervals.

Note: The interval specifies the waiting time between task executions. Ensure that the task execution duration is considered to meet deadlines effectively.

from grelmicro.task import TaskManager

task = TaskManager()


@task.interval(seconds=5)
async def my_task():
    print("Hello, World!")
from grelmicro.task import TaskRouter

task = TaskRouter()


@task.interval(seconds=5)
async def my_task():
    print("Hello, World!")

Synchronization

The Task can be synchronized using a Synchoronization Primitive to control concurrency and manage simultaneous task execution.

from grelmicro.sync import Lock
from grelmicro.task import TaskManager

lock = Lock("my_task")
task = TaskManager()


@task.interval(seconds=5, sync=lock)
async def my_task():
    async with lock:
        print("Hello, World!")
from grelmicro.sync import LeaderElection
from grelmicro.task import TaskManager

leader = LeaderElection("my_task")
task = TaskManager()
task.add_task(leader)


@task.interval(seconds=5, sync=leader)
async def my_task():
    async with leader:
        print("Hello, World!")

Task Router

For bigger applications, you can use the TaskRouter class to manage tasks in different modules.

from grelmicro.task import TaskRouter


router = TaskRouter()


@router.interval(seconds=5)
async def my_task():
    print("Hello, World!")

Then you can include the TaskRouter into the TaskManager or other routers using the include_router method.

from grelmicro.task.manager import TaskManager

task = TaskManager()
task.include_router(router)

Tip

The TaskRouter follows the same philosophy as the APIRouter in FastAPI or the Router in FastStream.