Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/background_workers.py: 70%
25 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1import asyncio 1a
2from contextlib import asynccontextmanager 1a
3from typing import AsyncGenerator 1a
5from docket import Docket, Worker 1a
7from prefect.server.api.flow_runs import delete_flow_run_logs 1a
8from prefect.server.api.task_runs import delete_task_run_logs 1a
9from prefect.server.models.deployments import mark_deployments_ready 1a
10from prefect.server.models.work_queues import mark_work_queues_ready 1a
@asynccontextmanager
async def background_worker(docket: Docket) -> AsyncGenerator[None, None]:
    """Run a Docket ``Worker`` in the background for the life of the context.

    On entry, the server's background task functions are registered on
    ``docket``, a ``Worker`` bound to it is opened, and ``run_forever()`` is
    scheduled as an asyncio task. Control is then handed to the ``async with``
    body of the caller.

    On exit (normal or via an exception), the scheduled task is cancelled and
    awaited; the ``asyncio.CancelledError`` raised by that await is ignored.
    Any other exception raised by the task propagates to the caller.

    Args:
        docket: The docket to register the task functions on and to run the
            worker against.

    Yields:
        None. The worker task runs while the caller's body executes.
    """
    # Stays None if setup fails before the task is created, in which case
    # there is nothing to cancel during cleanup.
    runner: asyncio.Task[None] | None = None
    try:
        # Registration order matches the original call sequence.
        for task_function in (
            mark_work_queues_ready,
            mark_deployments_ready,
            delete_task_run_logs,
            delete_flow_run_logs,
        ):
            docket.register(task_function)

        async with Worker(docket) as active_worker:
            runner = asyncio.create_task(active_worker.run_forever())
            yield

    finally:
        # NOTE(review): this cleanup runs after the ``async with Worker(...)``
        # block has already exited, so the task is cancelled only once the
        # worker context is closed - confirm that ordering is intended.
        if runner is not None:
            runner.cancel()
            try:
                await runner
            except asyncio.CancelledError:
                # Expected outcome of the cancel() request above.
                pass