Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/work_queues.py: 39%
67 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Routes for interacting with work queue objects.
3"""
5from typing import List, Optional 1a
6from uuid import UUID 1a
8from fastapi import ( 1a
9 Body,
10 Depends,
11 Header,
12 HTTPException,
13 Path,
14 status,
15)
16from sqlalchemy.exc import IntegrityError 1a
18import prefect.server.api.dependencies as dependencies 1a
19import prefect.server.models as models 1a
20import prefect.server.schemas as schemas 1a
21from prefect.server.database import ( 1a
22 PrefectDBInterface,
23 provide_database_interface,
24)
25from prefect.server.models.deployments import mark_deployments_ready 1a
26from prefect.server.models.work_queues import ( 1a
27 emit_work_queue_status_event,
28 mark_work_queues_ready,
29)
30from prefect.server.schemas.statuses import WorkQueueStatus 1a
31from prefect.server.utilities.server import PrefectRouter 1a
32from prefect.types import DateTime 1a
34router: PrefectRouter = PrefectRouter(prefix="/work_queues", tags=["Work Queues"]) 1a
37@router.post("/", status_code=status.HTTP_201_CREATED) 1a
38async def create_work_queue( 1a
39 work_queue: schemas.actions.WorkQueueCreate,
40 db: PrefectDBInterface = Depends(provide_database_interface),
41) -> schemas.responses.WorkQueueResponse:
42 """
43 Creates a new work queue.
45 If a work queue with the same name already exists, an error
46 will be raised.
48 For more information, see https://docs.prefect.io/v3/concepts/work-pools#work-queues.
49 """
51 try:
52 async with db.session_context(begin_transaction=True) as session:
53 model = await models.work_queues.create_work_queue(
54 session=session, work_queue=work_queue
55 )
56 except IntegrityError:
57 raise HTTPException(
58 status_code=status.HTTP_409_CONFLICT,
59 detail="A work queue with this name already exists.",
60 )
62 return schemas.responses.WorkQueueResponse.model_validate(
63 model, from_attributes=True
64 )
67@router.patch("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT) 1a
68async def update_work_queue( 1a
69 work_queue: schemas.actions.WorkQueueUpdate,
70 work_queue_id: UUID = Path(..., description="The work queue id", alias="id"),
71 db: PrefectDBInterface = Depends(provide_database_interface),
72) -> None:
73 """
74 Updates an existing work queue.
75 """
76 async with db.session_context(begin_transaction=True) as session:
77 result = await models.work_queues.update_work_queue(
78 session=session,
79 work_queue_id=work_queue_id,
80 work_queue=work_queue,
81 emit_status_change=emit_work_queue_status_event,
82 )
83 if not result:
84 raise HTTPException(
85 status_code=status.HTTP_404_NOT_FOUND, detail=f"Work Queue {id} not found"
86 )
89@router.get("/name/{name}") 1a
90async def read_work_queue_by_name( 1a
91 name: str = Path(..., description="The work queue name"),
92 db: PrefectDBInterface = Depends(provide_database_interface),
93) -> schemas.responses.WorkQueueResponse:
94 """
95 Get a work queue by id.
96 """
97 async with db.session_context() as session:
98 work_queue = await models.work_queues.read_work_queue_by_name(
99 session=session, name=name
100 )
101 if not work_queue:
102 raise HTTPException(
103 status_code=status.HTTP_404_NOT_FOUND, detail="work queue not found"
104 )
105 return schemas.responses.WorkQueueResponse.model_validate(
106 work_queue, from_attributes=True
107 )
110@router.get("/{id:uuid}") 1a
111async def read_work_queue( 1a
112 work_queue_id: UUID = Path(..., description="The work queue id", alias="id"),
113 db: PrefectDBInterface = Depends(provide_database_interface),
114) -> schemas.responses.WorkQueueResponse:
115 """
116 Get a work queue by id.
117 """
118 async with db.session_context() as session:
119 work_queue = await models.work_queues.read_work_queue(
120 session=session, work_queue_id=work_queue_id
121 )
122 if not work_queue:
123 raise HTTPException(
124 status_code=status.HTTP_404_NOT_FOUND, detail="work queue not found"
125 )
126 return schemas.responses.WorkQueueResponse.model_validate(
127 work_queue, from_attributes=True
128 )
131@router.post("/{id:uuid}/get_runs") 1a
132async def read_work_queue_runs( 1a
133 docket: dependencies.Docket,
134 work_queue_id: UUID = Path(..., description="The work queue id", alias="id"),
135 limit: int = dependencies.LimitBody(),
136 scheduled_before: DateTime = Body(
137 None,
138 description=(
139 "Only flow runs scheduled to start before this time will be returned."
140 ),
141 ),
142 x_prefect_ui: Optional[bool] = Header(
143 default=False,
144 description="A header to indicate this request came from the Prefect UI.",
145 ),
146 db: PrefectDBInterface = Depends(provide_database_interface),
147) -> List[schemas.responses.FlowRunResponse]:
148 """
149 Get flow runs from the work queue.
150 """
151 async with db.session_context(begin_transaction=True) as session:
152 work_queue, flow_runs = await models.work_queues.get_runs_in_work_queue(
153 session=session,
154 work_queue_id=work_queue_id,
155 scheduled_before=scheduled_before,
156 limit=limit,
157 )
159 # The Prefect UI often calls this route to see which runs are enqueued.
160 # We do not want to record this as an actual poll event.
161 if x_prefect_ui:
162 return flow_runs
164 await docket.add(mark_work_queues_ready)(
165 polled_work_queue_ids=[work_queue_id],
166 ready_work_queue_ids=(
167 [work_queue_id] if work_queue.status == WorkQueueStatus.NOT_READY else []
168 ),
169 )
171 await docket.add(mark_deployments_ready)(
172 work_queue_ids=[work_queue_id],
173 )
175 return flow_runs
178@router.post("/filter") 1a
179async def read_work_queues( 1a
180 limit: int = dependencies.LimitBody(),
181 offset: int = Body(0, ge=0),
182 work_queues: Optional[schemas.filters.WorkQueueFilter] = None,
183 db: PrefectDBInterface = Depends(provide_database_interface),
184) -> List[schemas.responses.WorkQueueResponse]:
185 """
186 Query for work queues.
187 """
188 async with db.session_context() as session:
189 wqs = await models.work_queues.read_work_queues(
190 session=session, offset=offset, limit=limit, work_queue_filter=work_queues
191 )
193 return [
194 schemas.responses.WorkQueueResponse.model_validate(wq, from_attributes=True)
195 for wq in wqs
196 ]
199@router.delete("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT) 1a
200async def delete_work_queue( 1a
201 work_queue_id: UUID = Path(..., description="The work queue id", alias="id"),
202 db: PrefectDBInterface = Depends(provide_database_interface),
203) -> None:
204 """
205 Delete a work queue by id.
206 """
207 async with db.session_context(begin_transaction=True) as session:
208 result = await models.work_queues.delete_work_queue(
209 session=session, work_queue_id=work_queue_id
210 )
211 if not result:
212 raise HTTPException(
213 status_code=status.HTTP_404_NOT_FOUND, detail="work queue not found"
214 )
217@router.get("/{id:uuid}/status") 1a
218async def read_work_queue_status( 1a
219 work_queue_id: UUID = Path(..., description="The work queue id", alias="id"),
220 db: PrefectDBInterface = Depends(provide_database_interface),
221) -> schemas.core.WorkQueueStatusDetail:
222 """
223 Get the status of a work queue.
224 """
225 async with db.session_context() as session:
226 work_queue_status = await models.work_queues.read_work_queue_status(
227 session=session, work_queue_id=work_queue_id
228 )
229 return work_queue_status