Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_blocks_schemas/client.py: 22%

69 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3from typing import TYPE_CHECKING 1a

4 

5from httpx import HTTPStatusError 1a

6 

7from prefect.client.orchestration.base import BaseAsyncClient, BaseClient 1a

8from prefect.exceptions import ObjectAlreadyExists, ObjectNotFound 1a

9 

10if TYPE_CHECKING: 10 ↛ 11line 10 didn't jump to line 11 because the condition on line 10 was never true1a

11 from uuid import UUID 

12 

13 from prefect.client.schemas.actions import BlockSchemaCreate 

14 from prefect.client.schemas.objects import BlockSchema 

15 

16 

class BlocksSchemaClient(BaseClient):
    """Synchronous client methods for the `/block_schemas` API routes."""

    def create_block_schema(self, block_schema: "BlockSchemaCreate") -> "BlockSchema":
        """
        Create a block schema in the Prefect API.

        Args:
            block_schema: The block schema to create. It is serialized with
                unset fields omitted and with `id`, `block_type` and
                `checksum` excluded from the payload.

        Raises:
            prefect.exceptions.ObjectAlreadyExists: If the API responds with a
                409 status code.
            httpx.HTTPStatusError: For any other error status; re-raised
                unchanged.

        Returns:
            The `BlockSchema` validated from the response body.
        """
        try:
            response = self.request(
                "POST",
                "/block_schemas/",
                json=block_schema.model_dump(
                    mode="json",
                    exclude_unset=True,
                    exclude={"id", "block_type", "checksum"},
                ),
            )
        except HTTPStatusError as e:
            if e.response.status_code == 409:
                raise ObjectAlreadyExists(http_exc=e) from e
            raise

        # Imported at call time; at module level this name is only imported
        # under TYPE_CHECKING.
        from prefect.client.schemas.objects import BlockSchema

        return BlockSchema.model_validate(response.json())

    def read_block_schema_by_checksum(
        self, checksum: str, version: str | None = None
    ) -> "BlockSchema":
        """
        Look up a block schema by its checksum.

        Args:
            checksum: The checksum of the block schema to read.
            version: Optional version sent as the `version` query parameter.
                It is only sent when truthy, so `None` and `""` both omit it.

        Raises:
            prefect.exceptions.ObjectNotFound: If the API responds with a 404
                status code.
            httpx.HTTPStatusError: For any other error status; re-raised
                unchanged.

        Returns:
            The `BlockSchema` validated from the response body.
        """
        try:
            response = self.request(
                "GET",
                "/block_schemas/checksum/{checksum}",
                path_params={"checksum": checksum},
                **({"params": {"version": version}} if version else {}),
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            raise

        from prefect.client.schemas.objects import BlockSchema

        return BlockSchema.model_validate(response.json())

    def read_block_schemas(self) -> "list[BlockSchema]":
        """
        Read block schemas by posting an empty filter to the API.

        No filter criteria are sent, so the result is whatever the server
        returns for an unfiltered query.
        NOTE(review): any server-side default limit or ordering is not visible
        from this client -- confirm against the API.

        Raises:
            httpx.HTTPStatusError: Presumably raised by the underlying request
                on an error status; it is not handled here and propagates.

        Returns:
            A list of `BlockSchema` objects validated from the response body.
        """
        response = self.request("POST", "/block_schemas/filter", json={})

        from prefect.client.schemas.objects import BlockSchema

        return BlockSchema.model_validate_list(response.json())

    def get_most_recent_block_schema_for_block_type(
        self,
        block_type_id: "UUID",
    ) -> "BlockSchema | None":
        """
        Fetches the most recent block schema for a specified block type ID.

        The request filters on the block type ID and asks for a single result.
        NOTE(review): "most recent" relies on the server's default ordering of
        filter results; no sort is sent by this client -- confirm.

        Args:
            block_type_id: The ID of the block type.

        Raises:
            httpx.HTTPStatusError: Presumably raised by the underlying request
                on an error status; it is not handled here and propagates.

        Returns:
            The first block schema in the response, or None if the response
            list is empty.
        """
        # No try/except here: the previous handler only re-raised, so errors
        # propagate to the caller exactly as before.
        response = self.request(
            "POST",
            "/block_schemas/filter",
            json={
                "block_schemas": {"block_type_id": {"any_": [str(block_type_id)]}},
                "limit": 1,
            },
        )

        from prefect.client.schemas.objects import BlockSchema

        return next(iter(BlockSchema.model_validate_list(response.json())), None)

107 

108 

class BlocksSchemaAsyncClient(BaseAsyncClient):
    """Asynchronous client methods for the `/block_schemas` API routes."""

    async def create_block_schema(
        self, block_schema: "BlockSchemaCreate"
    ) -> "BlockSchema":
        """
        Create a block schema in the Prefect API.

        Args:
            block_schema: The block schema to create. It is serialized with
                unset fields omitted and with `id`, `block_type` and
                `checksum` excluded from the payload.

        Raises:
            prefect.exceptions.ObjectAlreadyExists: If the API responds with a
                409 status code.
            httpx.HTTPStatusError: For any other error status; re-raised
                unchanged.

        Returns:
            The `BlockSchema` validated from the response body.
        """
        try:
            response = await self.request(
                "POST",
                "/block_schemas/",
                json=block_schema.model_dump(
                    mode="json",
                    exclude_unset=True,
                    exclude={"id", "block_type", "checksum"},
                ),
            )
        except HTTPStatusError as e:
            if e.response.status_code == 409:
                raise ObjectAlreadyExists(http_exc=e) from e
            raise

        # Imported at call time; at module level this name is only imported
        # under TYPE_CHECKING.
        from prefect.client.schemas.objects import BlockSchema

        return BlockSchema.model_validate(response.json())

    async def read_block_schema_by_checksum(
        self, checksum: str, version: str | None = None
    ) -> "BlockSchema":
        """
        Look up a block schema by its checksum.

        Args:
            checksum: The checksum of the block schema to read.
            version: Optional version sent as the `version` query parameter.
                It is only sent when truthy, so `None` and `""` both omit it.

        Raises:
            prefect.exceptions.ObjectNotFound: If the API responds with a 404
                status code.
            httpx.HTTPStatusError: For any other error status; re-raised
                unchanged.

        Returns:
            The `BlockSchema` validated from the response body.
        """
        try:
            response = await self.request(
                "GET",
                "/block_schemas/checksum/{checksum}",
                path_params={"checksum": checksum},
                **({"params": {"version": version}} if version else {}),
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            raise

        from prefect.client.schemas.objects import BlockSchema

        return BlockSchema.model_validate(response.json())

    async def read_block_schemas(self) -> "list[BlockSchema]":
        """
        Read block schemas by posting an empty filter to the API.

        No filter criteria are sent, so the result is whatever the server
        returns for an unfiltered query.
        NOTE(review): any server-side default limit or ordering is not visible
        from this client -- confirm against the API.

        Raises:
            httpx.HTTPStatusError: Presumably raised by the underlying request
                on an error status; it is not handled here and propagates.

        Returns:
            A list of `BlockSchema` objects validated from the response body.
        """
        response = await self.request("POST", "/block_schemas/filter", json={})

        from prefect.client.schemas.objects import BlockSchema

        return BlockSchema.model_validate_list(response.json())

    async def get_most_recent_block_schema_for_block_type(
        self,
        block_type_id: "UUID",
    ) -> "BlockSchema | None":
        """
        Fetches the most recent block schema for a specified block type ID.

        The request filters on the block type ID and asks for a single result.
        NOTE(review): "most recent" relies on the server's default ordering of
        filter results; no sort is sent by this client -- confirm.

        Args:
            block_type_id: The ID of the block type.

        Raises:
            httpx.HTTPStatusError: Presumably raised by the underlying request
                on an error status; it is not handled here and propagates.

        Returns:
            The first block schema in the response, or None if the response
            list is empty.
        """
        # No try/except here: the previous handler only re-raised, so errors
        # propagate to the caller exactly as before.
        response = await self.request(
            "POST",
            "/block_schemas/filter",
            json={
                "block_schemas": {"block_type_id": {"any_": [str(block_type_id)]}},
                "limit": 1,
            },
        )

        from prefect.client.schemas.objects import BlockSchema

        return next(iter(BlockSchema.model_validate_list(response.json())), None)