Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_blocks_schemas/client.py: 22%
69 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3from typing import TYPE_CHECKING 1a
5from httpx import HTTPStatusError 1a
7from prefect.client.orchestration.base import BaseAsyncClient, BaseClient 1a
8from prefect.exceptions import ObjectAlreadyExists, ObjectNotFound 1a
10if TYPE_CHECKING: 10 ↛ 11line 10 didn't jump to line 11 because the condition on line 10 was never true1a
11 from uuid import UUID
13 from prefect.client.schemas.actions import BlockSchemaCreate
14 from prefect.client.schemas.objects import BlockSchema
class BlocksSchemaClient(BaseClient):
    """Synchronous client methods for the `/block_schemas` API routes."""

    def create_block_schema(self, block_schema: "BlockSchemaCreate") -> "BlockSchema":
        """
        Create a block schema in the Prefect API.

        Args:
            block_schema: The schema to create. It is serialized in JSON mode
                with unset fields omitted and `id`, `block_type` and
                `checksum` excluded from the payload.

        Raises:
            ObjectAlreadyExists: If the API responds with status 409.
            httpx.HTTPStatusError: Re-raised unchanged for any other error
                status.

        Returns:
            The `BlockSchema` validated from the response body.
        """
        payload = block_schema.model_dump(
            mode="json",
            exclude_unset=True,
            exclude={"id", "block_type", "checksum"},
        )
        try:
            response = self.request("POST", "/block_schemas/", json=payload)
        except HTTPStatusError as e:
            if e.response.status_code == 409:
                raise ObjectAlreadyExists(http_exc=e) from e
            raise

        # Imported at call time rather than at module import time.
        from prefect.client.schemas.objects import BlockSchema

        return BlockSchema.model_validate(response.json())

    def read_block_schema_by_checksum(
        self, checksum: str, version: str | None = None
    ) -> "BlockSchema":
        """
        Look up a block schema by its checksum.

        Args:
            checksum: The checksum substituted into the request path.
            version: Optional version sent as the `version` query parameter.
                It is only sent when truthy, so `None` and `""` both omit it.

        Raises:
            ObjectNotFound: If the API responds with status 404.
            httpx.HTTPStatusError: Re-raised unchanged for any other error
                status.

        Returns:
            The `BlockSchema` validated from the response body.
        """
        # Only pass `params` at all when a version was supplied.
        query = {"params": {"version": version}} if version else {}
        try:
            response = self.request(
                "GET",
                "/block_schemas/checksum/{checksum}",
                path_params={"checksum": checksum},
                **query,
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            raise

        from prefect.client.schemas.objects import BlockSchema

        return BlockSchema.model_validate(response.json())

    def read_block_schemas(self) -> "list[BlockSchema]":
        """
        Read block schemas from the Prefect API using an empty filter.

        Raises:
            httpx.HTTPStatusError: Propagated unchanged if the request fails
                with an error status; no status code is translated here.

        Returns:
            A list of `BlockSchema` objects validated from the response body.
        """
        response = self.request("POST", "/block_schemas/filter", json={})

        from prefect.client.schemas.objects import BlockSchema

        return BlockSchema.model_validate_list(response.json())

    def get_most_recent_block_schema_for_block_type(
        self,
        block_type_id: "UUID",
    ) -> "BlockSchema | None":
        """
        Fetches the most recent block schema for a specified block type ID.

        The request filters on the block type ID and asks for a single result
        (`limit` of 1). No sort order is sent, so which schema counts as the
        most recent depends on the API's ordering.
        NOTE(review): confirm the server's default sort for this endpoint.

        Args:
            block_type_id: The ID of the block type.

        Raises:
            httpx.HTTPStatusError: Propagated unchanged if the request fails
                with an error status.

        Returns:
            The first block schema in the response, or None if the response
            contains no schemas.
        """
        response = self.request(
            "POST",
            "/block_schemas/filter",
            json={
                "block_schemas": {"block_type_id": {"any_": [str(block_type_id)]}},
                "limit": 1,
            },
        )

        from prefect.client.schemas.objects import BlockSchema

        schemas = BlockSchema.model_validate_list(response.json())
        return next(iter(schemas), None)
class BlocksSchemaAsyncClient(BaseAsyncClient):
    """Asynchronous client methods for the `/block_schemas` API routes."""

    async def create_block_schema(
        self, block_schema: "BlockSchemaCreate"
    ) -> "BlockSchema":
        """
        Create a block schema in the Prefect API.

        Args:
            block_schema: The schema to create. It is serialized in JSON mode
                with unset fields omitted and `id`, `block_type` and
                `checksum` excluded from the payload.

        Raises:
            ObjectAlreadyExists: If the API responds with status 409.
            httpx.HTTPStatusError: Re-raised unchanged for any other error
                status.

        Returns:
            The `BlockSchema` validated from the response body.
        """
        payload = block_schema.model_dump(
            mode="json",
            exclude_unset=True,
            exclude={"id", "block_type", "checksum"},
        )
        try:
            response = await self.request("POST", "/block_schemas/", json=payload)
        except HTTPStatusError as e:
            if e.response.status_code == 409:
                raise ObjectAlreadyExists(http_exc=e) from e
            raise

        # Imported at call time rather than at module import time.
        from prefect.client.schemas.objects import BlockSchema

        return BlockSchema.model_validate(response.json())

    async def read_block_schema_by_checksum(
        self, checksum: str, version: str | None = None
    ) -> "BlockSchema":
        """
        Look up a block schema by its checksum.

        Args:
            checksum: The checksum substituted into the request path.
            version: Optional version sent as the `version` query parameter.
                It is only sent when truthy, so `None` and `""` both omit it.

        Raises:
            ObjectNotFound: If the API responds with status 404.
            httpx.HTTPStatusError: Re-raised unchanged for any other error
                status.

        Returns:
            The `BlockSchema` validated from the response body.
        """
        # Only pass `params` at all when a version was supplied.
        query = {"params": {"version": version}} if version else {}
        try:
            response = await self.request(
                "GET",
                "/block_schemas/checksum/{checksum}",
                path_params={"checksum": checksum},
                **query,
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            raise

        from prefect.client.schemas.objects import BlockSchema

        return BlockSchema.model_validate(response.json())

    async def read_block_schemas(self) -> "list[BlockSchema]":
        """
        Read block schemas from the Prefect API using an empty filter.

        Raises:
            httpx.HTTPStatusError: Propagated unchanged if the request fails
                with an error status; no status code is translated here.

        Returns:
            A list of `BlockSchema` objects validated from the response body.
        """
        response = await self.request("POST", "/block_schemas/filter", json={})

        from prefect.client.schemas.objects import BlockSchema

        return BlockSchema.model_validate_list(response.json())

    async def get_most_recent_block_schema_for_block_type(
        self,
        block_type_id: "UUID",
    ) -> "BlockSchema | None":
        """
        Fetches the most recent block schema for a specified block type ID.

        The request filters on the block type ID and asks for a single result
        (`limit` of 1). No sort order is sent, so which schema counts as the
        most recent depends on the API's ordering.
        NOTE(review): confirm the server's default sort for this endpoint.

        Args:
            block_type_id: The ID of the block type.

        Raises:
            httpx.HTTPStatusError: Propagated unchanged if the request fails
                with an error status.

        Returns:
            The first block schema in the response, or None if the response
            contains no schemas.
        """
        response = await self.request(
            "POST",
            "/block_schemas/filter",
            json={
                "block_schemas": {"block_type_id": {"any_": [str(block_type_id)]}},
                "limit": 1,
            },
        )

        from prefect.client.schemas.objects import BlockSchema

        schemas = BlockSchema.model_validate_list(response.json())
        return next(iter(schemas), None)