Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/workers.py: 29%
162 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Routes for interacting with work queue objects.
3"""
5from typing import TYPE_CHECKING, List, Optional 1a
6from uuid import UUID 1a
8import sqlalchemy as sa 1a
9from fastapi import ( 1a
10 Body,
11 Depends,
12 HTTPException,
13 Path,
14 status,
15)
16from packaging.version import Version 1a
17from sqlalchemy.ext.asyncio import AsyncSession 1a
19import prefect.server.api.dependencies as dependencies 1a
20import prefect.server.models as models 1a
21import prefect.server.schemas as schemas 1a
22from prefect._internal.uuid7 import uuid7 1a
23from prefect.server.api.validation import validate_job_variable_defaults_for_work_pool 1a
24from prefect.server.database import PrefectDBInterface, provide_database_interface 1a
25from prefect.server.models.deployments import mark_deployments_ready 1a
26from prefect.server.models.work_queues import ( 1a
27 emit_work_queue_status_event,
28 mark_work_queues_ready,
29)
30from prefect.server.models.workers import emit_work_pool_status_event 1a
31from prefect.server.schemas.statuses import WorkQueueStatus 1a
32from prefect.server.utilities.server import PrefectRouter 1a
33from prefect.types import DateTime 1a
34from prefect.types._datetime import now 1a
36if TYPE_CHECKING: 36 ↛ 37line 36 didn't jump to line 37 because the condition on line 36 was never true1a
37 from prefect.server.database.orm_models import ORMWorkQueue
# Router for every endpoint in this module; all paths are declared relative
# to the "/work_pools" prefix.
router: PrefectRouter = PrefectRouter(prefix="/work_pools", tags=["Work Pools"])
45# -----------------------------------------------------
46# --
47# --
48# -- Utility functions & dependencies
49# --
50# --
51# -----------------------------------------------------
class WorkerLookups:
    """
    Name-based lookups shared by the work pool routes.

    The user-facing API addresses work pools and work queues by name, while
    the internal model functions are id-based. These helpers translate
    between the two and raise a 404 `HTTPException` when a name does not
    resolve.
    """

    async def _read_work_pool_or_404(self, session: AsyncSession, work_pool_name: str):
        """
        Return the work pool named `work_pool_name`.

        Raises:
            HTTPException: 404 if `read_work_pool_by_name` returns nothing.
        """
        work_pool = await models.workers.read_work_pool_by_name(
            session=session,
            work_pool_name=work_pool_name,
        )
        if not work_pool:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f'Work pool "{work_pool_name}" not found.',
            )
        return work_pool

    async def _get_work_pool_id_from_name(
        self, session: AsyncSession, work_pool_name: str
    ) -> UUID:
        """
        Given a work pool name, return its ID. Used for translating
        user-facing APIs (which are name-based) to internal ones (which are
        id-based).

        Raises:
            HTTPException: 404 if no work pool has that name.
        """
        work_pool = await self._read_work_pool_or_404(
            session=session, work_pool_name=work_pool_name
        )
        return work_pool.id

    async def _get_default_work_queue_id_from_work_pool_name(
        self, session: AsyncSession, work_pool_name: str
    ):
        """
        Given a work pool name, return the ID of its default queue
        (the pool's `default_queue_id`). Used for translating user-facing
        APIs (which are name-based) to internal ones (which are id-based).

        Raises:
            HTTPException: 404 if no work pool has that name.
        """
        work_pool = await self._read_work_pool_or_404(
            session=session, work_pool_name=work_pool_name
        )
        return work_pool.default_queue_id

    async def _get_work_queue_from_name(
        self,
        session: AsyncSession,
        work_pool_name: str,
        work_queue_name: str,
        create_queue_if_not_found: bool = False,
    ) -> "ORMWorkQueue":
        """
        Given a work pool name and a work pool queue name, return the queue
        object itself (use `_get_work_queue_id_from_name` for just the ID).

        If the queue does not exist and `create_queue_if_not_found` is true,
        a queue with that name is created in the pool and returned.

        Raises:
            HTTPException: 404 if the queue is missing and creation was not
                requested, or if creation was requested but the work pool
                itself does not exist.
        """
        work_queue = await models.workers.read_work_queue_by_name(
            session=session,
            work_pool_name=work_pool_name,
            work_queue_name=work_queue_name,
        )
        if not work_queue:
            if not create_queue_if_not_found:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=(
                        f"Work pool queue '{work_pool_name}/{work_queue_name}' not"
                        " found."
                    ),
                )
            # Creating a queue needs the pool's ID; this lookup raises a 404
            # if the pool does not exist.
            work_pool_id = await self._get_work_pool_id_from_name(
                session=session, work_pool_name=work_pool_name
            )
            work_queue = await models.workers.create_work_queue(
                session=session,
                work_pool_id=work_pool_id,
                work_queue=schemas.actions.WorkQueueCreate(name=work_queue_name),
            )

        return work_queue

    async def _get_work_queue_id_from_name(
        self,
        session: AsyncSession,
        work_pool_name: str,
        work_queue_name: str,
        create_queue_if_not_found: bool = False,
    ) -> UUID:
        """
        Given a work pool name and a work pool queue name, return the ID of
        the queue. Accepts the same arguments and raises the same errors as
        `_get_work_queue_from_name`, which does the actual lookup.
        """
        queue = await self._get_work_queue_from_name(
            session=session,
            work_pool_name=work_pool_name,
            work_queue_name=work_queue_name,
            create_queue_if_not_found=create_queue_if_not_found,
        )
        return queue.id
148# -----------------------------------------------------
149# --
150# --
151# -- Work Pools
152# --
153# --
154# -----------------------------------------------------
@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_work_pool(
    work_pool: schemas.actions.WorkPoolCreate,
    db: PrefectDBInterface = Depends(provide_database_interface),
    prefect_client_version: Optional[str] = Depends(
        dependencies.get_prefect_client_version
    ),
) -> schemas.core.WorkPool:
    """
    Create a new work pool and return it.

    Names starting with "prefect" (case-insensitive) are rejected with a 403,
    and a database integrity error is reported as a 409 name conflict.

    For more information, see https://docs.prefect.io/v3/concepts/work-pools.
    """
    name_is_reserved = work_pool.name.lower().startswith("prefect")
    if name_is_reserved:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Work pools starting with 'Prefect' are reserved for internal use.",
        )

    try:
        async with db.session_context(begin_transaction=True) as session:
            await validate_job_variable_defaults_for_work_pool(
                session, work_pool.name, work_pool.base_job_template
            )
            created_pool = await models.workers.create_work_pool(
                session=session, work_pool=work_pool
            )

            # A brand-new pool has no previous state, hence
            # pre_update_work_pool=None.
            await emit_work_pool_status_event(
                event_id=uuid7(),
                occurred=now("UTC"),
                pre_update_work_pool=None,
                work_pool=created_pool,
            )

            response = schemas.core.WorkPool.model_validate(
                created_pool, from_attributes=True
            )
            client_is_legacy = bool(prefect_client_version) and (
                Version(prefect_client_version) <= Version("3.3.7")
            )
            if client_is_legacy:
                # Client versions 3.3.7 and below do not support the
                # default_result_storage_block_id field and will error when
                # receiving it.
                del response.storage_configuration.default_result_storage_block_id
            return response

    except sa.exc.IntegrityError:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="A work pool with this name already exists.",
        )
@router.get("/{name}")
async def read_work_pool(
    work_pool_name: str = Path(..., description="The work pool name", alias="name"),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
    prefect_client_version: Optional[str] = Depends(
        dependencies.get_prefect_client_version
    ),
) -> schemas.core.WorkPool:
    """
    Read a single work pool by name.

    Responds with a 404 if no work pool has the given name.
    """
    async with db.session_context() as session:
        # The name lookup raises the 404; the read itself is id-based.
        pool_id = await worker_lookups._get_work_pool_id_from_name(
            session=session, work_pool_name=work_pool_name
        )
        orm_pool = await models.workers.read_work_pool(
            session=session, work_pool_id=pool_id
        )
        response = schemas.core.WorkPool.model_validate(
            orm_pool, from_attributes=True
        )

        client_is_legacy = bool(prefect_client_version) and (
            Version(prefect_client_version) <= Version("3.3.7")
        )
        if client_is_legacy:
            # Client versions 3.3.7 and below do not support the
            # default_result_storage_block_id field and will error when
            # receiving it.
            del response.storage_configuration.default_result_storage_block_id

        return response
@router.post("/filter")
async def read_work_pools(
    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    db: PrefectDBInterface = Depends(provide_database_interface),
    prefect_client_version: Optional[str] = Depends(
        dependencies.get_prefect_client_version
    ),
) -> List[schemas.core.WorkPool]:
    """
    Read multiple work pools, optionally filtered, with limit/offset paging.
    """
    async with db.session_context() as session:
        orm_pools = await models.workers.read_work_pools(
            session=session,
            work_pool_filter=work_pools,
            offset=offset,
            limit=limit,
        )
        to_schema = schemas.core.WorkPool.model_validate
        response = [
            to_schema(orm_pool, from_attributes=True) for orm_pool in orm_pools
        ]

        client_is_legacy = bool(prefect_client_version) and (
            Version(prefect_client_version) <= Version("3.3.7")
        )
        if client_is_legacy:
            # Client versions 3.3.7 and below do not support the
            # default_result_storage_block_id field and will error when
            # receiving it.
            for pool in response:
                del pool.storage_configuration.default_result_storage_block_id
        return response
@router.post("/count")
async def count_work_pools(
    work_pools: Optional[schemas.filters.WorkPoolFilter] = Body(None, embed=True),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> int:
    """
    Count the work pools matching the optional filter.
    """
    async with db.session_context() as session:
        total = await models.workers.count_work_pools(
            session=session, work_pool_filter=work_pools
        )
        return total
@router.patch("/{name}", status_code=status.HTTP_204_NO_CONTENT)
async def update_work_pool(
    work_pool: schemas.actions.WorkPoolUpdate,
    work_pool_name: str = Path(..., description="The work pool name", alias="name"),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Update a work pool, looked up by name.

    Pools whose name starts with "prefect" (case-insensitive) only accept
    changes to `is_paused` and `concurrency_limit`; anything else is a 403.
    Responds with a 404 if the work pool does not exist.
    """
    # Reserved pools can only be updated to pause them or set concurrency.
    # Only fields the caller actually set count as requested changes.
    requested_changes = work_pool.model_dump(exclude_unset=True)
    pool_is_reserved = work_pool_name.lower().startswith("prefect")
    disallowed_fields = set(requested_changes) - {"is_paused", "concurrency_limit"}
    if pool_is_reserved and disallowed_fields:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=(
                "Work pools starting with 'Prefect' are reserved for internal use "
                "and can only be updated to set concurrency limits or pause."
            ),
        )

    async with db.session_context(begin_transaction=True) as session:
        pool_id = await worker_lookups._get_work_pool_id_from_name(
            session=session, work_pool_name=work_pool_name
        )
        await models.workers.update_work_pool(
            session=session,
            work_pool_id=pool_id,
            work_pool=work_pool,
            emit_status_change=emit_work_pool_status_event,
        )
@router.delete("/{name}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_work_pool(
    work_pool_name: str = Path(..., description="The work pool name", alias="name"),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Delete a work pool, looked up by name.

    Pools whose name starts with "prefect" (case-insensitive) cannot be
    deleted (403). Responds with a 404 if the work pool does not exist.
    """
    pool_is_reserved = work_pool_name.lower().startswith("prefect")
    if pool_is_reserved:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=(
                "Work pools starting with 'Prefect' are reserved for internal use and"
                " can not be deleted."
            ),
        )

    async with db.session_context(begin_transaction=True) as session:
        pool_id = await worker_lookups._get_work_pool_id_from_name(
            session=session, work_pool_name=work_pool_name
        )
        await models.workers.delete_work_pool(session=session, work_pool_id=pool_id)
@router.post("/{name}/get_scheduled_flow_runs")
async def get_scheduled_flow_runs(
    docket: dependencies.Docket,
    work_pool_name: str = Path(..., description="The work pool name", alias="name"),
    work_queue_names: Optional[List[str]] = Body(
        None, description="The names of work pool queues"
    ),
    scheduled_before: Optional[DateTime] = Body(
        None, description="The maximum time to look for scheduled flow runs"
    ),
    scheduled_after: Optional[DateTime] = Body(
        None, description="The minimum time to look for scheduled flow runs"
    ),
    limit: int = dependencies.LimitBody(),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.responses.WorkerFlowRunResponse]:
    """
    Load scheduled runs for a worker.

    If `work_queue_names` is omitted or empty, all queues of the pool are
    considered; otherwise only the named queues are, and an unknown queue
    name results in a 404. A 404 is also returned if the work pool does not
    exist. After the runs are loaded, background tasks are scheduled to mark
    the polled work queues and their deployments as ready.
    """
    async with db.session_context() as session:
        work_pool_id = await worker_lookups._get_work_pool_id_from_name(
            session=session, work_pool_name=work_pool_name
        )

        if not work_queue_names:
            work_queues = list(
                await models.workers.read_work_queues(
                    session=session, work_pool_id=work_pool_id
                )
            )
            # None here instructs get_scheduled_flow_runs to use the default behavior
            # of just operating on all work queues of the pool
            work_queue_ids = None
        else:
            # Each lookup raises a 404 for a queue name that does not exist.
            work_queues = [
                await worker_lookups._get_work_queue_from_name(
                    session=session,
                    work_pool_name=work_pool_name,
                    work_queue_name=name,
                )
                for name in work_queue_names
            ]
            work_queue_ids = [wq.id for wq in work_queues]

    async with db.session_context(begin_transaction=True) as session:
        queue_response = await models.workers.get_scheduled_flow_runs(
            session=session,
            work_pool_ids=[work_pool_id],
            work_queue_ids=work_queue_ids,
            scheduled_before=scheduled_before,
            scheduled_after=scheduled_after,
            limit=limit,
        )

    # Queues currently NOT_READY are passed as "ready" candidates; every
    # other queue is passed as "polled".
    await docket.add(mark_work_queues_ready)(
        polled_work_queue_ids=[
            wq.id for wq in work_queues if wq.status != WorkQueueStatus.NOT_READY
        ],
        ready_work_queue_ids=[
            wq.id for wq in work_queues if wq.status == WorkQueueStatus.NOT_READY
        ],
    )

    await docket.add(mark_deployments_ready)(
        work_queue_ids=[wq.id for wq in work_queues],
    )

    return queue_response
427# -----------------------------------------------------
428# --
429# --
430# -- Work Pool Queues
431# --
432# --
433# -----------------------------------------------------
@router.post("/{work_pool_name}/queues", status_code=status.HTTP_201_CREATED)
async def create_work_queue(
    work_queue: schemas.actions.WorkQueueCreate,
    work_pool_name: str = Path(..., description="The work pool name"),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.responses.WorkQueueResponse:
    """
    Creates a new work pool queue. If a work pool queue with the same
    name already exists, an error will be raised.

    Responds with a 404 if the work pool does not exist and with a 409 if
    the database reports an integrity error while creating the queue.

    For more information, see https://docs.prefect.io/v3/concepts/work-pools#work-queues.
    """

    try:
        async with db.session_context(begin_transaction=True) as session:
            work_pool_id = await worker_lookups._get_work_pool_id_from_name(
                session=session,
                work_pool_name=work_pool_name,
            )

            model = await models.workers.create_work_queue(
                session=session,
                work_pool_id=work_pool_id,
                work_queue=work_queue,
            )
    except sa.exc.IntegrityError:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=(
                "A work queue with this name already exists in work pool"
                # Must be an f-string: without the prefix the client receives
                # the literal text "{work_pool_name!r}".
                f" {work_pool_name!r}."
            ),
        )

    return schemas.responses.WorkQueueResponse.model_validate(
        model, from_attributes=True
    )
@router.get("/{work_pool_name}/queues/{name}")
async def read_work_queue(
    work_pool_name: str = Path(..., description="The work pool name"),
    work_queue_name: str = Path(
        ..., description="The work pool queue name", alias="name"
    ),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.responses.WorkQueueResponse:
    """
    Read a work pool queue by pool name and queue name.

    Responds with a 404 if the queue cannot be found.
    """
    # NOTE(review): this route opens a transaction although it only calls
    # read functions here; confirm whether begin_transaction=True is needed.
    async with db.session_context(begin_transaction=True) as session:
        queue_id = await worker_lookups._get_work_queue_id_from_name(
            session=session,
            work_pool_name=work_pool_name,
            work_queue_name=work_queue_name,
        )
        orm_queue = await models.workers.read_work_queue(
            session=session, work_queue_id=queue_id
        )

        response_model = schemas.responses.WorkQueueResponse
        return response_model.model_validate(orm_queue, from_attributes=True)
@router.post("/{work_pool_name}/queues/filter")
async def read_work_queues(
    work_pool_name: str = Path(..., description="The work pool name"),
    work_queues: Optional[schemas.filters.WorkQueueFilter] = None,
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.responses.WorkQueueResponse]:
    """
    Read all work pool queues of a work pool, optionally filtered, with
    limit/offset paging.

    Responds with a 404 if the work pool does not exist.
    """
    async with db.session_context() as session:
        work_pool_id = await worker_lookups._get_work_pool_id_from_name(
            session=session,
            work_pool_name=work_pool_name,
        )
        wqs = await models.workers.read_work_queues(
            session=session,
            work_pool_id=work_pool_id,
            # The filter is optional; None is passed through unchanged.
            work_queue_filter=work_queues,
            limit=limit,
            offset=offset,
        )

        return [
            schemas.responses.WorkQueueResponse.model_validate(wq, from_attributes=True)
            for wq in wqs
        ]
@router.patch("/{work_pool_name}/queues/{name}", status_code=status.HTTP_204_NO_CONTENT)
async def update_work_queue(
    work_queue: schemas.actions.WorkQueueUpdate,
    work_pool_name: str = Path(..., description="The work pool name"),
    work_queue_name: str = Path(
        ..., description="The work pool queue name", alias="name"
    ),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Update a work pool queue, looked up by pool name and queue name.

    Responds with a 404 if the queue cannot be found.
    """
    async with db.session_context(begin_transaction=True) as session:
        queue_id = await worker_lookups._get_work_queue_id_from_name(
            session=session,
            work_pool_name=work_pool_name,
            work_queue_name=work_queue_name,
        )
        await models.workers.update_work_queue(
            session=session,
            work_queue_id=queue_id,
            work_queue=work_queue,
            emit_status_change=emit_work_queue_status_event,
        )
@router.delete(
    "/{work_pool_name}/queues/{name}", status_code=status.HTTP_204_NO_CONTENT
)
async def delete_work_queue(
    work_pool_name: str = Path(..., description="The work pool name"),
    work_queue_name: str = Path(
        ..., description="The work pool queue name", alias="name"
    ),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Delete a work pool queue, looked up by pool name and queue name.

    Responds with a 404 if the queue cannot be found.
    """
    async with db.session_context(begin_transaction=True) as session:
        queue_id = await worker_lookups._get_work_queue_id_from_name(
            session=session,
            work_pool_name=work_pool_name,
            work_queue_name=work_queue_name,
        )
        await models.workers.delete_work_queue(
            session=session, work_queue_id=queue_id
        )
592# -----------------------------------------------------
593# --
594# --
595# -- Workers
596# --
597# --
598# -----------------------------------------------------
@router.post(
    "/{work_pool_name}/workers/heartbeat",
    status_code=status.HTTP_204_NO_CONTENT,
)
async def worker_heartbeat(
    work_pool_name: str = Path(..., description="The work pool name"),
    name: str = Body(..., description="The worker process name", embed=True),
    heartbeat_interval_seconds: Optional[int] = Body(
        None, description="The worker's heartbeat interval in seconds", embed=True
    ),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Record a heartbeat for a worker of the given work pool.

    If the pool's status is NOT_READY, it is updated to READY. Responds with
    a 404 if the work pool does not exist.
    """
    async with db.session_context(begin_transaction=True) as session:
        # The full pool object is needed (not just its ID) because its status
        # is inspected below.
        pool = await models.workers.read_work_pool_by_name(
            session=session,
            work_pool_name=work_pool_name,
        )
        if not pool:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f'Work pool "{work_pool_name}" not found.',
            )

        await models.workers.worker_heartbeat(
            session=session,
            work_pool_id=pool.id,
            worker_name=name,
            heartbeat_interval_seconds=heartbeat_interval_seconds,
        )

        pool_statuses = schemas.statuses.WorkPoolStatus
        if pool.status == pool_statuses.NOT_READY:
            ready_update = schemas.internal.InternalWorkPoolUpdate(
                status=pool_statuses.READY
            )
            await models.workers.update_work_pool(
                session=session,
                work_pool_id=pool.id,
                work_pool=ready_update,
                emit_status_change=emit_work_pool_status_event,
            )
@router.post("/{work_pool_name}/workers/filter")
async def read_workers(
    work_pool_name: str = Path(..., description="The work pool name"),
    workers: Optional[schemas.filters.WorkerFilter] = None,
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.responses.WorkerResponse]:
    """
    Read all worker processes of a work pool, optionally filtered, with
    limit/offset paging.

    Responds with a 404 if the work pool does not exist.
    """
    async with db.session_context() as session:
        pool_id = await worker_lookups._get_work_pool_id_from_name(
            session=session, work_pool_name=work_pool_name
        )
        found_workers = await models.workers.read_workers(
            session=session,
            work_pool_id=pool_id,
            worker_filter=workers,
            limit=limit,
            offset=offset,
        )
        return found_workers
@router.delete(
    "/{work_pool_name}/workers/{name}", status_code=status.HTTP_204_NO_CONTENT
)
async def delete_worker(
    work_pool_name: str = Path(..., description="The work pool name"),
    worker_name: str = Path(
        ..., description="The work pool's worker name", alias="name"
    ),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Delete a worker from a work pool.

    Responds with a 404 if the work pool does not exist or if no worker was
    deleted.
    """
    async with db.session_context(begin_transaction=True) as session:
        pool_id = await worker_lookups._get_work_pool_id_from_name(
            session=session, work_pool_name=work_pool_name
        )
        was_deleted = await models.workers.delete_worker(
            session=session, work_pool_id=pool_id, worker_name=worker_name
        )

    if was_deleted:
        return
    # A falsy result from the model layer is reported as "not found".
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Worker not found."
    )