Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_automations/client.py: 16%

150 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from __future__ import annotations 1a

2 

3from typing import TYPE_CHECKING 1a

4 

5from httpx import HTTPStatusError 1a

6 

7from prefect.client.orchestration.base import BaseAsyncClient, BaseClient 1a

8from prefect.exceptions import ObjectNotFound 1a

9 

10if TYPE_CHECKING: 10 ↛ 11line 10 didn't jump to line 11 because the condition on line 10 was never true1a

11 from uuid import UUID 

12 

13 from prefect.events.schemas.automations import Automation, AutomationCore 

14 

15 

16class AutomationClient(BaseClient): 1a

17 def create_automation(self, automation: "AutomationCore") -> "UUID": 1a

18 """Creates an automation in Prefect Cloud.""" 

19 response = self.request( 

20 "POST", 

21 "/automations/", 

22 json=automation.model_dump(mode="json"), 

23 ) 

24 from uuid import UUID 

25 

26 return UUID(response.json()["id"]) 

27 

28 def update_automation( 1a

29 self, automation_id: "UUID", automation: "AutomationCore" 

30 ) -> None: 

31 """Updates an automation in Prefect Cloud.""" 

32 response = self.request( 

33 "PUT", 

34 "/automations/{id}", 

35 path_params={"id": automation_id}, 

36 json=automation.model_dump(mode="json", exclude_unset=True), 

37 ) 

38 response.raise_for_status() 

39 

40 def read_automations(self) -> list["Automation"]: 1a

41 response = self.request("POST", "/automations/filter") 

42 response.raise_for_status() 

43 from prefect.events.schemas.automations import Automation 

44 

45 return Automation.model_validate_list(response.json()) 

46 

47 def find_automation(self, id_or_name: "str | UUID") -> "Automation | None": 1a

48 from uuid import UUID 

49 

50 if isinstance(id_or_name, str): 

51 name = id_or_name 

52 try: 

53 id = UUID(id_or_name) 

54 except ValueError: 

55 id = None 

56 else: 

57 id = id_or_name 

58 name = str(id) 

59 

60 if id: 

61 try: 

62 automation = self.read_automation(id) 

63 return automation 

64 except HTTPStatusError as e: 

65 if e.response.status_code == 404: 

66 raise ObjectNotFound(http_exc=e) from e 

67 

68 automations = self.read_automations() 

69 

70 # Look for it by an exact name 

71 for automation in automations: 

72 if automation.name == name: 

73 return automation 

74 

75 # Look for it by a case-insensitive name 

76 for automation in automations: 

77 if automation.name.lower() == name.lower(): 

78 return automation 

79 

80 return None 

81 

82 def read_automation(self, automation_id: "UUID | str") -> "Automation | None": 1a

83 response = self.request( 

84 "GET", "/automations/{id}", path_params={"id": automation_id} 

85 ) 

86 if response.status_code == 404: 

87 return None 

88 response.raise_for_status() 

89 from prefect.events.schemas.automations import Automation 

90 

91 return Automation.model_validate(response.json()) 

92 

93 def read_automations_by_name(self, name: str) -> list["Automation"]: 1a

94 """ 

95 Query the Prefect API for an automation by name. Only automations matching the provided name will be returned. 

96 

97 Args: 

98 name: the name of the automation to query 

99 

100 Returns: 

101 a list of Automation model representations of the automations 

102 """ 

103 from prefect.client.schemas.sorting import AutomationSort 

104 from prefect.events.filters import ( 

105 AutomationFilter, 

106 AutomationFilterName, 

107 ) 

108 

109 automation_filter = AutomationFilter(name=AutomationFilterName(any_=[name])) 

110 

111 response = self.request( 

112 "POST", 

113 "/automations/filter", 

114 json={ 

115 "sort": AutomationSort.UPDATED_DESC, 

116 "automations": automation_filter.model_dump(mode="json") 

117 if automation_filter 

118 else None, 

119 }, 

120 ) 

121 

122 response.raise_for_status() 

123 from prefect.events.schemas.automations import Automation 

124 

125 return Automation.model_validate_list(response.json()) 

126 

127 def pause_automation(self, automation_id: "UUID") -> None: 1a

128 response = self.request( 

129 "PATCH", 

130 "/automations/{id}", 

131 path_params={"id": automation_id}, 

132 json={"enabled": False}, 

133 ) 

134 response.raise_for_status() 

135 

136 def resume_automation(self, automation_id: "UUID") -> None: 1a

137 response = self.request( 

138 "PATCH", 

139 "/automations/{id}", 

140 path_params={"id": automation_id}, 

141 json={"enabled": True}, 

142 ) 

143 response.raise_for_status() 

144 

145 def delete_automation(self, automation_id: "UUID") -> None: 1a

146 response = self.request( 

147 "DELETE", 

148 "/automations/{id}", 

149 path_params={"id": automation_id}, 

150 ) 

151 if response.status_code == 404: 

152 return 

153 

154 response.raise_for_status() 

155 

156 def read_resource_related_automations(self, resource_id: str) -> list["Automation"]: 1a

157 response = self.request( 

158 "GET", 

159 "/automations/related-to/{resource_id}", 

160 path_params={"resource_id": resource_id}, 

161 ) 

162 response.raise_for_status() 

163 from prefect.events.schemas.automations import Automation 

164 

165 return Automation.model_validate_list(response.json()) 

166 

167 def delete_resource_owned_automations(self, resource_id: str) -> None: 1a

168 self.request( 

169 "DELETE", 

170 "/automations/owned-by/{resource_id}", 

171 path_params={"resource_id": resource_id}, 

172 ) 

173 

174 

175class AutomationAsyncClient(BaseAsyncClient): 1a

176 async def create_automation(self, automation: "AutomationCore") -> "UUID": 1a

177 """Creates an automation in Prefect Cloud.""" 

178 response = await self.request( 

179 "POST", 

180 "/automations/", 

181 json=automation.model_dump(mode="json"), 

182 ) 

183 from uuid import UUID 

184 

185 return UUID(response.json()["id"]) 

186 

187 async def update_automation( 1a

188 self, automation_id: "UUID", automation: "AutomationCore" 

189 ) -> None: 

190 """Updates an automation in Prefect Cloud.""" 

191 response = await self.request( 

192 "PUT", 

193 "/automations/{id}", 

194 path_params={"id": automation_id}, 

195 json=automation.model_dump(mode="json", exclude_unset=True), 

196 ) 

197 response.raise_for_status() 

198 

199 async def read_automations(self) -> list["Automation"]: 1a

200 response = await self.request("POST", "/automations/filter") 

201 response.raise_for_status() 

202 from prefect.events.schemas.automations import Automation 

203 

204 return Automation.model_validate_list(response.json()) 

205 

206 async def find_automation(self, id_or_name: "str | UUID") -> "Automation | None": 1a

207 from uuid import UUID 

208 

209 if isinstance(id_or_name, str): 

210 name = id_or_name 

211 try: 

212 id = UUID(id_or_name) 

213 except ValueError: 

214 id = None 

215 else: 

216 id = id_or_name 

217 name = str(id) 

218 

219 if id: 

220 try: 

221 automation = await self.read_automation(id) 

222 return automation 

223 except HTTPStatusError as e: 

224 if e.response.status_code == 404: 

225 raise ObjectNotFound(http_exc=e) from e 

226 

227 automations = await self.read_automations() 

228 

229 # Look for it by an exact name 

230 for automation in automations: 

231 if automation.name == name: 

232 return automation 

233 

234 # Look for it by a case-insensitive name 

235 for automation in automations: 

236 if automation.name.lower() == name.lower(): 

237 return automation 

238 

239 return None 

240 

241 async def read_automation(self, automation_id: "UUID | str") -> "Automation | None": 1a

242 response = await self.request( 

243 "GET", "/automations/{id}", path_params={"id": automation_id} 

244 ) 

245 if response.status_code == 404: 

246 return None 

247 response.raise_for_status() 

248 from prefect.events.schemas.automations import Automation 

249 

250 return Automation.model_validate(response.json()) 

251 

252 async def read_automations_by_name(self, name: str) -> list["Automation"]: 1a

253 """ 

254 Query the Prefect API for an automation by name. Only automations matching the provided name will be returned. 

255 

256 Args: 

257 name: the name of the automation to query 

258 

259 Returns: 

260 a list of Automation model representations of the automations 

261 """ 

262 from prefect.client.schemas.sorting import AutomationSort 

263 from prefect.events.filters import ( 

264 AutomationFilter, 

265 AutomationFilterName, 

266 ) 

267 

268 automation_filter = AutomationFilter(name=AutomationFilterName(any_=[name])) 

269 

270 response = await self.request( 

271 "POST", 

272 "/automations/filter", 

273 json={ 

274 "sort": AutomationSort.UPDATED_DESC, 

275 "automations": automation_filter.model_dump(mode="json") 

276 if automation_filter 

277 else None, 

278 }, 

279 ) 

280 

281 response.raise_for_status() 

282 from prefect.events.schemas.automations import Automation 

283 

284 return Automation.model_validate_list(response.json()) 

285 

286 async def pause_automation(self, automation_id: "UUID") -> None: 1a

287 response = await self.request( 

288 "PATCH", 

289 "/automations/{id}", 

290 path_params={"id": automation_id}, 

291 json={"enabled": False}, 

292 ) 

293 response.raise_for_status() 

294 

295 async def resume_automation(self, automation_id: "UUID") -> None: 1a

296 response = await self.request( 

297 "PATCH", 

298 "/automations/{id}", 

299 path_params={"id": automation_id}, 

300 json={"enabled": True}, 

301 ) 

302 response.raise_for_status() 

303 

304 async def delete_automation(self, automation_id: "UUID") -> None: 1a

305 response = await self.request( 

306 "DELETE", 

307 "/automations/{id}", 

308 path_params={"id": automation_id}, 

309 ) 

310 if response.status_code == 404: 

311 return 

312 

313 response.raise_for_status() 

314 

315 async def read_resource_related_automations( 1a

316 self, resource_id: str 

317 ) -> list["Automation"]: 

318 response = await self.request( 

319 "GET", 

320 "/automations/related-to/{resource_id}", 

321 path_params={"resource_id": resource_id}, 

322 ) 

323 response.raise_for_status() 

324 from prefect.events.schemas.automations import Automation 

325 

326 return Automation.model_validate_list(response.json()) 

327 

328 async def delete_resource_owned_automations(self, resource_id: str) -> None: 1a

329 await self.request( 

330 "DELETE", 

331 "/automations/owned-by/{resource_id}", 

332 path_params={"resource_id": resource_id}, 

333 )