Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/work_queues.py: 84%

67 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Routes for interacting with work queue objects. 

3""" 

4 

5from typing import List, Optional 1a

6from uuid import UUID 1a

7 

8from fastapi import ( 1a

9 Body, 

10 Depends, 

11 Header, 

12 HTTPException, 

13 Path, 

14 status, 

15) 

16from sqlalchemy.exc import IntegrityError 1a

17 

18import prefect.server.api.dependencies as dependencies 1a

19import prefect.server.models as models 1a

20import prefect.server.schemas as schemas 1a

21from prefect.server.database import ( 1a

22 PrefectDBInterface, 

23 provide_database_interface, 

24) 

25from prefect.server.models.deployments import mark_deployments_ready 1a

26from prefect.server.models.work_queues import ( 1a

27 emit_work_queue_status_event, 

28 mark_work_queues_ready, 

29) 

30from prefect.server.schemas.statuses import WorkQueueStatus 1a

31from prefect.server.utilities.server import PrefectRouter 1a

32from prefect.types import DateTime 1a

33 

34router: PrefectRouter = PrefectRouter(prefix="/work_queues", tags=["Work Queues"]) 1a

35 

36 

37@router.post("/", status_code=status.HTTP_201_CREATED) 1a

38async def create_work_queue( 1a

39 work_queue: schemas.actions.WorkQueueCreate, 

40 db: PrefectDBInterface = Depends(provide_database_interface), 

41) -> schemas.responses.WorkQueueResponse: 

42 """ 

43 Creates a new work queue. 

44 

45 If a work queue with the same name already exists, an error 

46 will be raised. 

47 

48 For more information, see https://docs.prefect.io/v3/concepts/work-pools#work-queues. 

49 """ 

50 

51 try: 1b

52 async with db.session_context(begin_transaction=True) as session: 1b

53 model = await models.work_queues.create_work_queue( 1b

54 session=session, work_queue=work_queue 

55 ) 

56 except IntegrityError: 1b

57 raise HTTPException( 1b

58 status_code=status.HTTP_409_CONFLICT, 

59 detail="A work queue with this name already exists.", 

60 ) 

61 

62 return schemas.responses.WorkQueueResponse.model_validate( 1b

63 model, from_attributes=True 

64 ) 

65 

66 

67@router.patch("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT) 1a

68async def update_work_queue( 1a

69 work_queue: schemas.actions.WorkQueueUpdate, 

70 work_queue_id: UUID = Path(..., description="The work queue id", alias="id"), 

71 db: PrefectDBInterface = Depends(provide_database_interface), 

72) -> None: 

73 """ 

74 Updates an existing work queue. 

75 """ 

76 async with db.session_context(begin_transaction=True) as session: 1b

77 result = await models.work_queues.update_work_queue( 1b

78 session=session, 

79 work_queue_id=work_queue_id, 

80 work_queue=work_queue, 

81 emit_status_change=emit_work_queue_status_event, 

82 ) 

83 if not result: 83 ↛ exitline 83 didn't return from function 'update_work_queue' because the condition on line 83 was always true1b

84 raise HTTPException( 1b

85 status_code=status.HTTP_404_NOT_FOUND, detail=f"Work Queue {id} not found" 

86 ) 

87 

88 

89@router.get("/name/{name}") 1a

90async def read_work_queue_by_name( 1a

91 name: str = Path(..., description="The work queue name"), 

92 db: PrefectDBInterface = Depends(provide_database_interface), 

93) -> schemas.responses.WorkQueueResponse: 

94 """ 

95 Get a work queue by id. 

96 """ 

97 async with db.session_context() as session: 1bc

98 work_queue = await models.work_queues.read_work_queue_by_name( 1bc

99 session=session, name=name 

100 ) 

101 if not work_queue: 1bc

102 raise HTTPException( 1bc

103 status_code=status.HTTP_404_NOT_FOUND, detail="work queue not found" 

104 ) 

105 return schemas.responses.WorkQueueResponse.model_validate( 1b

106 work_queue, from_attributes=True 

107 ) 

108 

109 

110@router.get("/{id:uuid}") 1a

111async def read_work_queue( 1a

112 work_queue_id: UUID = Path(..., description="The work queue id", alias="id"), 

113 db: PrefectDBInterface = Depends(provide_database_interface), 

114) -> schemas.responses.WorkQueueResponse: 

115 """ 

116 Get a work queue by id. 

117 """ 

118 async with db.session_context() as session: 1b

119 work_queue = await models.work_queues.read_work_queue( 1b

120 session=session, work_queue_id=work_queue_id 

121 ) 

122 if not work_queue: 122 ↛ 126line 122 didn't jump to line 126 because the condition on line 122 was always true1b

123 raise HTTPException( 1b

124 status_code=status.HTTP_404_NOT_FOUND, detail="work queue not found" 

125 ) 

126 return schemas.responses.WorkQueueResponse.model_validate( 

127 work_queue, from_attributes=True 

128 ) 

129 

130 

131@router.post("/{id:uuid}/get_runs") 1a

132async def read_work_queue_runs( 1a

133 docket: dependencies.Docket, 

134 work_queue_id: UUID = Path(..., description="The work queue id", alias="id"), 

135 limit: int = dependencies.LimitBody(), 

136 scheduled_before: DateTime = Body( 

137 None, 

138 description=( 

139 "Only flow runs scheduled to start before this time will be returned." 

140 ), 

141 ), 

142 x_prefect_ui: Optional[bool] = Header( 

143 default=False, 

144 description="A header to indicate this request came from the Prefect UI.", 

145 ), 

146 db: PrefectDBInterface = Depends(provide_database_interface), 

147) -> List[schemas.responses.FlowRunResponse]: 

148 """ 

149 Get flow runs from the work queue. 

150 """ 

151 async with db.session_context(begin_transaction=True) as session: 1bc

152 work_queue, flow_runs = await models.work_queues.get_runs_in_work_queue( 1bc

153 session=session, 

154 work_queue_id=work_queue_id, 

155 scheduled_before=scheduled_before, 

156 limit=limit, 

157 ) 

158 

159 # The Prefect UI often calls this route to see which runs are enqueued. 

160 # We do not want to record this as an actual poll event. 

161 if x_prefect_ui: 

162 return flow_runs 

163 

164 await docket.add(mark_work_queues_ready)( 

165 polled_work_queue_ids=[work_queue_id], 

166 ready_work_queue_ids=( 

167 [work_queue_id] if work_queue.status == WorkQueueStatus.NOT_READY else [] 

168 ), 

169 ) 

170 

171 await docket.add(mark_deployments_ready)( 

172 work_queue_ids=[work_queue_id], 

173 ) 

174 

175 return flow_runs 

176 

177 

178@router.post("/filter") 1a

179async def read_work_queues( 1a

180 limit: int = dependencies.LimitBody(), 

181 offset: int = Body(0, ge=0), 

182 work_queues: Optional[schemas.filters.WorkQueueFilter] = None, 

183 db: PrefectDBInterface = Depends(provide_database_interface), 

184) -> List[schemas.responses.WorkQueueResponse]: 

185 """ 

186 Query for work queues. 

187 """ 

188 async with db.session_context() as session: 1bc

189 wqs = await models.work_queues.read_work_queues( 1bc

190 session=session, offset=offset, limit=limit, work_queue_filter=work_queues 

191 ) 

192 

193 return [ 1bc

194 schemas.responses.WorkQueueResponse.model_validate(wq, from_attributes=True) 

195 for wq in wqs 

196 ] 

197 

198 

199@router.delete("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT) 1a

200async def delete_work_queue( 1a

201 work_queue_id: UUID = Path(..., description="The work queue id", alias="id"), 

202 db: PrefectDBInterface = Depends(provide_database_interface), 

203) -> None: 

204 """ 

205 Delete a work queue by id. 

206 """ 

207 async with db.session_context(begin_transaction=True) as session: 1b

208 result = await models.work_queues.delete_work_queue( 1b

209 session=session, work_queue_id=work_queue_id 

210 ) 

211 if not result: 211 ↛ exitline 211 didn't return from function 'delete_work_queue' because the condition on line 211 was always true1b

212 raise HTTPException( 1b

213 status_code=status.HTTP_404_NOT_FOUND, detail="work queue not found" 

214 ) 

215 

216 

217@router.get("/{id:uuid}/status") 1a

218async def read_work_queue_status( 1a

219 work_queue_id: UUID = Path(..., description="The work queue id", alias="id"), 

220 db: PrefectDBInterface = Depends(provide_database_interface), 

221) -> schemas.core.WorkQueueStatusDetail: 

222 """ 

223 Get the status of a work queue. 

224 """ 

225 async with db.session_context() as session: 1b

226 work_queue_status = await models.work_queues.read_work_queue_status( 1b

227 session=session, work_queue_id=work_queue_id 

228 ) 

229 return work_queue_status