Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/workers.py: 29%

162 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Routes for interacting with work queue objects. 

3""" 

4 

5from typing import TYPE_CHECKING, List, Optional 1a

6from uuid import UUID 1a

7 

8import sqlalchemy as sa 1a

9from fastapi import ( 1a

10 Body, 

11 Depends, 

12 HTTPException, 

13 Path, 

14 status, 

15) 

16from packaging.version import Version 1a

17from sqlalchemy.ext.asyncio import AsyncSession 1a

18 

19import prefect.server.api.dependencies as dependencies 1a

20import prefect.server.models as models 1a

21import prefect.server.schemas as schemas 1a

22from prefect._internal.uuid7 import uuid7 1a

23from prefect.server.api.validation import validate_job_variable_defaults_for_work_pool 1a

24from prefect.server.database import PrefectDBInterface, provide_database_interface 1a

25from prefect.server.models.deployments import mark_deployments_ready 1a

26from prefect.server.models.work_queues import ( 1a

27 emit_work_queue_status_event, 

28 mark_work_queues_ready, 

29) 

30from prefect.server.models.workers import emit_work_pool_status_event 1a

31from prefect.server.schemas.statuses import WorkQueueStatus 1a

32from prefect.server.utilities.server import PrefectRouter 1a

33from prefect.types import DateTime 1a

34from prefect.types._datetime import now 1a

35 

36if TYPE_CHECKING: 36 ↛ 37line 36 didn't jump to line 37 because the condition on line 36 was never true1a

37 from prefect.server.database.orm_models import ORMWorkQueue 

38 

# Router for every endpoint declared in this module: all paths are served
# beneath the "/work_pools" prefix and carry the "Work Pools" tag.
router: PrefectRouter = PrefectRouter(prefix="/work_pools", tags=["Work Pools"])

43 

44 

45# ----------------------------------------------------- 

46# -- 

47# -- 

48# -- Utility functions & dependencies 

49# -- 

50# -- 

51# ----------------------------------------------------- 

52 

53 

class WorkerLookups:
    """
    Name-based lookups shared by the work pool routes.

    The user-facing APIs address work pools and work pool queues by name,
    while the model layer is id-based. These helpers translate from one to
    the other and raise a 404 `HTTPException` when a named object is missing.
    """

    async def _read_work_pool_or_404(
        self, session: AsyncSession, work_pool_name: str
    ):
        """
        Read the work pool called `work_pool_name`, raising a 404
        `HTTPException` if the lookup comes back empty.
        """
        pool = await models.workers.read_work_pool_by_name(
            session=session,
            work_pool_name=work_pool_name,
        )
        if pool:
            return pool

        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f'Work pool "{work_pool_name}" not found.',
        )

    async def _get_work_pool_id_from_name(
        self, session: AsyncSession, work_pool_name: str
    ) -> UUID:
        """
        Given a work pool name, return the ID of that work pool.
        Raises a 404 if the work pool does not exist.
        """
        pool = await self._read_work_pool_or_404(
            session=session, work_pool_name=work_pool_name
        )
        return pool.id

    async def _get_default_work_queue_id_from_work_pool_name(
        self, session: AsyncSession, work_pool_name: str
    ):
        """
        Given a work pool name, return the pool's `default_queue_id`.
        Raises a 404 if the work pool does not exist.
        """
        pool = await self._read_work_pool_or_404(
            session=session, work_pool_name=work_pool_name
        )
        return pool.default_queue_id

    async def _get_work_queue_from_name(
        self,
        session: AsyncSession,
        work_pool_name: str,
        work_queue_name: str,
        create_queue_if_not_found: bool = False,
    ) -> "ORMWorkQueue":
        """
        Given a work pool name and a work pool queue name, return the queue
        itself (not just its ID).

        When the queue does not exist and `create_queue_if_not_found` is
        false, a 404 is raised. When the flag is true, the queue is created
        in the named work pool instead; a 404 is still raised if that work
        pool does not exist.
        """
        existing = await models.workers.read_work_queue_by_name(
            session=session,
            work_pool_name=work_pool_name,
            work_queue_name=work_queue_name,
        )
        if existing:
            return existing

        if not create_queue_if_not_found:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Work pool queue '{work_pool_name}/{work_queue_name}' not found.",
            )

        # The queue is missing and the caller asked for it to be created;
        # resolving the pool id first surfaces a 404 for an unknown pool.
        pool_id = await self._get_work_pool_id_from_name(
            session=session, work_pool_name=work_pool_name
        )
        return await models.workers.create_work_queue(
            session=session,
            work_pool_id=pool_id,
            work_queue=schemas.actions.WorkQueueCreate(name=work_queue_name),
        )

    async def _get_work_queue_id_from_name(
        self,
        session: AsyncSession,
        work_pool_name: str,
        work_queue_name: str,
        create_queue_if_not_found: bool = False,
    ) -> UUID:
        """
        Given a work pool name and a work pool queue name, return the ID of
        the queue. Accepts the same arguments, and raises the same 404s, as
        `_get_work_queue_from_name`.
        """
        resolved = await self._get_work_queue_from_name(
            session=session,
            work_pool_name=work_pool_name,
            work_queue_name=work_queue_name,
            create_queue_if_not_found=create_queue_if_not_found,
        )
        return resolved.id

146 

147 

148# ----------------------------------------------------- 

149# -- 

150# -- 

151# -- Worker Pools 

152# -- 

153# -- 

154# ----------------------------------------------------- 

155 

156 

@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_work_pool(
    work_pool: schemas.actions.WorkPoolCreate,
    db: PrefectDBInterface = Depends(provide_database_interface),
    prefect_client_version: Optional[str] = Depends(
        dependencies.get_prefect_client_version
    ),
) -> schemas.core.WorkPool:
    """
    Creates a new work pool and returns it. Names that start with "prefect"
    (in any letter case) are reserved and rejected with a 403. If a work pool
    with the same name already exists, a 409 is returned.

    For more information, see https://docs.prefect.io/v3/concepts/work-pools.
    """
    name_is_reserved = work_pool.name.lower().startswith("prefect")
    if name_is_reserved:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Work pools starting with 'Prefect' are reserved for internal use.",
        )

    try:
        async with db.session_context(begin_transaction=True) as session:
            # Check the base job template's variable defaults before the
            # pool is written.
            await validate_job_variable_defaults_for_work_pool(
                session, work_pool.name, work_pool.base_job_template
            )
            created = await models.workers.create_work_pool(
                session=session, work_pool=work_pool
            )

            # A brand-new pool has no earlier state to compare against, so
            # the pre-update snapshot is None.
            await emit_work_pool_status_event(
                event_id=uuid7(),
                occurred=now("UTC"),
                pre_update_work_pool=None,
                work_pool=created,
            )

            response = schemas.core.WorkPool.model_validate(
                created, from_attributes=True
            )
            if prefect_client_version:
                if Version(prefect_client_version) <= Version("3.3.7"):
                    # Client versions 3.3.7 and below do not support the
                    # default_result_storage_block_id field and will error
                    # when receiving it.
                    del response.storage_configuration.default_result_storage_block_id
            return response

    except sa.exc.IntegrityError:
        # Database integrity errors are reported to the caller as a name
        # conflict.
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="A work pool with this name already exists.",
        )

207 

208 

@router.get("/{name}")
async def read_work_pool(
    work_pool_name: str = Path(..., description="The work pool name", alias="name"),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
    prefect_client_version: Optional[str] = Depends(
        dependencies.get_prefect_client_version
    ),
) -> schemas.core.WorkPool:
    """
    Read a single work pool, addressed by its name
    """
    async with db.session_context() as session:
        # Resolve the name to an id first; the lookup raises a 404 when the
        # pool does not exist.
        pool_id = await worker_lookups._get_work_pool_id_from_name(
            session=session, work_pool_name=work_pool_name
        )
        orm_pool = await models.workers.read_work_pool(
            session=session, work_pool_id=pool_id
        )
        response = schemas.core.WorkPool.model_validate(
            orm_pool, from_attributes=True
        )

        is_legacy_client = bool(prefect_client_version) and (
            Version(prefect_client_version) <= Version("3.3.7")
        )
        if is_legacy_client:
            # Client versions 3.3.7 and below do not support the
            # default_result_storage_block_id field and will error when
            # receiving it.
            del response.storage_configuration.default_result_storage_block_id

        return response

241 

242 

@router.post("/filter")
async def read_work_pools(
    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    db: PrefectDBInterface = Depends(provide_database_interface),
    prefect_client_version: Optional[str] = Depends(
        dependencies.get_prefect_client_version
    ),
) -> List[schemas.core.WorkPool]:
    """
    Read multiple work pools, optionally narrowed by a filter and paged with
    `limit` and `offset`
    """
    async with db.session_context() as session:
        orm_pools = await models.workers.read_work_pools(
            session=session,
            work_pool_filter=work_pools,
            offset=offset,
            limit=limit,
        )
        pools = [
            schemas.core.WorkPool.model_validate(orm_pool, from_attributes=True)
            for orm_pool in orm_pools
        ]

        is_legacy_client = bool(prefect_client_version) and (
            Version(prefect_client_version) <= Version("3.3.7")
        )
        if is_legacy_client:
            # Client versions 3.3.7 and below do not support the
            # default_result_storage_block_id field and will error when
            # receiving it.
            for pool in pools:
                del pool.storage_configuration.default_result_storage_block_id

        return pools

275 

276 

@router.post("/count")
async def count_work_pools(
    work_pools: Optional[schemas.filters.WorkPoolFilter] = Body(None, embed=True),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> int:
    """
    Count the work pools that match the optional filter
    """
    async with db.session_context() as session:
        total = await models.workers.count_work_pools(
            session=session, work_pool_filter=work_pools
        )
    return total

289 

290 

@router.patch("/{name}", status_code=status.HTTP_204_NO_CONTENT)
async def update_work_pool(
    work_pool: schemas.actions.WorkPoolUpdate,
    work_pool_name: str = Path(..., description="The work pool name", alias="name"),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Update a work pool. Pools whose name starts with "prefect" (in any letter
    case) are reserved: only `is_paused` and `concurrency_limit` may be
    changed on them, anything else is rejected with a 403.
    """
    # Only fields the caller explicitly set count as requested changes.
    requested_fields = set(work_pool.model_dump(exclude_unset=True))
    forbidden_on_reserved = requested_fields - {"is_paused", "concurrency_limit"}
    targets_reserved_pool = work_pool_name.lower().startswith("prefect")

    # Reserved pools can only be updated to pause them or set concurrency
    if targets_reserved_pool and forbidden_on_reserved:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=(
                "Work pools starting with 'Prefect' are reserved for internal use "
                "and can only be updated to set concurrency limits or pause."
            ),
        )

    async with db.session_context(begin_transaction=True) as session:
        pool_id = await worker_lookups._get_work_pool_id_from_name(
            session=session, work_pool_name=work_pool_name
        )
        await models.workers.update_work_pool(
            session=session,
            work_pool_id=pool_id,
            work_pool=work_pool,
            emit_status_change=emit_work_pool_status_event,
        )

325 

326 

@router.delete("/{name}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_work_pool(
    work_pool_name: str = Path(..., description="The work pool name", alias="name"),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Delete a work pool. Pools whose name starts with "prefect" (in any letter
    case) are reserved and can not be deleted; such requests get a 403.
    """
    targets_reserved_pool = work_pool_name.lower().startswith("prefect")
    if targets_reserved_pool:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=(
                "Work pools starting with 'Prefect' are reserved for internal use "
                "and can not be deleted."
            ),
        )

    async with db.session_context(begin_transaction=True) as session:
        # The lookup raises a 404 when no pool has this name.
        pool_id = await worker_lookups._get_work_pool_id_from_name(
            session=session, work_pool_name=work_pool_name
        )
        await models.workers.delete_work_pool(session=session, work_pool_id=pool_id)

354 

355 

@router.post("/{name}/get_scheduled_flow_runs")
async def get_scheduled_flow_runs(
    docket: dependencies.Docket,
    work_pool_name: str = Path(..., description="The work pool name", alias="name"),
    work_queue_names: Optional[List[str]] = Body(
        None, description="The names of work pool queues"
    ),
    scheduled_before: Optional[DateTime] = Body(
        None, description="The maximum time to look for scheduled flow runs"
    ),
    scheduled_after: Optional[DateTime] = Body(
        None, description="The minimum time to look for scheduled flow runs"
    ),
    limit: int = dependencies.LimitBody(),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.responses.WorkerFlowRunResponse]:
    """
    Load scheduled runs for a worker

    When no queue names are given, every queue of the pool is considered.
    Otherwise each named queue must exist in the pool or a 404 is returned.
    After the runs are read, the queues involved are handed to the
    `mark_work_queues_ready` and `mark_deployments_ready` tasks via docket.
    """
    async with db.session_context() as session:
        # Raises a 404 when the pool does not exist.
        work_pool_id = await worker_lookups._get_work_pool_id_from_name(
            session=session, work_pool_name=work_pool_name
        )

        if not work_queue_names:
            work_queues = list(
                await models.workers.read_work_queues(
                    session=session, work_pool_id=work_pool_id
                )
            )
            # None here instructs get_scheduled_flow_runs to use the default behavior
            # of just operating on all work queues of the pool
            work_queue_ids = None
        else:
            # Each lookup raises a 404 for a queue name that does not exist.
            work_queues = [
                await worker_lookups._get_work_queue_from_name(
                    session=session,
                    work_pool_name=work_pool_name,
                    work_queue_name=name,
                )
                for name in work_queue_names
            ]
            work_queue_ids = [wq.id for wq in work_queues]

    async with db.session_context(begin_transaction=True) as session:
        queue_response = await models.workers.get_scheduled_flow_runs(
            session=session,
            work_pool_ids=[work_pool_id],
            work_queue_ids=work_queue_ids,
            scheduled_before=scheduled_before,
            scheduled_after=scheduled_after,
            limit=limit,
        )

    # Queues whose status is NOT_READY are passed as "ready" ids; all the
    # others are passed as "polled" ids.
    await docket.add(mark_work_queues_ready)(
        polled_work_queue_ids=[
            wq.id for wq in work_queues if wq.status != WorkQueueStatus.NOT_READY
        ],
        ready_work_queue_ids=[
            wq.id for wq in work_queues if wq.status == WorkQueueStatus.NOT_READY
        ],
    )

    await docket.add(mark_deployments_ready)(
        work_queue_ids=[wq.id for wq in work_queues],
    )

    return queue_response

425 

426 

427# ----------------------------------------------------- 

428# -- 

429# -- 

430# -- Work Pool Queues 

431# -- 

432# -- 

433# ----------------------------------------------------- 

434 

435 

@router.post("/{work_pool_name}/queues", status_code=status.HTTP_201_CREATED)
async def create_work_queue(
    work_queue: schemas.actions.WorkQueueCreate,
    work_pool_name: str = Path(..., description="The work pool name"),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.responses.WorkQueueResponse:
    """
    Creates a new work pool queue. If a work pool queue with the same
    name already exists, an error will be raised.

    For more information, see https://docs.prefect.io/v3/concepts/work-pools#work-queues.
    """

    try:
        async with db.session_context(begin_transaction=True) as session:
            # Raises a 404 when the work pool does not exist.
            work_pool_id = await worker_lookups._get_work_pool_id_from_name(
                session=session,
                work_pool_name=work_pool_name,
            )

            model = await models.workers.create_work_queue(
                session=session,
                work_pool_id=work_pool_id,
                work_queue=work_queue,
            )
    except sa.exc.IntegrityError:
        # Database integrity errors are reported to the caller as a name
        # conflict. The message must be an f-string so that the pool name is
        # interpolated rather than sent as a literal placeholder.
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=(
                "A work queue with this name already exists in work pool"
                f" {work_pool_name!r}."
            ),
        )

    return schemas.responses.WorkQueueResponse.model_validate(
        model, from_attributes=True
    )

474 

475 

@router.get("/{work_pool_name}/queues/{name}")
async def read_work_queue(
    work_pool_name: str = Path(..., description="The work pool name"),
    work_queue_name: str = Path(
        ..., description="The work pool queue name", alias="name"
    ),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.responses.WorkQueueResponse:
    """
    Read a single work pool queue, addressed by pool name and queue name
    """
    async with db.session_context(begin_transaction=True) as session:
        # Raises a 404 when the queue does not exist in the named pool.
        queue_id = await worker_lookups._get_work_queue_id_from_name(
            session=session,
            work_pool_name=work_pool_name,
            work_queue_name=work_queue_name,
        )
        orm_queue = await models.workers.read_work_queue(
            session=session, work_queue_id=queue_id
        )

    response_model = schemas.responses.WorkQueueResponse
    return response_model.model_validate(orm_queue, from_attributes=True)

503 

504 

@router.post("/{work_pool_name}/queues/filter")
async def read_work_queues(
    work_pool_name: str = Path(..., description="The work pool name"),
    work_queues: Optional[schemas.filters.WorkQueueFilter] = None,
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.responses.WorkQueueResponse]:
    """
    Read all work pool queues of a work pool, optionally narrowed by a filter
    and paged with `limit` and `offset`
    """
    async with db.session_context() as session:
        # Raises a 404 when the work pool does not exist.
        work_pool_id = await worker_lookups._get_work_pool_id_from_name(
            session=session,
            work_pool_name=work_pool_name,
        )
        wqs = await models.workers.read_work_queues(
            session=session,
            work_pool_id=work_pool_id,
            work_queue_filter=work_queues,
            limit=limit,
            offset=offset,
        )

    return [
        schemas.responses.WorkQueueResponse.model_validate(wq, from_attributes=True)
        for wq in wqs
    ]

534 

535 

@router.patch("/{work_pool_name}/queues/{name}", status_code=status.HTTP_204_NO_CONTENT)
async def update_work_queue(
    work_queue: schemas.actions.WorkQueueUpdate,
    work_pool_name: str = Path(..., description="The work pool name"),
    work_queue_name: str = Path(
        ..., description="The work pool queue name", alias="name"
    ),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Update a work pool queue, addressed by pool name and queue name
    """
    async with db.session_context(begin_transaction=True) as session:
        # Raises a 404 when the queue does not exist in the named pool.
        queue_id = await worker_lookups._get_work_queue_id_from_name(
            session=session,
            work_pool_name=work_pool_name,
            work_queue_name=work_queue_name,
        )
        await models.workers.update_work_queue(
            session=session,
            work_queue_id=queue_id,
            work_queue=work_queue,
            emit_status_change=emit_work_queue_status_event,
        )

563 

564 

@router.delete(
    "/{work_pool_name}/queues/{name}", status_code=status.HTTP_204_NO_CONTENT
)
async def delete_work_queue(
    work_pool_name: str = Path(..., description="The work pool name"),
    work_queue_name: str = Path(
        ..., description="The work pool queue name", alias="name"
    ),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Delete a work pool queue, addressed by pool name and queue name
    """
    async with db.session_context(begin_transaction=True) as session:
        # Raises a 404 when the queue does not exist in the named pool.
        queue_id = await worker_lookups._get_work_queue_id_from_name(
            session=session,
            work_pool_name=work_pool_name,
            work_queue_name=work_queue_name,
        )
        await models.workers.delete_work_queue(
            session=session, work_queue_id=queue_id
        )

590 

591 

592# ----------------------------------------------------- 

593# -- 

594# -- 

595# -- Workers 

596# -- 

597# -- 

598# ----------------------------------------------------- 

599 

600 

@router.post(
    "/{work_pool_name}/workers/heartbeat",
    status_code=status.HTTP_204_NO_CONTENT,
)
async def worker_heartbeat(
    work_pool_name: str = Path(..., description="The work pool name"),
    name: str = Body(..., description="The worker process name", embed=True),
    heartbeat_interval_seconds: Optional[int] = Body(
        None, description="The worker's heartbeat interval in seconds", embed=True
    ),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Record a heartbeat for a worker of the given work pool. If the pool's
    status is NOT_READY, the pool is updated to READY.
    """
    pool_statuses = schemas.statuses.WorkPoolStatus

    async with db.session_context(begin_transaction=True) as session:
        # The pool object itself is needed below (for its status), so it is
        # read directly instead of going through the id-only lookup helper.
        pool = await models.workers.read_work_pool_by_name(
            session=session,
            work_pool_name=work_pool_name,
        )
        if not pool:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f'Work pool "{work_pool_name}" not found.',
            )

        await models.workers.worker_heartbeat(
            session=session,
            work_pool_id=pool.id,
            worker_name=name,
            heartbeat_interval_seconds=heartbeat_interval_seconds,
        )

        if pool.status == pool_statuses.NOT_READY:
            ready_update = schemas.internal.InternalWorkPoolUpdate(
                status=pool_statuses.READY
            )
            await models.workers.update_work_pool(
                session=session,
                work_pool_id=pool.id,
                work_pool=ready_update,
                emit_status_change=emit_work_pool_status_event,
            )

641 

642 

@router.post("/{work_pool_name}/workers/filter")
async def read_workers(
    work_pool_name: str = Path(..., description="The work pool name"),
    workers: Optional[schemas.filters.WorkerFilter] = None,
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.responses.WorkerResponse]:
    """
    Read all worker processes of a work pool, optionally narrowed by a filter
    and paged with `limit` and `offset`
    """
    async with db.session_context() as session:
        # Raises a 404 when the work pool does not exist.
        pool_id = await worker_lookups._get_work_pool_id_from_name(
            session=session, work_pool_name=work_pool_name
        )
        found_workers = await models.workers.read_workers(
            session=session,
            work_pool_id=pool_id,
            worker_filter=workers,
            limit=limit,
            offset=offset,
        )
        return found_workers

666 

667 

@router.delete(
    "/{work_pool_name}/workers/{name}", status_code=status.HTTP_204_NO_CONTENT
)
async def delete_worker(
    work_pool_name: str = Path(..., description="The work pool name"),
    worker_name: str = Path(
        ..., description="The work pool's worker name", alias="name"
    ),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Delete a worker of a work pool. A 404 is returned when either the work
    pool or the worker can not be found.
    """
    async with db.session_context(begin_transaction=True) as session:
        pool_id = await worker_lookups._get_work_pool_id_from_name(
            session=session, work_pool_name=work_pool_name
        )
        was_deleted = await models.workers.delete_worker(
            session=session, work_pool_id=pool_id, worker_name=worker_name
        )
        if was_deleted:
            return

        # A falsy result from the model layer is reported as "not found".
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Worker not found."
        )