Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_artifacts/client.py: 32%

94 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from typing import TYPE_CHECKING, Annotated, Optional 1a

2 

3from httpx import HTTPStatusError 1a

4from pydantic import Field 1a

5from typing_extensions import TypedDict, Unpack 1a

6 

7from prefect.client.orchestration.base import BaseAsyncClient, BaseClient 1a

8from prefect.exceptions import ObjectNotFound 1a

9 

10if TYPE_CHECKING: 10 ↛ 11line 10 didn't jump to line 11 because the condition on line 10 was never true1a

11 from uuid import UUID 

12 

13 from prefect.client.schemas.actions import ArtifactCreate, ArtifactUpdate 

14 from prefect.client.schemas.filters import ( 

15 ArtifactCollectionFilter, 

16 ArtifactFilter, 

17 FlowRunFilter, 

18 TaskRunFilter, 

19 ) 

20 from prefect.client.schemas.objects import Artifact, ArtifactCollection 

21 from prefect.client.schemas.sorting import ArtifactCollectionSort, ArtifactSort 

22 

23 

class BaseArtifactReadParams(TypedDict, total=False):
    """Keyword arguments shared by the artifact read methods in this module.

    Every key is optional (``total=False``); the read methods look each one up
    with ``kwargs.get(...)`` when assembling the JSON request body.
    """

    # Sent under the "flow_runs" key of the request body.
    flow_run_filter: Annotated[
        Optional["FlowRunFilter"],
        Field(default=None),
    ]
    # Sent under the "task_runs" key of the request body.
    task_run_filter: Annotated[
        Optional["TaskRunFilter"],
        Field(default=None),
    ]
    # Forwarded as "limit".
    limit: Annotated[
        Optional[int],
        Field(default=None),
    ]
    # Forwarded as "offset"; declared default is 0.
    offset: Annotated[
        int,
        Field(default=0),
    ]

29 

30 

class ArtifactReadParams(BaseArtifactReadParams, total=False):
    """Keyword arguments accepted by ``read_artifacts``.

    Adds the artifact-specific filter and sort keys to the shared
    flow-run/task-run/limit/offset keys of ``BaseArtifactReadParams``.
    """

    # Sent under the "artifacts" key of the request body.
    artifact_filter: Annotated[
        Optional["ArtifactFilter"],
        Field(default=None),
    ]
    # Forwarded as "sort".
    sort: Annotated[
        Optional["ArtifactSort"],
        Field(default=None),
    ]

34 

35 

class ArtifactCollectionReadParams(BaseArtifactReadParams, total=False):
    """Keyword arguments accepted by ``read_latest_artifacts``.

    Adds the collection-specific filter and sort keys to the shared
    flow-run/task-run/limit/offset keys of ``BaseArtifactReadParams``.
    """

    # Sent under the "artifacts" key of the request body.
    artifact_filter: Annotated[
        Optional["ArtifactCollectionFilter"],
        Field(default=None),
    ]
    # Forwarded as "sort".
    sort: Annotated[
        Optional["ArtifactCollectionSort"],
        Field(default=None),
    ]

41 

42 

class ArtifactClient(BaseClient):
    """Synchronous client methods for the ``/artifacts/`` API routes."""

    def create_artifact(self, artifact: "ArtifactCreate") -> "Artifact":
        """Create an artifact and emit a ``prefect.artifact.created`` event.

        Args:
            artifact: The artifact to create. Only explicitly-set fields are
                sent (``exclude_unset=True``).

        Returns:
            The created artifact, validated from the response body.
        """
        response = self.request(
            "POST",
            "/artifacts/",
            json=artifact.model_dump(mode="json", exclude_unset=True),
        )
        from prefect.client.schemas.objects import Artifact
        from prefect.events.utilities import emit_event

        created = Artifact.model_validate(response.json())

        # Emit an event for artifact creation
        resource = {
            "prefect.resource.id": f"prefect.artifact.{created.id}",
        }
        if created.key:
            resource["prefect.resource.name"] = created.key

        # Skip fields the artifact does not have, so the payload matches the
        # one built by ArtifactAsyncClient.create_artifact.
        payload = {
            k: v
            for k, v in {
                "key": created.key,
                "type": created.type,
                "description": created.description,
            }.items()
            if v is not None
        }

        emit_event(
            event="prefect.artifact.created",
            resource=resource,
            payload=payload,
        )

        return created

    def update_artifact(self, artifact_id: "UUID", artifact: "ArtifactUpdate") -> None:
        """Update an artifact and emit a ``prefect.artifact.updated`` event.

        Args:
            artifact_id: ID of the artifact to update.
            artifact: The fields to change. Only explicitly-set fields are
                sent and reported in the event payload.
        """
        from prefect.events.utilities import emit_event

        # Serialize once; the same data is the request body and the event payload.
        changes = artifact.model_dump(mode="json", exclude_unset=True)

        self.request(
            "PATCH",
            "/artifacts/{id}",
            json=changes,
            path_params={"id": artifact_id},
        )
        # Emit an event for artifact update
        resource = {
            "prefect.resource.id": f"prefect.artifact.{artifact_id}",
        }
        emit_event(
            event="prefect.artifact.updated",
            resource=resource,
            # An empty update passes None, as ArtifactAsyncClient does.
            payload=changes or None,
        )
        return None

    def delete_artifact(self, artifact_id: "UUID") -> None:
        """Delete an artifact by ID.

        Args:
            artifact_id: ID of the artifact to delete.

        Raises:
            ObjectNotFound: If the server responds with HTTP 404.
            HTTPStatusError: Re-raised unchanged for any other error status.
        """
        try:
            self.request(
                "DELETE",
                "/artifacts/{id}",
                path_params={"id": artifact_id},
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            else:
                raise
        return None

    def read_artifacts(
        self, **kwargs: Unpack["ArtifactReadParams"]
    ) -> list["Artifact"]:
        """Query artifacts via ``POST /artifacts/filter``.

        Args:
            **kwargs: Optional ``artifact_filter``, ``flow_run_filter``,
                ``task_run_filter``, ``limit``, ``offset`` and ``sort``.
                Filters that are omitted (or falsy) are sent as ``None``;
                ``offset`` defaults to ``0``.

        Returns:
            The artifacts validated from the response body.
        """
        response = self.request(
            "POST",
            "/artifacts/filter",
            json={
                "artifacts": (
                    artifact_filter.model_dump(mode="json", exclude_unset=True)
                    if (artifact_filter := kwargs.get("artifact_filter"))
                    else None
                ),
                "flow_runs": (
                    flow_run_filter.model_dump(mode="json", exclude_unset=True)
                    if (flow_run_filter := kwargs.get("flow_run_filter"))
                    else None
                ),
                "task_runs": (
                    task_run_filter.model_dump(mode="json", exclude_unset=True)
                    if (task_run_filter := kwargs.get("task_run_filter"))
                    else None
                ),
                "limit": kwargs.get("limit", None),
                # Default to 0 like the other read methods in this module.
                "offset": kwargs.get("offset", 0),
                "sort": kwargs.get("sort", None),
            },
        )
        from prefect.client.schemas.objects import Artifact

        return Artifact.model_validate_list(response.json())

144 

145 

class ArtifactAsyncClient(BaseAsyncClient):
    """Asynchronous client methods for the ``/artifacts/`` API routes."""

    async def create_artifact(self, artifact: "ArtifactCreate") -> "Artifact":
        """Create an artifact and emit a ``prefect.artifact.created`` event.

        Args:
            artifact: The artifact to create. Only explicitly-set fields are
                sent (``exclude_unset=True``).

        Returns:
            The created artifact, validated from the response body.
        """
        request_body = artifact.model_dump(mode="json", exclude_unset=True)
        response = await self.request("POST", "/artifacts/", json=request_body)

        from prefect.client.schemas.objects import Artifact
        from prefect.events.utilities import emit_event

        new_artifact = Artifact.model_validate(response.json())

        # Describe the created artifact as the event's resource.
        event_resource = {
            "prefect.resource.id": f"prefect.artifact.{new_artifact.id}",
        }
        if new_artifact.key:
            event_resource["prefect.resource.name"] = new_artifact.key

        # Only report the descriptive fields that actually have a value.
        candidate_fields = {
            "key": new_artifact.key,
            "type": new_artifact.type,
            "description": new_artifact.description,
        }
        event_payload = {
            name: value
            for name, value in candidate_fields.items()
            if value is not None
        }

        emit_event(
            event="prefect.artifact.created",
            resource=event_resource,
            payload=event_payload,
        )
        return new_artifact

    async def update_artifact(
        self, artifact_id: "UUID", artifact: "ArtifactUpdate"
    ) -> None:
        """Update an artifact and emit a ``prefect.artifact.updated`` event.

        Args:
            artifact_id: ID of the artifact to update.
            artifact: The fields to change. Only explicitly-set fields are
                sent and reported in the event payload.
        """
        from prefect.events.utilities import emit_event

        await self.request(
            "PATCH",
            "/artifacts/{id}",
            path_params={"id": artifact_id},
            json=artifact.model_dump(mode="json", exclude_unset=True),
        )

        # Report the same explicitly-set fields in the update event.
        event_payload = artifact.model_dump(mode="json", exclude_unset=True)
        emit_event(
            event="prefect.artifact.updated",
            resource={"prefect.resource.id": f"prefect.artifact.{artifact_id}"},
            payload=event_payload if event_payload else None,
        )
        return None

    async def read_artifacts(
        self, **kwargs: Unpack["ArtifactReadParams"]
    ) -> list["Artifact"]:
        """Query artifacts via ``POST /artifacts/filter``.

        Args:
            **kwargs: Optional ``artifact_filter``, ``flow_run_filter``,
                ``task_run_filter``, ``limit``, ``offset`` and ``sort``.
                Filters that are omitted (or falsy) are sent as ``None``;
                ``offset`` defaults to ``0``.

        Returns:
            The artifacts validated from the response body.
        """
        artifact_filter = kwargs.get("artifact_filter")
        flow_run_filter = kwargs.get("flow_run_filter")
        task_run_filter = kwargs.get("task_run_filter")

        request_body = {
            "artifacts": (
                artifact_filter.model_dump(mode="json", exclude_unset=True)
                if artifact_filter
                else None
            ),
            "flow_runs": (
                flow_run_filter.model_dump(mode="json", exclude_unset=True)
                if flow_run_filter
                else None
            ),
            "task_runs": (
                task_run_filter.model_dump(mode="json", exclude_unset=True)
                if task_run_filter
                else None
            ),
            "limit": kwargs.get("limit"),
            "offset": kwargs.get("offset", 0),
            "sort": kwargs.get("sort"),
        }
        response = await self.request(
            "POST", "/artifacts/filter", json=request_body
        )

        from prefect.client.schemas.objects import Artifact

        return Artifact.model_validate_list(response.json())

    async def delete_artifact(self, artifact_id: "UUID") -> None:
        """Delete an artifact by ID.

        Args:
            artifact_id: ID of the artifact to delete.

        Raises:
            ObjectNotFound: If the server responds with HTTP 404.
            HTTPStatusError: Re-raised unchanged for any other error status.
        """
        try:
            await self.request(
                "DELETE",
                "/artifacts/{id}",
                path_params={"id": artifact_id},
            )
        except HTTPStatusError as exc:
            # Anything other than "not found" propagates untouched.
            if exc.response.status_code != 404:
                raise
            raise ObjectNotFound(http_exc=exc) from exc

249 

250 

class ArtifactCollectionClient(BaseClient):
    """Synchronous client methods for the ``/artifacts/latest`` API routes."""

    def read_latest_artifacts(
        self, **kwargs: Unpack["ArtifactCollectionReadParams"]
    ) -> list["ArtifactCollection"]:
        """Query artifact collections via ``POST /artifacts/latest/filter``.

        Args:
            **kwargs: Optional ``artifact_filter``, ``flow_run_filter``,
                ``task_run_filter``, ``limit``, ``offset`` and ``sort``.
                Filters that are omitted (or falsy) are sent as ``None``;
                ``offset`` defaults to ``0``.

        Returns:
            The artifact collections validated from the response body.
        """
        artifact_filter = kwargs.get("artifact_filter")
        flow_run_filter = kwargs.get("flow_run_filter")
        task_run_filter = kwargs.get("task_run_filter")

        # Serialize each supplied filter; absent filters are sent as None.
        artifacts_json = None
        if artifact_filter:
            artifacts_json = artifact_filter.model_dump(
                mode="json", exclude_unset=True
            )
        flow_runs_json = None
        if flow_run_filter:
            flow_runs_json = flow_run_filter.model_dump(
                mode="json", exclude_unset=True
            )
        task_runs_json = None
        if task_run_filter:
            task_runs_json = task_run_filter.model_dump(
                mode="json", exclude_unset=True
            )

        response = self.request(
            "POST",
            "/artifacts/latest/filter",
            json={
                "artifacts": artifacts_json,
                "flow_runs": flow_runs_json,
                "task_runs": task_runs_json,
                "limit": kwargs.get("limit"),
                "offset": kwargs.get("offset", 0),
                "sort": kwargs.get("sort"),
            },
        )

        from prefect.client.schemas.objects import ArtifactCollection

        return ArtifactCollection.model_validate_list(response.json())

282 

283 

class ArtifactCollectionAsyncClient(BaseAsyncClient):
    """Asynchronous client methods for the ``/artifacts/latest`` API routes."""

    async def read_latest_artifacts(
        self, **kwargs: Unpack["ArtifactCollectionReadParams"]
    ) -> list["ArtifactCollection"]:
        """Query artifact collections via ``POST /artifacts/latest/filter``.

        Args:
            **kwargs: Optional ``artifact_filter``, ``flow_run_filter``,
                ``task_run_filter``, ``limit``, ``offset`` and ``sort``.
                Filters that are omitted (or falsy) are sent as ``None``;
                ``offset`` defaults to ``0``.

        Returns:
            The artifact collections validated from the response body.
        """
        collection_filter = kwargs.get("artifact_filter")
        flow_runs = kwargs.get("flow_run_filter")
        task_runs = kwargs.get("task_run_filter")

        request_body = {
            "artifacts": (
                collection_filter.model_dump(mode="json", exclude_unset=True)
                if collection_filter
                else None
            ),
            "flow_runs": (
                flow_runs.model_dump(mode="json", exclude_unset=True)
                if flow_runs
                else None
            ),
            "task_runs": (
                task_runs.model_dump(mode="json", exclude_unset=True)
                if task_runs
                else None
            ),
            "limit": kwargs.get("limit"),
            "offset": kwargs.get("offset", 0),
            "sort": kwargs.get("sort"),
        }
        response = await self.request(
            "POST", "/artifacts/latest/filter", json=request_body
        )

        from prefect.client.schemas.objects import ArtifactCollection

        return ArtifactCollection.model_validate_list(response.json())