Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/background_workers.py: 70%

25 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1import asyncio 1a

2from contextlib import asynccontextmanager 1a

3from typing import AsyncGenerator 1a

4 

5from docket import Docket, Worker 1a

6 

7from prefect.server.api.flow_runs import delete_flow_run_logs 1a

8from prefect.server.api.task_runs import delete_task_run_logs 1a

9from prefect.server.models.deployments import mark_deployments_ready 1a

10from prefect.server.models.work_queues import mark_work_queues_ready 1a

11 

12 

13@asynccontextmanager 1a

14async def background_worker(docket: Docket) -> AsyncGenerator[None, None]: 1a

15 worker_task: asyncio.Task[None] | None = None 1b

16 try: 1b

17 docket.register(mark_work_queues_ready) 1b

18 docket.register(mark_deployments_ready) 1b

19 docket.register(delete_task_run_logs) 1b

20 docket.register(delete_flow_run_logs) 1b

21 

22 async with Worker(docket) as worker: 1b

23 worker_task = asyncio.create_task(worker.run_forever()) 1b

24 yield 1b

25 

26 finally: 

27 if worker_task: 

28 worker_task.cancel() 

29 try: 

30 await worker_task 

31 except asyncio.CancelledError: 

32 pass