Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/__init__.py: 16%

612 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2import asyncio 1a

3import base64 1a

4import datetime 1a

5import ssl 1a

6from collections.abc import Iterable 1a

7from contextlib import AsyncExitStack 1a

8from logging import Logger 1a

9from typing import TYPE_CHECKING, Any, Literal, NoReturn, Optional, Union, overload 1a

10from uuid import UUID 1a

11 

12import certifi 1a

13import httpcore 1a

14import httpx 1a

15 

16import pydantic 1a

17from asgi_lifespan import LifespanManager 1a

18from packaging import version 1a

19from starlette import status 1a

20from typing_extensions import ParamSpec, Self, TypeVar 1a

21 

22from prefect.client.orchestration._artifacts.client import ( 1a

23 ArtifactClient, 

24 ArtifactAsyncClient, 

25 ArtifactCollectionClient, 

26 ArtifactCollectionAsyncClient, 

27) 

28 

29from prefect.client.orchestration._concurrency_limits.client import ( 1a

30 ConcurrencyLimitAsyncClient, 

31 ConcurrencyLimitClient, 

32) 

33 

34from prefect.client.orchestration._logs.client import ( 1a

35 LogClient, 

36 LogAsyncClient, 

37) 

38from prefect.client.orchestration._variables.client import ( 1a

39 VariableClient, 

40 VariableAsyncClient, 

41) 

42 

43from prefect.client.orchestration._deployments.client import ( 1a

44 DeploymentClient, 

45 DeploymentAsyncClient, 

46) 

47from prefect.client.orchestration._automations.client import ( 1a

48 AutomationClient, 

49 AutomationAsyncClient, 

50) 

51 

52from prefect.client.orchestration._work_pools.client import ( 1a

53 WorkPoolClient, 

54 WorkPoolAsyncClient, 

55) 

56 

57from prefect._experimental.sla.client import SlaClient, SlaAsyncClient 1a

58 

59from prefect.client.orchestration._flows.client import ( 1a

60 FlowClient, 

61 FlowAsyncClient, 

62) 

63from prefect.client.orchestration._flow_runs.client import ( 1a

64 FlowRunClient, 

65 FlowRunAsyncClient, 

66) 

67 

68from prefect.client.orchestration._blocks_documents.client import ( 1a

69 BlocksDocumentClient, 

70 BlocksDocumentAsyncClient, 

71) 

72 

73from prefect.client.orchestration._blocks_schemas.client import ( 1a

74 BlocksSchemaClient, 

75 BlocksSchemaAsyncClient, 

76) 

77 

78from prefect.client.orchestration._blocks_types.client import ( 1a

79 BlocksTypeClient, 

80 BlocksTypeAsyncClient, 

81) 

82 

83from prefect.client.orchestration._events.client import ( 1a

84 EventClient, 

85 EventAsyncClient, 

86) 

87 

88import prefect 1a

89import prefect.exceptions 1a

90from prefect.logging.loggers import get_run_logger 1a

91import prefect.states 1a

92from prefect.client.constants import SERVER_API_VERSION 1a

93from prefect.client.schemas import FlowRun, OrchestrationResult, TaskRun 1a

94from prefect.client.schemas.actions import ( 1a

95 TaskRunCreate, 

96 TaskRunUpdate, 

97 WorkQueueCreate, 

98 WorkQueueUpdate, 

99) 

100from prefect.client.schemas.filters import ( 1a

101 DeploymentFilter, 

102 FlowFilter, 

103 FlowRunFilter, 

104 TaskRunFilter, 

105 WorkQueueFilter, 

106 WorkQueueFilterName, 

107) 

108from prefect.client.schemas.objects import ( 1a

109 TaskRunResult, 

110 FlowRunResult, 

111 Parameter, 

112 Constant, 

113 TaskRunPolicy, 

114 WorkQueue, 

115 WorkQueueStatusDetail, 

116) 

117 

118from prefect.client.schemas.sorting import ( 1a

119 TaskRunSort, 

120) 

121from prefect.logging import get_logger 1a

122from prefect.settings import ( 1a

123 PREFECT_API_AUTH_STRING, 

124 PREFECT_API_DATABASE_CONNECTION_URL, 

125 PREFECT_API_ENABLE_HTTP2, 

126 PREFECT_API_KEY, 

127 PREFECT_API_REQUEST_TIMEOUT, 

128 PREFECT_API_SSL_CERT_FILE, 

129 PREFECT_API_TLS_INSECURE_SKIP_VERIFY, 

130 PREFECT_API_URL, 

131 PREFECT_CLIENT_CSRF_SUPPORT_ENABLED, 

132 PREFECT_CLOUD_API_URL, 

133 PREFECT_SERVER_ALLOW_EPHEMERAL_MODE, 

134 PREFECT_TESTING_UNIT_TEST_MODE, 

135) 

136from prefect.types._datetime import now 1a

137 

138if TYPE_CHECKING: 138 ↛ 139line 138 didn't jump to line 139 because the condition on line 138 was never true1a

139 from prefect.tasks import Task as TaskObject 

140 

141from prefect.client.base import ( 1a

142 ASGIApp, 

143 PrefectHttpxAsyncClient, 

144 PrefectHttpxSyncClient, 

145 ServerType, 

146 app_lifespan_context, 

147) 

148 

149P = ParamSpec("P") 1a

150R = TypeVar("R", infer_variance=True) 1a

151T = TypeVar("T") 1a

152 

153# Cache for TypeAdapter instances to avoid repeated instantiation 

154_TYPE_ADAPTER_CACHE: dict[type, pydantic.TypeAdapter[Any]] = {} 1a

155 

156 

157def _get_type_adapter(type_: type) -> pydantic.TypeAdapter[Any]: 1a

158 """Get or create a cached TypeAdapter for the given type.""" 

159 if type_ not in _TYPE_ADAPTER_CACHE: 

160 _TYPE_ADAPTER_CACHE[type_] = pydantic.TypeAdapter(type_) 

161 return _TYPE_ADAPTER_CACHE[type_] 

162 

163 

164@overload 1a

165def get_client( 165 ↛ exitline 165 didn't return from function 'get_client' because 1a

166 httpx_settings: Optional[dict[str, Any]], 

167 sync_client: Literal[False], 

168) -> "PrefectClient": ... 

169 

170 

171@overload 1a

172def get_client(*, httpx_settings: Optional[dict[str, Any]]) -> "PrefectClient": ... 172 ↛ exitline 172 didn't return from function 'get_client' because 1a

173 

174 

175@overload 1a

176def get_client(*, sync_client: Literal[False] = False) -> "PrefectClient": ... 176 ↛ exitline 176 didn't return from function 'get_client' because 1a

177 

178 

179@overload 1a

180def get_client( 180 ↛ exitline 180 didn't return from function 'get_client' because 1a

181 httpx_settings: Optional[dict[str, Any]], sync_client: Literal[True] 

182) -> "SyncPrefectClient": ... 

183 

184 

185@overload 1a

186def get_client(*, sync_client: Literal[True]) -> "SyncPrefectClient": ... 186 ↛ exitline 186 didn't return from function 'get_client' because 1a

187 

188 

189def get_client( 1a

190 httpx_settings: Optional[dict[str, Any]] = None, sync_client: bool = False 

191) -> Union["SyncPrefectClient", "PrefectClient"]: 

192 """ 

193 Retrieve a HTTP client for communicating with the Prefect REST API. 

194 

195 The client must be context managed; for example: 

196 

197 ```python 

198 async with get_client() as client: 

199 await client.hello() 

200 ``` 

201 

202 To return a synchronous client, pass sync_client=True: 

203 

204 ```python 

205 with get_client(sync_client=True) as client: 

206 client.hello() 

207 ``` 

208 """ 

209 import prefect.context 

210 

211 # try to load clients from a client context, if possible 

212 # only load clients that match the provided config / loop 

213 try: 

214 loop = asyncio.get_running_loop() 

215 except RuntimeError: 

216 loop = None 

217 

218 if sync_client: 

219 if client_ctx := prefect.context.SyncClientContext.get(): 

220 if ( 

221 client_ctx.client 

222 and getattr(client_ctx, "_httpx_settings", None) == httpx_settings 

223 ): 

224 return client_ctx.client 

225 else: 

226 if client_ctx := prefect.context.AsyncClientContext.get(): 

227 if ( 

228 client_ctx.client 

229 and getattr(client_ctx, "_httpx_settings", None) == httpx_settings 

230 and loop in (getattr(client_ctx.client, "_loop", None), None) 

231 ): 

232 return client_ctx.client 

233 

234 api: str = PREFECT_API_URL.value() 

235 server_type = None 

236 

237 if not api and PREFECT_SERVER_ALLOW_EPHEMERAL_MODE: 

238 # create an ephemeral API if none was provided 

239 from prefect.server.api.server import SubprocessASGIServer 

240 

241 server = SubprocessASGIServer() 

242 server.start() 

243 assert server.server_process is not None, "Server process did not start" 

244 

245 api = server.api_url 

246 server_type = ServerType.EPHEMERAL 

247 elif not api and not PREFECT_SERVER_ALLOW_EPHEMERAL_MODE: 

248 raise ValueError( 

249 "No Prefect API URL provided. Please set PREFECT_API_URL to the address of a running Prefect server." 

250 ) 

251 

252 if sync_client: 

253 return SyncPrefectClient( 

254 api, 

255 auth_string=PREFECT_API_AUTH_STRING.value(), 

256 api_key=PREFECT_API_KEY.value(), 

257 httpx_settings=httpx_settings, 

258 server_type=server_type, 

259 ) 

260 else: 

261 return PrefectClient( 

262 api, 

263 auth_string=PREFECT_API_AUTH_STRING.value(), 

264 api_key=PREFECT_API_KEY.value(), 

265 httpx_settings=httpx_settings, 

266 server_type=server_type, 

267 ) 

268 

269 

270class PrefectClient( 1a

271 ArtifactAsyncClient, 

272 ArtifactCollectionAsyncClient, 

273 LogAsyncClient, 

274 VariableAsyncClient, 

275 ConcurrencyLimitAsyncClient, 

276 DeploymentAsyncClient, 

277 AutomationAsyncClient, 

278 SlaAsyncClient, 

279 FlowRunAsyncClient, 

280 FlowAsyncClient, 

281 BlocksDocumentAsyncClient, 

282 BlocksSchemaAsyncClient, 

283 BlocksTypeAsyncClient, 

284 WorkPoolAsyncClient, 

285 EventAsyncClient, 

286): 

287 """ 

288 An asynchronous client for interacting with the [Prefect REST API](https://docs.prefect.io/v3/api-ref/rest-api/). 

289 

290 Args: 

291 api: the REST API URL or FastAPI application to connect to 

292 api_key: An optional API key for authentication. 

293 api_version: The API version this client is compatible with. 

294 httpx_settings: An optional dictionary of settings to pass to the underlying 

295 `httpx.AsyncClient` 

296 

297 Examples: 

298 

299 Say hello to a Prefect REST API 

300 

301 ```python 

302 async with get_client() as client: 

303 response = await client.hello() 

304 

305 print(response.json()) 

306 👋 

307 ``` 

308 """ 

309 

310 def __init__( 1a

311 self, 

312 api: Union[str, ASGIApp], 

313 *, 

314 auth_string: Optional[str] = None, 

315 api_key: Optional[str] = None, 

316 api_version: Optional[str] = None, 

317 httpx_settings: Optional[dict[str, Any]] = None, 

318 server_type: Optional[ServerType] = None, 

319 ) -> None: 

320 httpx_settings = httpx_settings.copy() if httpx_settings else {} 

321 httpx_settings.setdefault("headers", {}) 

322 

323 tls_verify = httpx_settings.get("verify") 

324 if not tls_verify or not isinstance(tls_verify, ssl.SSLContext): 

325 if PREFECT_API_TLS_INSECURE_SKIP_VERIFY: 

326 # Create an unverified context for insecure connections 

327 ctx = ssl.create_default_context() 

328 ctx.check_hostname = False 

329 ctx.verify_mode = ssl.CERT_NONE 

330 httpx_settings.setdefault("verify", ctx) 

331 else: 

332 cert_file = PREFECT_API_SSL_CERT_FILE.value() 

333 if not cert_file: 

334 cert_file = certifi.where() 

335 # Create a verified context with the certificate file 

336 ctx = ssl.create_default_context(cafile=cert_file) 

337 httpx_settings.setdefault("verify", ctx) 

338 

339 if api_version is None: 

340 api_version = SERVER_API_VERSION 

341 httpx_settings["headers"].setdefault("X-PREFECT-API-VERSION", api_version) 

342 # Prioritize auth_string if provided, otherwise use api_key 

343 if auth_string: 

344 token = base64.b64encode(auth_string.encode("utf-8")).decode("utf-8") 

345 httpx_settings["headers"]["Authorization"] = ( 

346 f"Basic {token}" # Overwrite if exists 

347 ) 

348 elif api_key: 

349 httpx_settings["headers"]["Authorization"] = ( 

350 f"Bearer {api_key}" # Set if auth_string is not present 

351 ) 

352 

353 # Context management 

354 self._context_stack: int = 0 

355 self._exit_stack = AsyncExitStack() 

356 self._ephemeral_app: Optional[ASGIApp] = None 

357 self.manage_lifespan = True 

358 self.server_type: ServerType 

359 

360 # Only set if this client started the lifespan of the application 

361 self._ephemeral_lifespan: Optional[LifespanManager] = None 

362 

363 self._closed = False 

364 self._started = False 

365 

366 # Connect to an external application 

367 if isinstance(api, str): 

368 if httpx_settings.get("app"): 

369 raise ValueError( 

370 "Invalid httpx settings: `app` cannot be set when providing an " 

371 "api url. `app` is only for use with ephemeral instances. Provide " 

372 "it as the `api` parameter instead." 

373 ) 

374 httpx_settings.setdefault("base_url", api) 

375 

376 # See https://www.python-httpx.org/advanced/#pool-limit-configuration 

377 httpx_settings.setdefault( 

378 "limits", 

379 httpx.Limits( 

380 # We see instability when allowing the client to open many connections at once. 

381 # Limiting concurrency results in more stable performance. 

382 max_connections=16, 

383 max_keepalive_connections=8, 

384 # The Prefect Cloud LB will keep connections alive for 30s. 

385 # Only allow the client to keep connections alive for 25s. 

386 keepalive_expiry=25, 

387 ), 

388 ) 

389 

390 # See https://www.python-httpx.org/http2/ 

391 # Enabling HTTP/2 support on the client does not necessarily mean that your requests 

392 # and responses will be transported over HTTP/2, since both the client and the server 

393 # need to support HTTP/2. If you connect to a server that only supports HTTP/1.1 the 

394 # client will use a standard HTTP/1.1 connection instead. 

395 httpx_settings.setdefault("http2", PREFECT_API_ENABLE_HTTP2.value()) 

396 

397 if server_type: 

398 self.server_type = server_type 

399 else: 

400 self.server_type = ( 

401 ServerType.CLOUD 

402 if api.startswith(PREFECT_CLOUD_API_URL.value()) 

403 else ServerType.SERVER 

404 ) 

405 

406 # Connect to an in-process application 

407 else: 

408 self._ephemeral_app = api 

409 self.server_type = ServerType.EPHEMERAL 

410 

411 # When using an ephemeral server, server-side exceptions can be raised 

412 # client-side breaking all of our response error code handling. To work 

413 # around this, we create an ASGI transport with application exceptions 

414 # disabled instead of using the application directly. 

415 # refs: 

416 # - https://github.com/PrefectHQ/prefect/pull/9637 

417 # - https://github.com/encode/starlette/blob/d3a11205ed35f8e5a58a711db0ff59c86fa7bb31/starlette/middleware/errors.py#L184 

418 # - https://github.com/tiangolo/fastapi/blob/8cc967a7605d3883bd04ceb5d25cc94ae079612f/fastapi/applications.py#L163-L164 

419 httpx_settings.setdefault( 

420 "transport", 

421 httpx.ASGITransport( 

422 app=self._ephemeral_app, raise_app_exceptions=False 

423 ), 

424 ) 

425 httpx_settings.setdefault("base_url", "http://ephemeral-prefect/api") 

426 

427 # See https://www.python-httpx.org/advanced/#timeout-configuration 

428 httpx_settings.setdefault( 

429 "timeout", 

430 httpx.Timeout( 

431 connect=PREFECT_API_REQUEST_TIMEOUT.value(), 

432 read=PREFECT_API_REQUEST_TIMEOUT.value(), 

433 write=PREFECT_API_REQUEST_TIMEOUT.value(), 

434 pool=PREFECT_API_REQUEST_TIMEOUT.value(), 

435 ), 

436 ) 

437 

438 if not PREFECT_TESTING_UNIT_TEST_MODE: 

439 httpx_settings.setdefault("follow_redirects", True) 

440 

441 enable_csrf_support = ( 

442 self.server_type != ServerType.CLOUD 

443 and PREFECT_CLIENT_CSRF_SUPPORT_ENABLED.value() 

444 ) 

445 

446 self._client = PrefectHttpxAsyncClient( 

447 **httpx_settings, enable_csrf_support=enable_csrf_support 

448 ) 

449 self._loop = None 

450 

451 # See https://www.python-httpx.org/advanced/#custom-transports 

452 # 

453 # If we're using an HTTP/S client (not the ephemeral client), adjust the 

454 # transport to add retries _after_ it is instantiated. If we alter the transport 

455 # before instantiation, the transport will not be aware of proxies unless we 

456 # reproduce all of the logic to make it so. 

457 # 

458 # Only alter the transport to set our default of 3 retries, don't modify any 

459 # transport a user may have provided via httpx_settings. 

460 # 

461 # Making liberal use of getattr and isinstance checks here to avoid any 

462 # surprises if the internals of httpx or httpcore change on us 

463 if isinstance(api, str) and not httpx_settings.get("transport"): 

464 transport_for_url = getattr(self._client, "_transport_for_url", None) 

465 if callable(transport_for_url): 

466 server_transport = transport_for_url(httpx.URL(api)) 

467 if isinstance(server_transport, httpx.AsyncHTTPTransport): 

468 pool = getattr(server_transport, "_pool", None) 

469 if isinstance(pool, httpcore.AsyncConnectionPool): 

470 setattr(pool, "_retries", 3) 

471 

472 self.logger: Logger = get_logger("client") 

473 

474 @property 1a

475 def api_url(self) -> httpx.URL: 1a

476 """ 

477 Get the base URL for the API. 

478 """ 

479 return self._client.base_url 

480 

481 # API methods ---------------------------------------------------------------------- 

482 

483 async def api_healthcheck(self) -> Optional[Exception]: 1a

484 """ 

485 Attempts to connect to the API and returns the encountered exception if not 

486 successful. 

487 

488 If successful, returns `None`. 

489 """ 

490 try: 

491 await self._client.get("/health") 

492 return None 

493 except Exception as exc: 

494 return exc 

495 

496 async def hello(self) -> httpx.Response: 1a

497 """ 

498 Send a GET request to /hello for testing purposes. 

499 """ 

500 return await self._client.get("/hello") 

501 

502 async def create_work_queue( 1a

503 self, 

504 name: str, 

505 description: Optional[str] = None, 

506 is_paused: Optional[bool] = None, 

507 concurrency_limit: Optional[int] = None, 

508 priority: Optional[int] = None, 

509 work_pool_name: Optional[str] = None, 

510 ) -> WorkQueue: 

511 """ 

512 Create a work queue. 

513 

514 Args: 

515 name: a unique name for the work queue 

516 description: An optional description for the work queue. 

517 is_paused: Whether or not the work queue is paused. 

518 concurrency_limit: An optional concurrency limit for the work queue. 

519 priority: The queue's priority. Lower values are higher priority (1 is the highest). 

520 work_pool_name: The name of the work pool to use for this queue. 

521 

522 Raises: 

523 prefect.exceptions.ObjectAlreadyExists: If request returns 409 

524 httpx.RequestError: If request fails 

525 

526 Returns: 

527 The created work queue 

528 """ 

529 create_model = WorkQueueCreate(name=name, filter=None) 

530 if description is not None: 

531 create_model.description = description 

532 if is_paused is not None: 

533 create_model.is_paused = is_paused 

534 if concurrency_limit is not None: 

535 create_model.concurrency_limit = concurrency_limit 

536 if priority is not None: 

537 create_model.priority = priority 

538 

539 data = create_model.model_dump(mode="json") 

540 try: 

541 if work_pool_name is not None: 

542 response = await self._client.post( 

543 f"/work_pools/{work_pool_name}/queues", json=data 

544 ) 

545 else: 

546 response = await self._client.post("/work_queues/", json=data) 

547 except httpx.HTTPStatusError as e: 

548 if e.response.status_code == status.HTTP_409_CONFLICT: 

549 raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e 

550 elif e.response.status_code == status.HTTP_404_NOT_FOUND: 

551 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

552 else: 

553 raise 

554 return WorkQueue.model_validate(response.json()) 

555 

556 async def read_work_queue_by_name( 1a

557 self, 

558 name: str, 

559 work_pool_name: Optional[str] = None, 

560 ) -> WorkQueue: 

561 """ 

562 Read a work queue by name. 

563 

564 Args: 

565 name (str): a unique name for the work queue 

566 work_pool_name (str, optional): the name of the work pool 

567 the queue belongs to. 

568 

569 Raises: 

570 prefect.exceptions.ObjectNotFound: if no work queue is found 

571 httpx.HTTPStatusError: other status errors 

572 

573 Returns: 

574 WorkQueue: a work queue API object 

575 """ 

576 try: 

577 if work_pool_name is not None: 

578 response = await self._client.get( 

579 f"/work_pools/{work_pool_name}/queues/{name}" 

580 ) 

581 else: 

582 response = await self._client.get(f"/work_queues/name/{name}") 

583 except httpx.HTTPStatusError as e: 

584 if e.response.status_code == status.HTTP_404_NOT_FOUND: 

585 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

586 else: 

587 raise 

588 

589 return WorkQueue.model_validate(response.json()) 

590 

591 async def update_work_queue(self, id: UUID, **kwargs: Any) -> None: 1a

592 """ 

593 Update properties of a work queue. 

594 

595 Args: 

596 id: the ID of the work queue to update 

597 **kwargs: the fields to update 

598 

599 Raises: 

600 ValueError: if no kwargs are provided 

601 prefect.exceptions.ObjectNotFound: if request returns 404 

602 httpx.RequestError: if the request fails 

603 

604 """ 

605 if not kwargs: 

606 raise ValueError("No fields provided to update.") 

607 

608 data = WorkQueueUpdate(**kwargs).model_dump(mode="json", exclude_unset=True) 

609 try: 

610 await self._client.patch(f"/work_queues/{id}", json=data) 

611 except httpx.HTTPStatusError as e: 

612 if e.response.status_code == status.HTTP_404_NOT_FOUND: 

613 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

614 else: 

615 raise 

616 

617 async def get_runs_in_work_queue( 1a

618 self, 

619 id: UUID, 

620 limit: int = 10, 

621 scheduled_before: Optional[datetime.datetime] = None, 

622 ) -> list[FlowRun]: 

623 """ 

624 Read flow runs off a work queue. 

625 

626 Args: 

627 id: the id of the work queue to read from 

628 limit: a limit on the number of runs to return 

629 scheduled_before: a timestamp; only runs scheduled before this time will be returned. 

630 Defaults to now. 

631 

632 Raises: 

633 prefect.exceptions.ObjectNotFound: If request returns 404 

634 httpx.RequestError: If request fails 

635 

636 Returns: 

637 List[FlowRun]: a list of FlowRun objects read from the queue 

638 """ 

639 if scheduled_before is None: 

640 scheduled_before = now("UTC") 

641 

642 try: 

643 response = await self._client.post( 

644 f"/work_queues/{id}/get_runs", 

645 json={ 

646 "limit": limit, 

647 "scheduled_before": scheduled_before.isoformat(), 

648 }, 

649 ) 

650 except httpx.HTTPStatusError as e: 

651 if e.response.status_code == status.HTTP_404_NOT_FOUND: 

652 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

653 else: 

654 raise 

655 return _get_type_adapter(list[FlowRun]).validate_python(response.json()) 

656 

657 async def read_work_queue( 1a

658 self, 

659 id: UUID, 

660 ) -> WorkQueue: 

661 """ 

662 Read a work queue. 

663 

664 Args: 

665 id: the id of the work queue to load 

666 

667 Raises: 

668 prefect.exceptions.ObjectNotFound: If request returns 404 

669 httpx.RequestError: If request fails 

670 

671 Returns: 

672 WorkQueue: an instantiated WorkQueue object 

673 """ 

674 try: 

675 response = await self._client.get(f"/work_queues/{id}") 

676 except httpx.HTTPStatusError as e: 

677 if e.response.status_code == status.HTTP_404_NOT_FOUND: 

678 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

679 else: 

680 raise 

681 return WorkQueue.model_validate(response.json()) 

682 

683 async def read_work_queue_status( 1a

684 self, 

685 id: UUID, 

686 ) -> WorkQueueStatusDetail: 

687 """ 

688 Read a work queue status. 

689 

690 Args: 

691 id: the id of the work queue to load 

692 

693 Raises: 

694 prefect.exceptions.ObjectNotFound: If request returns 404 

695 httpx.RequestError: If request fails 

696 

697 Returns: 

698 WorkQueueStatus: an instantiated WorkQueueStatus object 

699 """ 

700 try: 

701 response = await self._client.get(f"/work_queues/{id}/status") 

702 except httpx.HTTPStatusError as e: 

703 if e.response.status_code == status.HTTP_404_NOT_FOUND: 

704 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

705 else: 

706 raise 

707 return WorkQueueStatusDetail.model_validate(response.json()) 

708 

709 async def match_work_queues( 1a

710 self, 

711 prefixes: list[str], 

712 work_pool_name: Optional[str] = None, 

713 ) -> list[WorkQueue]: 

714 """ 

715 Query the Prefect API for work queues with names with a specific prefix. 

716 

717 Args: 

718 prefixes: a list of strings used to match work queue name prefixes 

719 work_pool_name: an optional work pool name to scope the query to 

720 

721 Returns: 

722 a list of WorkQueue model representations 

723 of the work queues 

724 """ 

725 page_length = 100 

726 current_page = 0 

727 work_queues: list[WorkQueue] = [] 

728 

729 while True: 

730 new_queues = await self.read_work_queues( 

731 work_pool_name=work_pool_name, 

732 offset=current_page * page_length, 

733 limit=page_length, 

734 work_queue_filter=WorkQueueFilter( 

735 name=WorkQueueFilterName(startswith_=prefixes) 

736 ), 

737 ) 

738 if not new_queues: 

739 break 

740 work_queues += new_queues 

741 current_page += 1 

742 

743 return work_queues 

744 

745 async def delete_work_queue_by_id( 1a

746 self, 

747 id: UUID, 

748 ) -> None: 

749 """ 

750 Delete a work queue by its ID. 

751 

752 Args: 

753 id: the id of the work queue to delete 

754 

755 Raises: 

756 prefect.exceptions.ObjectNotFound: If request returns 404 

757 httpx.RequestError: If requests fails 

758 """ 

759 try: 

760 await self._client.delete( 

761 f"/work_queues/{id}", 

762 ) 

763 except httpx.HTTPStatusError as e: 

764 if e.response.status_code == status.HTTP_404_NOT_FOUND: 

765 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

766 else: 

767 raise 

768 

769 async def set_task_run_name(self, task_run_id: UUID, name: str) -> httpx.Response: 1a

770 task_run_data = TaskRunUpdate(name=name) 

771 return await self._client.patch( 

772 f"/task_runs/{task_run_id}", 

773 json=task_run_data.model_dump(mode="json", exclude_unset=True), 

774 ) 

775 

776 async def create_task_run( 1a

777 self, 

778 task: "TaskObject[P, R]", 

779 flow_run_id: Optional[UUID], 

780 dynamic_key: str, 

781 id: Optional[UUID] = None, 

782 name: Optional[str] = None, 

783 extra_tags: Optional[Iterable[str]] = None, 

784 state: Optional[prefect.states.State[R]] = None, 

785 task_inputs: Optional[ 

786 dict[ 

787 str, 

788 list[Union[TaskRunResult, FlowRunResult, Parameter, Constant]], 

789 ] 

790 ] = None, 

791 ) -> TaskRun: 

792 """ 

793 Create a task run 

794 

795 Args: 

796 task: The Task to run 

797 flow_run_id: The flow run id with which to associate the task run 

798 dynamic_key: A key unique to this particular run of a Task within the flow 

799 id: An optional ID for the task run. If not provided, one will be generated 

800 server-side. 

801 name: An optional name for the task run 

802 extra_tags: an optional list of extra tags to apply to the task run in 

803 addition to `task.tags` 

804 state: The initial state for the run. If not provided, defaults to 

805 `Pending` for now. Should always be a `Scheduled` type. 

806 task_inputs: the set of inputs passed to the task 

807 

808 Returns: 

809 The created task run. 

810 """ 

811 tags = set(task.tags).union(extra_tags or []) 

812 

813 if state is None: 

814 state = prefect.states.Pending() 

815 

816 retry_delay = task.retry_delay_seconds 

817 if isinstance(retry_delay, list): 

818 retry_delay = [int(rd) for rd in retry_delay] 

819 elif isinstance(retry_delay, float): 

820 retry_delay = int(retry_delay) 

821 

822 task_run_data = TaskRunCreate( 

823 id=id, 

824 name=name, 

825 flow_run_id=flow_run_id, 

826 task_key=task.task_key, 

827 dynamic_key=str(dynamic_key), 

828 tags=list(tags), 

829 task_version=task.version, 

830 empirical_policy=TaskRunPolicy( 

831 retries=task.retries, 

832 retry_delay=retry_delay, 

833 retry_jitter_factor=task.retry_jitter_factor, 

834 ), 

835 state=prefect.states.to_state_create(state), 

836 task_inputs=task_inputs or {}, 

837 ) 

838 content = task_run_data.model_dump_json(exclude={"id"} if id is None else None) 

839 

840 response = await self._client.post("/task_runs/", content=content) 

841 return TaskRun.model_validate(response.json()) 

842 

843 async def read_task_run(self, task_run_id: UUID) -> TaskRun: 1a

844 """ 

845 Query the Prefect API for a task run by id. 

846 

847 Args: 

848 task_run_id: the task run ID of interest 

849 

850 Returns: 

851 a Task Run model representation of the task run 

852 """ 

853 try: 

854 response = await self._client.get(f"/task_runs/{task_run_id}") 

855 return TaskRun.model_validate(response.json()) 

856 except httpx.HTTPStatusError as e: 

857 if e.response.status_code == status.HTTP_404_NOT_FOUND: 

858 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

859 else: 

860 raise 

861 

862 async def read_task_runs( 1a

863 self, 

864 *, 

865 flow_filter: Optional[FlowFilter] = None, 

866 flow_run_filter: Optional[FlowRunFilter] = None, 

867 task_run_filter: Optional[TaskRunFilter] = None, 

868 deployment_filter: Optional[DeploymentFilter] = None, 

869 sort: Optional[TaskRunSort] = None, 

870 limit: Optional[int] = None, 

871 offset: int = 0, 

872 ) -> list[TaskRun]: 

873 """ 

874 Query the Prefect API for task runs. Only task runs matching all criteria will 

875 be returned. 

876 

877 Args: 

878 flow_filter: filter criteria for flows 

879 flow_run_filter: filter criteria for flow runs 

880 task_run_filter: filter criteria for task runs 

881 deployment_filter: filter criteria for deployments 

882 sort: sort criteria for the task runs 

883 limit: a limit for the task run query 

884 offset: an offset for the task run query 

885 

886 Returns: 

887 a list of Task Run model representations 

888 of the task runs 

889 """ 

890 body: dict[str, Any] = { 

891 "flows": flow_filter.model_dump(mode="json") if flow_filter else None, 

892 "flow_runs": ( 

893 flow_run_filter.model_dump(mode="json", exclude_unset=True) 

894 if flow_run_filter 

895 else None 

896 ), 

897 "task_runs": ( 

898 task_run_filter.model_dump(mode="json") if task_run_filter else None 

899 ), 

900 "deployments": ( 

901 deployment_filter.model_dump(mode="json") if deployment_filter else None 

902 ), 

903 "sort": sort, 

904 "limit": limit, 

905 "offset": offset, 

906 } 

907 response = await self._client.post("/task_runs/filter", json=body) 

908 return _get_type_adapter(list[TaskRun]).validate_python(response.json()) 

909 

910 async def delete_task_run(self, task_run_id: UUID) -> None: 1a

911 """ 

912 Delete a task run by id. 

913 

914 Args: 

915 task_run_id: the task run ID of interest 

916 Raises: 

917 prefect.exceptions.ObjectNotFound: If request returns 404 

918 httpx.RequestError: If requests fails 

919 """ 

920 try: 

921 await self._client.delete(f"/task_runs/{task_run_id}") 

922 except httpx.HTTPStatusError as e: 

923 if e.response.status_code == 404: 

924 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

925 else: 

926 raise 

927 

928 async def set_task_run_state( 1a

929 self, 

930 task_run_id: UUID, 

931 state: prefect.states.State[T], 

932 force: bool = False, 

933 ) -> OrchestrationResult[T]: 

934 """ 

935 Set the state of a task run. 

936 

937 Args: 

938 task_run_id: the id of the task run 

939 state: the state to set 

940 force: if True, disregard orchestration logic when setting the state, 

941 forcing the Prefect API to accept the state 

942 

943 Returns: 

944 an OrchestrationResult model representation of state orchestration output 

945 """ 

946 state_create = prefect.states.to_state_create(state) 

947 state_create.state_details.task_run_id = task_run_id 

948 response = await self._client.post( 

949 f"/task_runs/{task_run_id}/set_state", 

950 json=dict(state=state_create.model_dump(mode="json"), force=force), 

951 ) 

952 result: OrchestrationResult[T] = OrchestrationResult.model_validate( 

953 response.json() 

954 ) 

955 return result 

956 

957 async def read_task_run_states( 1a

958 self, task_run_id: UUID 

959 ) -> list[prefect.states.State]: 

960 """ 

961 Query for the states of a task run 

962 

963 Args: 

964 task_run_id: the id of the task run 

965 

966 Returns: 

967 a list of State model representations of the task run states 

968 """ 

969 response = await self._client.get( 

970 "/task_run_states/", params=dict(task_run_id=str(task_run_id)) 

971 ) 

972 return _get_type_adapter(list[prefect.states.State]).validate_python( 

973 response.json() 

974 ) 

975 

976 async def read_work_queues( 1a

977 self, 

978 work_pool_name: Optional[str] = None, 

979 work_queue_filter: Optional[WorkQueueFilter] = None, 

980 limit: Optional[int] = None, 

981 offset: Optional[int] = None, 

982 ) -> list[WorkQueue]: 

983 """ 

984 Retrieves queues for a work pool. 

985 

986 Args: 

987 work_pool_name: Name of the work pool for which to get queues. 

988 work_queue_filter: Criteria by which to filter queues. 

989 limit: Limit for the queue query. 

990 offset: Limit for the queue query. 

991 

992 Returns: 

993 List of queues for the specified work pool. 

994 """ 

995 json: dict[str, Any] = { 

996 "work_queues": ( 

997 work_queue_filter.model_dump(mode="json", exclude_unset=True) 

998 if work_queue_filter 

999 else None 

1000 ), 

1001 "limit": limit, 

1002 "offset": offset, 

1003 } 

1004 

1005 if work_pool_name: 

1006 try: 

1007 response = await self._client.post( 

1008 f"/work_pools/{work_pool_name}/queues/filter", 

1009 json=json, 

1010 ) 

1011 except httpx.HTTPStatusError as e: 

1012 if e.response.status_code == status.HTTP_404_NOT_FOUND: 

1013 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

1014 else: 

1015 raise 

1016 else: 

1017 response = await self._client.post("/work_queues/filter", json=json) 

1018 

1019 return _get_type_adapter(list[WorkQueue]).validate_python(response.json()) 

1020 

1021 async def read_worker_metadata(self) -> dict[str, Any]: 1a

1022 """Reads worker metadata stored in Prefect collection registry.""" 

1023 response = await self._client.get("collections/views/aggregate-worker-metadata") 

1024 response.raise_for_status() 

1025 return response.json() 

1026 

1027 async def api_version(self) -> str: 1a

1028 res = await self._client.get("/admin/version") 

1029 return res.json() 

1030 

1031 def client_version(self) -> str: 1a

1032 return prefect.__version__ 

1033 

1034 @property 1a

1035 def loop(self) -> asyncio.AbstractEventLoop | None: 1a

1036 return self._loop 

1037 

1038 async def raise_for_api_version_mismatch(self) -> None: 1a

1039 # Cloud is always compatible as a server 

1040 if self.server_type == ServerType.CLOUD: 

1041 return 

1042 

1043 try: 

1044 api_version = await self.api_version() 

1045 except Exception as e: 

1046 if "Unauthorized" in str(e): 

1047 raise e 

1048 raise RuntimeError(f"Failed to reach API at {self.api_url}") from e 

1049 

1050 api_version = version.parse(api_version) 

1051 client_version = version.parse(self.client_version()) 

1052 

1053 if api_version.major != client_version.major: 

1054 raise RuntimeError( 

1055 f"Found incompatible versions: client: {client_version}, server: {api_version}. " 

1056 "Major versions must match." 

1057 ) 

1058 if api_version < client_version: 

1059 warning_message = ( 

1060 "Your Prefect server is running an older version of Prefect than your client which may result in unexpected behavior. " 

1061 f"Please upgrade your Prefect server from version {api_version} to version {client_version} or higher." 

1062 ) 

1063 try: 

1064 get_run_logger().warning(warning_message) 

1065 except prefect.context.MissingContextError: 

1066 self.logger.warning(warning_message) 

1067 

1068 async def __aenter__(self) -> Self: 1a

1069 """ 

1070 Start the client. 

1071 

1072 If the client is already started, this will raise an exception. 

1073 

1074 If the client is already closed, this will raise an exception. Use a new client 

1075 instance instead. 

1076 """ 

1077 if self._closed: 

1078 # httpx.AsyncClient does not allow reuse so we will not either. 

1079 raise RuntimeError( 

1080 "The client cannot be started again after closing. " 

1081 "Retrieve a new client with `get_client()` instead." 

1082 ) 

1083 

1084 self._context_stack += 1 

1085 

1086 if self._started: 

1087 # allow reentrancy 

1088 return self 

1089 

1090 self._loop = asyncio.get_running_loop() 

1091 await self._exit_stack.__aenter__() 

1092 

1093 # Enter a lifespan context if using an ephemeral application. 

1094 # See https://github.com/encode/httpx/issues/350 

1095 if self._ephemeral_app and self.manage_lifespan: 

1096 self._ephemeral_lifespan = await self._exit_stack.enter_async_context( 

1097 app_lifespan_context(self._ephemeral_app) 

1098 ) 

1099 

1100 if self._ephemeral_app: 

1101 self.logger.debug( 

1102 "Using ephemeral application with database at " 

1103 f"{PREFECT_API_DATABASE_CONNECTION_URL.value()}" 

1104 ) 

1105 else: 

1106 self.logger.debug(f"Connecting to API at {self.api_url}") 

1107 

1108 # Enter the httpx client's context 

1109 await self._exit_stack.enter_async_context(self._client) 

1110 

1111 self._started = True 

1112 

1113 return self 

1114 

1115 async def __aexit__(self, *exc_info: Any) -> Optional[bool]: 1a

1116 """ 

1117 Shutdown the client. 

1118 """ 

1119 

1120 self._context_stack -= 1 

1121 if self._context_stack > 0: 

1122 return 

1123 self._closed = True 

1124 return await self._exit_stack.__aexit__(*exc_info) 

1125 

1126 def __enter__(self) -> NoReturn: 1a

1127 raise RuntimeError( 

1128 "The `PrefectClient` must be entered with an async context. Use 'async " 

1129 "with PrefectClient(...)' not 'with PrefectClient(...)'" 

1130 ) 

1131 

1132 def __exit__(self, *_: object) -> NoReturn: 1a

1133 assert False, "This should never be called but must be defined for __enter__" 

1134 

1135 

1136class SyncPrefectClient( 1a

1137 ArtifactClient, 

1138 ArtifactCollectionClient, 

1139 LogClient, 

1140 VariableClient, 

1141 ConcurrencyLimitClient, 

1142 DeploymentClient, 

1143 AutomationClient, 

1144 SlaClient, 

1145 FlowRunClient, 

1146 FlowClient, 

1147 BlocksDocumentClient, 

1148 BlocksSchemaClient, 

1149 BlocksTypeClient, 

1150 WorkPoolClient, 

1151 EventClient, 

1152): 

1153 """ 

1154 A synchronous client for interacting with the [Prefect REST API](https://docs.prefect.io/v3/api-ref/rest-api/). 

1155 

1156 Args: 

1157 api: the REST API URL or FastAPI application to connect to 

1158 api_key: An optional API key for authentication. 

1159 api_version: The API version this client is compatible with. 

1160 httpx_settings: An optional dictionary of settings to pass to the underlying 

1161 `httpx.Client` 

1162 

1163 Examples: 

1164 

1165 Say hello to a Prefect REST API 

1166 

1167 ```python 

1168 with get_client(sync_client=True) as client: 

1169 response = client.hello() 

1170 

1171 print(response.json()) 

1172 👋 

1173 ``` 

1174 """ 

1175 

1176 def __init__( 1a

1177 self, 

1178 api: Union[str, ASGIApp], 

1179 *, 

1180 auth_string: Optional[str] = None, 

1181 api_key: Optional[str] = None, 

1182 api_version: Optional[str] = None, 

1183 httpx_settings: Optional[dict[str, Any]] = None, 

1184 server_type: Optional[ServerType] = None, 

1185 ) -> None: 

1186 httpx_settings = httpx_settings.copy() if httpx_settings else {} 

1187 httpx_settings.setdefault("headers", {}) 

1188 

1189 tls_verify = httpx_settings.get("verify") 

1190 if not tls_verify or not isinstance(tls_verify, ssl.SSLContext): 

1191 if PREFECT_API_TLS_INSECURE_SKIP_VERIFY: 

1192 # Create an unverified context for insecure connections 

1193 ctx = ssl.create_default_context() 

1194 ctx.check_hostname = False 

1195 ctx.verify_mode = ssl.CERT_NONE 

1196 httpx_settings.setdefault("verify", ctx) 

1197 else: 

1198 cert_file = PREFECT_API_SSL_CERT_FILE.value() 

1199 if not cert_file: 

1200 cert_file = certifi.where() 

1201 # Create a verified context with the certificate file 

1202 ctx = ssl.create_default_context(cafile=cert_file) 

1203 httpx_settings.setdefault("verify", ctx) 

1204 

1205 if api_version is None: 

1206 api_version = SERVER_API_VERSION 

1207 httpx_settings["headers"].setdefault("X-PREFECT-API-VERSION", api_version) 

1208 # Prioritize auth_string if provided, otherwise use api_key 

1209 if auth_string: 

1210 token = base64.b64encode(auth_string.encode("utf-8")).decode("utf-8") 

1211 httpx_settings["headers"]["Authorization"] = ( 

1212 f"Basic {token}" # Overwrite if exists 

1213 ) 

1214 elif api_key: 

1215 httpx_settings["headers"]["Authorization"] = ( 

1216 f"Bearer {api_key}" # Set if auth_string is not present 

1217 ) 

1218 

1219 # Context management 

1220 self._context_stack: int = 0 

1221 self._ephemeral_app: Optional[ASGIApp] = None 

1222 self.manage_lifespan = True 

1223 self.server_type: ServerType 

1224 

1225 self._closed = False 

1226 self._started = False 

1227 

1228 # Connect to an external application 

1229 if isinstance(api, str): 

1230 if httpx_settings.get("app"): 

1231 raise ValueError( 

1232 "Invalid httpx settings: `app` cannot be set when providing an " 

1233 "api url. `app` is only for use with ephemeral instances. Provide " 

1234 "it as the `api` parameter instead." 

1235 ) 

1236 httpx_settings.setdefault("base_url", api) 

1237 

1238 # See https://www.python-httpx.org/advanced/#pool-limit-configuration 

1239 httpx_settings.setdefault( 

1240 "limits", 

1241 httpx.Limits( 

1242 # We see instability when allowing the client to open many connections at once. 

1243 # Limiting concurrency results in more stable performance. 

1244 max_connections=16, 

1245 max_keepalive_connections=8, 

1246 # The Prefect Cloud LB will keep connections alive for 30s. 

1247 # Only allow the client to keep connections alive for 25s. 

1248 keepalive_expiry=25, 

1249 ), 

1250 ) 

1251 

1252 # See https://www.python-httpx.org/http2/ 

1253 # Enabling HTTP/2 support on the client does not necessarily mean that your requests 

1254 # and responses will be transported over HTTP/2, since both the client and the server 

1255 # need to support HTTP/2. If you connect to a server that only supports HTTP/1.1 the 

1256 # client will use a standard HTTP/1.1 connection instead. 

1257 httpx_settings.setdefault("http2", PREFECT_API_ENABLE_HTTP2.value()) 

1258 

1259 if server_type: 

1260 self.server_type = server_type 

1261 else: 

1262 self.server_type = ( 

1263 ServerType.CLOUD 

1264 if api.startswith(PREFECT_CLOUD_API_URL.value()) 

1265 else ServerType.SERVER 

1266 ) 

1267 

1268 # Connect to an in-process application 

1269 else: 

1270 self._ephemeral_app = api 

1271 self.server_type = ServerType.EPHEMERAL 

1272 

1273 # See https://www.python-httpx.org/advanced/#timeout-configuration 

1274 httpx_settings.setdefault( 

1275 "timeout", 

1276 httpx.Timeout( 

1277 connect=PREFECT_API_REQUEST_TIMEOUT.value(), 

1278 read=PREFECT_API_REQUEST_TIMEOUT.value(), 

1279 write=PREFECT_API_REQUEST_TIMEOUT.value(), 

1280 pool=PREFECT_API_REQUEST_TIMEOUT.value(), 

1281 ), 

1282 ) 

1283 

1284 if not PREFECT_TESTING_UNIT_TEST_MODE: 

1285 httpx_settings.setdefault("follow_redirects", True) 

1286 

1287 enable_csrf_support = ( 

1288 self.server_type != ServerType.CLOUD 

1289 and PREFECT_CLIENT_CSRF_SUPPORT_ENABLED.value() 

1290 ) 

1291 

1292 self._client = PrefectHttpxSyncClient( 

1293 **httpx_settings, enable_csrf_support=enable_csrf_support 

1294 ) 

1295 

1296 # See https://www.python-httpx.org/advanced/#custom-transports 

1297 # 

1298 # If we're using an HTTP/S client (not the ephemeral client), adjust the 

1299 # transport to add retries _after_ it is instantiated. If we alter the transport 

1300 # before instantiation, the transport will not be aware of proxies unless we 

1301 # reproduce all of the logic to make it so. 

1302 # 

1303 # Only alter the transport to set our default of 3 retries, don't modify any 

1304 # transport a user may have provided via httpx_settings. 

1305 # 

1306 # Making liberal use of getattr and isinstance checks here to avoid any 

1307 # surprises if the internals of httpx or httpcore change on us 

1308 if isinstance(api, str) and not httpx_settings.get("transport"): 

1309 transport_for_url = getattr(self._client, "_transport_for_url", None) 

1310 if callable(transport_for_url): 

1311 server_transport = transport_for_url(httpx.URL(api)) 

1312 if isinstance(server_transport, httpx.HTTPTransport): 

1313 pool = getattr(server_transport, "_pool", None) 

1314 if isinstance(pool, httpcore.ConnectionPool): 

1315 setattr(pool, "_retries", 3) 

1316 

1317 self.logger: Logger = get_logger("client") 

1318 

1319 @property 1a

1320 def api_url(self) -> httpx.URL: 1a

1321 """ 

1322 Get the base URL for the API. 

1323 """ 

1324 return self._client.base_url 

1325 

1326 # Context management ---------------------------------------------------------------- 

1327 

1328 def __enter__(self) -> "SyncPrefectClient": 1a

1329 """ 

1330 Start the client. 

1331 

1332 If the client is already started, this will raise an exception. 

1333 

1334 If the client is already closed, this will raise an exception. Use a new client 

1335 instance instead. 

1336 """ 

1337 if self._closed: 

1338 # httpx.Client does not allow reuse so we will not either. 

1339 raise RuntimeError( 

1340 "The client cannot be started again after closing. " 

1341 "Retrieve a new client with `get_client()` instead." 

1342 ) 

1343 

1344 self._context_stack += 1 

1345 

1346 if self._started: 

1347 # allow reentrancy 

1348 return self 

1349 

1350 self._client.__enter__() 

1351 self._started = True 

1352 

1353 return self 

1354 

1355 def __exit__(self, *exc_info: Any) -> None: 1a

1356 """ 

1357 Shutdown the client. 

1358 """ 

1359 self._context_stack -= 1 

1360 if self._context_stack > 0: 

1361 return 

1362 self._closed = True 

1363 self._client.__exit__(*exc_info) 

1364 

1365 # API methods ---------------------------------------------------------------------- 

1366 

1367 def api_healthcheck(self) -> Optional[Exception]: 1a

1368 """ 

1369 Attempts to connect to the API and returns the encountered exception if not 

1370 successful. 

1371 

1372 If successful, returns `None`. 

1373 """ 

1374 try: 

1375 self._client.get("/health") 

1376 return None 

1377 except Exception as exc: 

1378 return exc 

1379 

1380 def hello(self) -> httpx.Response: 1a

1381 """ 

1382 Send a GET request to /hello for testing purposes. 

1383 """ 

1384 return self._client.get("/hello") 

1385 

1386 def api_version(self) -> str: 1a

1387 res = self._client.get("/admin/version") 

1388 return res.json() 

1389 

1390 def client_version(self) -> str: 1a

1391 return prefect.__version__ 

1392 

1393 def raise_for_api_version_mismatch(self) -> None: 1a

1394 # Cloud is always compatible as a server 

1395 if self.server_type == ServerType.CLOUD: 

1396 return 

1397 

1398 try: 

1399 api_version = self.api_version() 

1400 except Exception as e: 

1401 if "Unauthorized" in str(e): 

1402 raise e 

1403 raise RuntimeError(f"Failed to reach API at {self.api_url}") from e 

1404 

1405 api_version = version.parse(api_version) 

1406 client_version = version.parse(self.client_version()) 

1407 

1408 if api_version.major != client_version.major: 

1409 raise RuntimeError( 

1410 f"Found incompatible versions: client: {client_version}, server: {api_version}. " 

1411 "Major versions must match." 

1412 ) 

1413 if api_version < client_version: 

1414 warning_message = ( 

1415 "Your Prefect server is running an older version of Prefect than your client which may result in unexpected behavior. " 

1416 f"Please upgrade your Prefect server from version {api_version} to version {client_version} or higher." 

1417 ) 

1418 try: 

1419 get_run_logger().warning(warning_message) 

1420 except prefect.context.MissingContextError: 

1421 self.logger.warning(warning_message) 

1422 

1423 def set_task_run_name(self, task_run_id: UUID, name: str) -> httpx.Response: 1a

1424 task_run_data = TaskRunUpdate(name=name) 

1425 return self._client.patch( 

1426 f"/task_runs/{task_run_id}", 

1427 json=task_run_data.model_dump(mode="json", exclude_unset=True), 

1428 ) 

1429 

1430 def create_task_run( 1a

1431 self, 

1432 task: "TaskObject[P, R]", 

1433 flow_run_id: Optional[UUID], 

1434 dynamic_key: str, 

1435 id: Optional[UUID] = None, 

1436 name: Optional[str] = None, 

1437 extra_tags: Optional[Iterable[str]] = None, 

1438 state: Optional[prefect.states.State[R]] = None, 

1439 task_inputs: Optional[ 

1440 dict[ 

1441 str, 

1442 list[ 

1443 Union[ 

1444 TaskRunResult, 

1445 FlowRunResult, 

1446 Parameter, 

1447 Constant, 

1448 ] 

1449 ], 

1450 ] 

1451 ] = None, 

1452 ) -> TaskRun: 

1453 """ 

1454 Create a task run 

1455 

1456 Args: 

1457 task: The Task to run 

1458 flow_run_id: The flow run id with which to associate the task run 

1459 dynamic_key: A key unique to this particular run of a Task within the flow 

1460 id: An optional ID for the task run. If not provided, one will be generated 

1461 server-side. 

1462 name: An optional name for the task run 

1463 extra_tags: an optional list of extra tags to apply to the task run in 

1464 addition to `task.tags` 

1465 state: The initial state for the run. If not provided, defaults to 

1466 `Pending` for now. Should always be a `Scheduled` type. 

1467 task_inputs: the set of inputs passed to the task 

1468 

1469 Returns: 

1470 The created task run. 

1471 """ 

1472 tags = set(task.tags).union(extra_tags or []) 

1473 

1474 if state is None: 

1475 state = prefect.states.Pending() 

1476 

1477 retry_delay = task.retry_delay_seconds 

1478 if isinstance(retry_delay, list): 

1479 retry_delay = [int(rd) for rd in retry_delay] 

1480 elif isinstance(retry_delay, float): 

1481 retry_delay = int(retry_delay) 

1482 

1483 task_run_data = TaskRunCreate( 

1484 id=id, 

1485 name=name, 

1486 flow_run_id=flow_run_id, 

1487 task_key=task.task_key, 

1488 dynamic_key=dynamic_key, 

1489 tags=list(tags), 

1490 task_version=task.version, 

1491 empirical_policy=TaskRunPolicy( 

1492 retries=task.retries, 

1493 retry_delay=retry_delay, 

1494 retry_jitter_factor=task.retry_jitter_factor, 

1495 ), 

1496 state=prefect.states.to_state_create(state), 

1497 task_inputs=task_inputs or {}, 

1498 ) 

1499 

1500 content = task_run_data.model_dump_json(exclude={"id"} if id is None else None) 

1501 

1502 response = self._client.post("/task_runs/", content=content) 

1503 return TaskRun.model_validate(response.json()) 

1504 

1505 def read_task_run(self, task_run_id: UUID) -> TaskRun: 1a

1506 """ 

1507 Query the Prefect API for a task run by id. 

1508 

1509 Args: 

1510 task_run_id: the task run ID of interest 

1511 

1512 Returns: 

1513 a Task Run model representation of the task run 

1514 """ 

1515 try: 

1516 response = self._client.get(f"/task_runs/{task_run_id}") 

1517 return TaskRun.model_validate(response.json()) 

1518 except httpx.HTTPStatusError as e: 

1519 if e.response.status_code == status.HTTP_404_NOT_FOUND: 

1520 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

1521 else: 

1522 raise 

1523 

1524 def read_task_runs( 1a

1525 self, 

1526 *, 

1527 flow_filter: Optional[FlowFilter] = None, 

1528 flow_run_filter: Optional[FlowRunFilter] = None, 

1529 task_run_filter: Optional[TaskRunFilter] = None, 

1530 deployment_filter: Optional[DeploymentFilter] = None, 

1531 sort: Optional[TaskRunSort] = None, 

1532 limit: Optional[int] = None, 

1533 offset: int = 0, 

1534 ) -> list[TaskRun]: 

1535 """ 

1536 Query the Prefect API for task runs. Only task runs matching all criteria will 

1537 be returned. 

1538 

1539 Args: 

1540 flow_filter: filter criteria for flows 

1541 flow_run_filter: filter criteria for flow runs 

1542 task_run_filter: filter criteria for task runs 

1543 deployment_filter: filter criteria for deployments 

1544 sort: sort criteria for the task runs 

1545 limit: a limit for the task run query 

1546 offset: an offset for the task run query 

1547 

1548 Returns: 

1549 a list of Task Run model representations 

1550 of the task runs 

1551 """ 

1552 body: dict[str, Any] = { 

1553 "flows": flow_filter.model_dump(mode="json") if flow_filter else None, 

1554 "flow_runs": ( 

1555 flow_run_filter.model_dump(mode="json", exclude_unset=True) 

1556 if flow_run_filter 

1557 else None 

1558 ), 

1559 "task_runs": ( 

1560 task_run_filter.model_dump(mode="json") if task_run_filter else None 

1561 ), 

1562 "deployments": ( 

1563 deployment_filter.model_dump(mode="json") if deployment_filter else None 

1564 ), 

1565 "sort": sort, 

1566 "limit": limit, 

1567 "offset": offset, 

1568 } 

1569 response = self._client.post("/task_runs/filter", json=body) 

1570 return _get_type_adapter(list[TaskRun]).validate_python(response.json()) 

1571 

1572 def set_task_run_state( 1a

1573 self, 

1574 task_run_id: UUID, 

1575 state: prefect.states.State[Any], 

1576 force: bool = False, 

1577 ) -> OrchestrationResult[Any]: 

1578 """ 

1579 Set the state of a task run. 

1580 

1581 Args: 

1582 task_run_id: the id of the task run 

1583 state: the state to set 

1584 force: if True, disregard orchestration logic when setting the state, 

1585 forcing the Prefect API to accept the state 

1586 

1587 Returns: 

1588 an OrchestrationResult model representation of state orchestration output 

1589 """ 

1590 state_create = prefect.states.to_state_create(state) 

1591 state_create.state_details.task_run_id = task_run_id 

1592 response = self._client.post( 

1593 f"/task_runs/{task_run_id}/set_state", 

1594 json=dict(state=state_create.model_dump(mode="json"), force=force), 

1595 ) 

1596 result: OrchestrationResult[Any] = OrchestrationResult.model_validate( 

1597 response.json() 

1598 ) 

1599 return result 

1600 

1601 def read_task_run_states(self, task_run_id: UUID) -> list[prefect.states.State]: 1a

1602 """ 

1603 Query for the states of a task run 

1604 

1605 Args: 

1606 task_run_id: the id of the task run 

1607 

1608 Returns: 

1609 a list of State model representations of the task run states 

1610 """ 

1611 response = self._client.get( 

1612 "/task_run_states/", params=dict(task_run_id=str(task_run_id)) 

1613 ) 

1614 return _get_type_adapter(list[prefect.states.State]).validate_python( 

1615 response.json() 

1616 ) 

1617 

1618 def delete_task_run(self, task_run_id: UUID) -> None: 1a

1619 """ 

1620 Delete a task run by id. 

1621 

1622 Args: 

1623 task_run_id: the task run ID of interest 

1624 Raises: 

1625 prefect.exceptions.ObjectNotFound: If request returns 404 

1626 httpx.RequestError: If requests fails 

1627 """ 

1628 try: 

1629 self._client.delete(f"/task_runs/{task_run_id}") 

1630 except httpx.HTTPStatusError as e: 

1631 if e.response.status_code == 404: 

1632 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

1633 else: 

1634 raise 

1635 

1636 def create_work_queue( 1a

1637 self, 

1638 name: str, 

1639 description: Optional[str] = None, 

1640 is_paused: Optional[bool] = None, 

1641 concurrency_limit: Optional[int] = None, 

1642 priority: Optional[int] = None, 

1643 work_pool_name: Optional[str] = None, 

1644 ) -> WorkQueue: 

1645 """ 

1646 Create a work queue. 

1647 

1648 Args: 

1649 name: a unique name for the work queue 

1650 description: An optional description for the work queue. 

1651 is_paused: Whether or not the work queue is paused. 

1652 concurrency_limit: An optional concurrency limit for the work queue. 

1653 priority: The queue's priority. Lower values are higher priority (1 is the highest). 

1654 work_pool_name: The name of the work pool to use for this queue. 

1655 

1656 Raises: 

1657 prefect.exceptions.ObjectAlreadyExists: If request returns 409 

1658 httpx.RequestError: If request fails 

1659 

1660 Returns: 

1661 The created work queue 

1662 """ 

1663 create_model = WorkQueueCreate(name=name, filter=None) 

1664 if description is not None: 

1665 create_model.description = description 

1666 if is_paused is not None: 

1667 create_model.is_paused = is_paused 

1668 if concurrency_limit is not None: 

1669 create_model.concurrency_limit = concurrency_limit 

1670 if priority is not None: 

1671 create_model.priority = priority 

1672 

1673 data = create_model.model_dump(mode="json") 

1674 try: 

1675 if work_pool_name is not None: 

1676 response = self._client.post( 

1677 f"/work_pools/{work_pool_name}/queues", json=data 

1678 ) 

1679 else: 

1680 response = self._client.post("/work_queues/", json=data) 

1681 except httpx.HTTPStatusError as e: 

1682 if e.response.status_code == status.HTTP_409_CONFLICT: 

1683 raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e 

1684 elif e.response.status_code == status.HTTP_404_NOT_FOUND: 

1685 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

1686 else: 

1687 raise 

1688 return WorkQueue.model_validate(response.json()) 

1689 

1690 def read_work_queue_by_name( 1a

1691 self, 

1692 name: str, 

1693 work_pool_name: Optional[str] = None, 

1694 ) -> WorkQueue: 

1695 """ 

1696 Read a work queue by name. 

1697 

1698 Args: 

1699 name (str): a unique name for the work queue 

1700 work_pool_name (str, optional): the name of the work pool 

1701 the queue belongs to. 

1702 

1703 Raises: 

1704 prefect.exceptions.ObjectNotFound: if no work queue is found 

1705 httpx.HTTPStatusError: other status errors 

1706 

1707 Returns: 

1708 WorkQueue: a work queue API object 

1709 """ 

1710 try: 

1711 if work_pool_name is not None: 

1712 response = self._client.get( 

1713 f"/work_pools/{work_pool_name}/queues/{name}" 

1714 ) 

1715 else: 

1716 response = self._client.get(f"/work_queues/name/{name}") 

1717 except httpx.HTTPStatusError as e: 

1718 if e.response.status_code == status.HTTP_404_NOT_FOUND: 

1719 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

1720 else: 

1721 raise 

1722 

1723 return WorkQueue.model_validate(response.json()) 

1724 

1725 def update_work_queue(self, id: UUID, **kwargs: Any) -> None: 1a

1726 """ 

1727 Update properties of a work queue. 

1728 

1729 Args: 

1730 id: the ID of the work queue to update 

1731 **kwargs: the fields to update 

1732 

1733 Raises: 

1734 ValueError: if no kwargs are provided 

1735 prefect.exceptions.ObjectNotFound: if request returns 404 

1736 httpx.RequestError: if the request fails 

1737 

1738 """ 

1739 if not kwargs: 

1740 raise ValueError("No fields provided to update.") 

1741 

1742 data = WorkQueueUpdate(**kwargs).model_dump(mode="json", exclude_unset=True) 

1743 try: 

1744 self._client.patch(f"/work_queues/{id}", json=data) 

1745 except httpx.HTTPStatusError as e: 

1746 if e.response.status_code == status.HTTP_404_NOT_FOUND: 

1747 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

1748 else: 

1749 raise 

1750 

1751 def get_runs_in_work_queue( 1a

1752 self, 

1753 id: UUID, 

1754 limit: int = 10, 

1755 scheduled_before: Optional[datetime.datetime] = None, 

1756 ) -> list[FlowRun]: 

1757 """ 

1758 Read flow runs off a work queue. 

1759 

1760 Args: 

1761 id: the id of the work queue to read from 

1762 limit: a limit on the number of runs to return 

1763 scheduled_before: a timestamp; only runs scheduled before this time will be returned. 

1764 Defaults to now. 

1765 

1766 Raises: 

1767 prefect.exceptions.ObjectNotFound: If request returns 404 

1768 httpx.RequestError: If request fails 

1769 

1770 Returns: 

1771 List[FlowRun]: a list of FlowRun objects read from the queue 

1772 """ 

1773 if scheduled_before is None: 

1774 scheduled_before = now("UTC") 

1775 

1776 try: 

1777 response = self._client.post( 

1778 f"/work_queues/{id}/get_runs", 

1779 json={ 

1780 "limit": limit, 

1781 "scheduled_before": scheduled_before.isoformat(), 

1782 }, 

1783 ) 

1784 except httpx.HTTPStatusError as e: 

1785 if e.response.status_code == status.HTTP_404_NOT_FOUND: 

1786 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

1787 else: 

1788 raise 

1789 return _get_type_adapter(list[FlowRun]).validate_python(response.json()) 

1790 

1791 def read_work_queue( 1a

1792 self, 

1793 id: UUID, 

1794 ) -> WorkQueue: 

1795 """ 

1796 Read a work queue. 

1797 

1798 Args: 

1799 id: the id of the work queue to load 

1800 

1801 Raises: 

1802 prefect.exceptions.ObjectNotFound: If request returns 404 

1803 httpx.RequestError: If request fails 

1804 

1805 Returns: 

1806 WorkQueue: an instantiated WorkQueue object 

1807 """ 

1808 try: 

1809 response = self._client.get(f"/work_queues/{id}") 

1810 except httpx.HTTPStatusError as e: 

1811 if e.response.status_code == status.HTTP_404_NOT_FOUND: 

1812 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

1813 else: 

1814 raise 

1815 return WorkQueue.model_validate(response.json()) 

1816 

1817 def read_work_queue_status( 1a

1818 self, 

1819 id: UUID, 

1820 ) -> WorkQueueStatusDetail: 

1821 """ 

1822 Read a work queue status. 

1823 

1824 Args: 

1825 id: the id of the work queue to load 

1826 

1827 Raises: 

1828 prefect.exceptions.ObjectNotFound: If request returns 404 

1829 httpx.RequestError: If request fails 

1830 

1831 Returns: 

1832 WorkQueueStatus: an instantiated WorkQueueStatus object 

1833 """ 

1834 try: 

1835 response = self._client.get(f"/work_queues/{id}/status") 

1836 except httpx.HTTPStatusError as e: 

1837 if e.response.status_code == status.HTTP_404_NOT_FOUND: 

1838 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

1839 else: 

1840 raise 

1841 return WorkQueueStatusDetail.model_validate(response.json()) 

1842 

1843 def match_work_queues( 1a

1844 self, 

1845 prefixes: list[str], 

1846 work_pool_name: Optional[str] = None, 

1847 ) -> list[WorkQueue]: 

1848 """ 

1849 Query the Prefect API for work queues with names with a specific prefix. 

1850 

1851 Args: 

1852 prefixes: a list of strings used to match work queue name prefixes 

1853 work_pool_name: an optional work pool name to scope the query to 

1854 

1855 Returns: 

1856 a list of WorkQueue model representations 

1857 of the work queues 

1858 """ 

1859 page_length = 100 

1860 current_page = 0 

1861 work_queues: list[WorkQueue] = [] 

1862 

1863 while True: 

1864 new_queues = self.read_work_queues( 

1865 work_pool_name=work_pool_name, 

1866 offset=current_page * page_length, 

1867 limit=page_length, 

1868 work_queue_filter=WorkQueueFilter( 

1869 name=WorkQueueFilterName(startswith_=prefixes) 

1870 ), 

1871 ) 

1872 if not new_queues: 

1873 break 

1874 work_queues += new_queues 

1875 current_page += 1 

1876 

1877 return work_queues 

1878 

1879 def delete_work_queue_by_id( 1a

1880 self, 

1881 id: UUID, 

1882 ) -> None: 

1883 """ 

1884 Delete a work queue by its ID. 

1885 

1886 Args: 

1887 id: the id of the work queue to delete 

1888 

1889 Raises: 

1890 prefect.exceptions.ObjectNotFound: If request returns 404 

1891 httpx.RequestError: If requests fails 

1892 """ 

1893 try: 

1894 self._client.delete( 

1895 f"/work_queues/{id}", 

1896 ) 

1897 except httpx.HTTPStatusError as e: 

1898 if e.response.status_code == status.HTTP_404_NOT_FOUND: 

1899 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

1900 else: 

1901 raise 

1902 

1903 def read_work_queues( 1a

1904 self, 

1905 work_pool_name: Optional[str] = None, 

1906 work_queue_filter: Optional[WorkQueueFilter] = None, 

1907 limit: Optional[int] = None, 

1908 offset: Optional[int] = None, 

1909 ) -> list[WorkQueue]: 

1910 """ 

1911 Retrieves queues for a work pool. 

1912 

1913 Args: 

1914 work_pool_name: Name of the work pool for which to get queues. 

1915 work_queue_filter: Criteria by which to filter queues. 

1916 limit: Limit for the queue query. 

1917 offset: Limit for the queue query. 

1918 

1919 Returns: 

1920 List of queues for the specified work pool. 

1921 """ 

1922 json: dict[str, Any] = { 

1923 "work_queues": ( 

1924 work_queue_filter.model_dump(mode="json", exclude_unset=True) 

1925 if work_queue_filter 

1926 else None 

1927 ), 

1928 "limit": limit, 

1929 "offset": offset, 

1930 } 

1931 

1932 if work_pool_name: 

1933 try: 

1934 response = self._client.post( 

1935 f"/work_pools/{work_pool_name}/queues/filter", 

1936 json=json, 

1937 ) 

1938 except httpx.HTTPStatusError as e: 

1939 if e.response.status_code == status.HTTP_404_NOT_FOUND: 

1940 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e 

1941 else: 

1942 raise 

1943 else: 

1944 response = self._client.post("/work_queues/filter", json=json) 

1945 

1946 return _get_type_adapter(list[WorkQueue]).validate_python(response.json()) 

1947 

1948 def read_worker_metadata(self) -> dict[str, Any]: 1a

1949 """Reads worker metadata stored in Prefect collection registry.""" 

1950 response = self._client.get("collections/views/aggregate-worker-metadata") 

1951 response.raise_for_status() 

1952 return response.json()