Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/global_concurrency_limit.py: 17%

145 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from pathlib import Path 1a

2from typing import Optional 1a

3 

4import orjson 1a

5import typer 1a

6from pydantic import ValidationError 1a

7from rich.pretty import Pretty 1a

8from rich.table import Table 1a

9 

10from prefect.cli._types import PrefectTyper 1a

11from prefect.cli._utilities import exit_with_error, exit_with_success 1a

12from prefect.cli.root import app, is_interactive 1a

13from prefect.client.orchestration import get_client 1a

14from prefect.client.schemas.actions import ( 1a

15 GlobalConcurrencyLimitCreate, 

16 GlobalConcurrencyLimitUpdate, 

17) 

18from prefect.exceptions import ( 1a

19 ObjectNotFound, 

20 PrefectHTTPStatusError, 

21) 

22from prefect.types._datetime import human_friendly_diff 1a

23 

24global_concurrency_limit_app: PrefectTyper = PrefectTyper( 1a

25 name="global-concurrency-limit", 

26 help="Manage global concurrency limits.", 

27) 

28 

29app.add_typer(global_concurrency_limit_app, aliases=["gcl"]) 1a

30 

31 

32@global_concurrency_limit_app.command("ls") 1a

33async def list_global_concurrency_limits(): 1a

34 """ 

35 List all global concurrency limits. 

36 """ 

37 async with get_client() as client: 

38 gcl_limits = await client.read_global_concurrency_limits(limit=100, offset=0) 

39 if not gcl_limits: 

40 exit_with_success("No global concurrency limits found.") 

41 

42 table = Table( 

43 title="Global Concurrency Limits", 

44 caption="List Global Concurrency Limits using `prefect global-concurrency-limit ls`", 

45 show_header=True, 

46 ) 

47 

48 table.add_column("ID", justify="right", style="cyan", no_wrap=True, overflow="fold") 

49 table.add_column("Name", style="blue", no_wrap=True, overflow="fold") 

50 table.add_column("Active", style="blue", no_wrap=True) 

51 table.add_column("Limit", style="blue", no_wrap=True) 

52 table.add_column("Active Slots", style="blue", no_wrap=True) 

53 table.add_column("Slot Decay Per Second", style="blue", no_wrap=True) 

54 table.add_column("Created", style="blue", no_wrap=True) 

55 table.add_column("Updated", style="blue", no_wrap=True) 

56 

57 for gcl_limit in sorted(gcl_limits, key=lambda x: f"{x.name}"): 

58 assert gcl_limit.created is not None, "created is not None" 

59 assert gcl_limit.updated is not None, "updated is not None" 

60 table.add_row( 

61 str(gcl_limit.id), 

62 gcl_limit.name, 

63 str(gcl_limit.active), 

64 str(gcl_limit.limit), 

65 str(gcl_limit.active_slots), 

66 str(gcl_limit.slot_decay_per_second), 

67 gcl_limit.created.isoformat(), 

68 human_friendly_diff(gcl_limit.updated), 

69 ) 

70 

71 app.console.print(table) 

72 

73 

74@global_concurrency_limit_app.command("inspect") 1a

75async def inspect_global_concurrency_limit( 1a

76 name: str = typer.Argument( 

77 ..., help="The name of the global concurrency limit to inspect." 

78 ), 

79 output: Optional[str] = typer.Option( 

80 None, 

81 "--output", 

82 "-o", 

83 help="Specify an output format. Currently supports: json", 

84 ), 

85 file_path: Optional[Path] = typer.Option( 

86 None, 

87 "--file", 

88 "-f", 

89 help="Path to .json file to write the global concurrency limit output to.", 

90 ), 

91): 

92 """ 

93 Inspect a global concurrency limit. 

94 

95 Arguments: 

96 name (str): The name of the global concurrency limit to inspect. 

97 output (Optional[OutputFormat]): An output format for the command. Currently only supports JSON. 

98 Required if --file/-f is set. 

99 file_path (Optional[Path]): A path to .json file to write the global concurrent limit output to. 

100 

101 Returns: 

102 id (str): The ID of the global concurrency limit. 

103 created (str): The created date of the global concurrency limit. 

104 updated (str): The updated date of the global concurrency limit. 

105 name (str): The name of the global concurrency limit. 

106 limit (int): The limit of the global concurrency limit. 

107 active_slots (int): The number of active slots. 

108 slot_decay_per_second (float): The slot decay per second. 

109 

110 """ 

111 if output and output.lower() != "json": 

112 exit_with_error("Only 'json' output format is supported.") 

113 

114 if file_path and not output: 

115 exit_with_error("The --file/-f option requires the --output option to be set.") 

116 

117 async with get_client() as client: 

118 try: 

119 gcl_limit = await client.read_global_concurrency_limit_by_name(name=name) 

120 except ObjectNotFound: 

121 exit_with_error(f"Global concurrency limit {name!r} not found.") 

122 

123 if output and output.lower() == "json": 

124 gcl_limit_json = gcl_limit.model_dump(mode="json") 

125 json_output = orjson.dumps(gcl_limit_json, option=orjson.OPT_INDENT_2).decode() 

126 if not file_path: 

127 app.console.print(json_output) 

128 else: 

129 with open(file_path, "w") as f: 

130 f.write(json_output) 

131 exit_with_success( 

132 f"Global concurrency limit {name!r} written to {file_path}" 

133 ) 

134 else: 

135 app.console.print(Pretty(gcl_limit)) 

136 

137 

138@global_concurrency_limit_app.command("delete") 1a

139async def delete_global_concurrency_limit( 1a

140 name: str = typer.Argument( 

141 ..., help="The name of the global concurrency limit to delete." 

142 ), 

143): 

144 """ 

145 Delete a global concurrency limit. 

146 

147 Arguments: 

148 name (str): The name of the global concurrency limit to delete. 

149 """ 

150 async with get_client() as client: 

151 try: 

152 gcl_limit = await client.read_global_concurrency_limit_by_name(name=name) 

153 

154 if is_interactive() and not typer.confirm( 

155 f"Are you sure you want to delete global concurrency limit with name {gcl_limit.name!r}?", 

156 default=False, 

157 ): 

158 exit_with_error("Deletion aborted.") 

159 

160 await client.delete_global_concurrency_limit_by_name(name=name) 

161 except ObjectNotFound: 

162 exit_with_error(f"Global concurrency limit {name!r} not found.") 

163 

164 exit_with_success(f"Deleted global concurrency limit with name {name!r}.") 

165 

166 

167@global_concurrency_limit_app.command("enable") 1a

168async def enable_global_concurrency_limit( 1a

169 name: str = typer.Argument( 

170 ..., help="The name of the global concurrency limit to enable." 

171 ), 

172): 

173 """ 

174 Enable a global concurrency limit. 

175 

176 Arguments: 

177 name (str): The name of the global concurrency limit to enable. 

178 """ 

179 async with get_client() as client: 

180 try: 

181 gcl_limit = await client.read_global_concurrency_limit_by_name(name=name) 

182 if gcl_limit.active: 

183 exit_with_error( 

184 f"Global concurrency limit with name {name!r} is already enabled." 

185 ) 

186 await client.update_global_concurrency_limit( 

187 name=name, 

188 concurrency_limit=GlobalConcurrencyLimitUpdate(active=True), 

189 ) 

190 except ObjectNotFound: 

191 exit_with_error(f"Global concurrency limit {name!r} not found.") 

192 

193 exit_with_success(f"Enabled global concurrency limit with name {name!r}.") 

194 

195 

196@global_concurrency_limit_app.command("disable") 1a

197async def disable_global_concurrency_limit( 1a

198 name: str = typer.Argument( 

199 ..., help="The name of the global concurrency limit to disable." 

200 ), 

201): 

202 """ 

203 Disable a global concurrency limit. 

204 

205 Arguments: 

206 name (str): The name of the global concurrency limit to disable. 

207 """ 

208 async with get_client() as client: 

209 try: 

210 gcl_limit = await client.read_global_concurrency_limit_by_name(name=name) 

211 if not gcl_limit.active: 

212 exit_with_error( 

213 f"Global concurrency limit with name {name!r} is already disabled." 

214 ) 

215 await client.update_global_concurrency_limit( 

216 name=name, 

217 concurrency_limit=GlobalConcurrencyLimitUpdate(active=False), 

218 ) 

219 except ObjectNotFound: 

220 exit_with_error(f"Global concurrency limit {name!r} not found.") 

221 

222 exit_with_success(f"Disabled global concurrency limit with name {name!r}.") 

223 

224 

225@global_concurrency_limit_app.command("update") 1a

226async def update_global_concurrency_limit( 1a

227 name: str = typer.Argument( 

228 ..., help="The name of the global concurrency limit to update." 

229 ), 

230 enable: Optional[bool] = typer.Option( 

231 None, "--enable", help="Enable the global concurrency limit." 

232 ), 

233 disable: Optional[bool] = typer.Option( 

234 None, "--disable", help="Disable the global concurrency limit." 

235 ), 

236 limit: Optional[int] = typer.Option( 

237 None, "--limit", "-l", help="The limit of the global concurrency limit." 

238 ), 

239 active_slots: Optional[int] = typer.Option( 

240 None, "--active-slots", help="The number of active slots." 

241 ), 

242 slot_decay_per_second: Optional[float] = typer.Option( 

243 None, "--slot-decay-per-second", help="The slot decay per second." 

244 ), 

245): 

246 """ 

247 Update a global concurrency limit. 

248 

249 Arguments: 

250 name (str): The name of the global concurrency limit to update. 

251 enable (Optional[bool]): Enable the global concurrency limit. 

252 disable (Optional[bool]): Disable the global concurrency limit. 

253 limit (Optional[int]): The limit of the global concurrency limit. 

254 active_slots (Optional[int]): The number of active slots. 

255 slot_decay_per_second (Optional[float]): The slot decay per second. 

256 

257 Examples: 

258 $ prefect global-concurrency-limit update my-gcl --limit 10 

259 $ prefect gcl update my-gcl --active-slots 5 

260 $ prefect gcl update my-gcl --slot-decay-per-second 0.5 

261 $ prefect gcl update my-gcl --enable 

262 $ prefect gcl update my-gcl --disable --limit 5 

263 """ 

264 gcl = GlobalConcurrencyLimitUpdate() 

265 

266 if enable and disable: 

267 exit_with_error( 

268 "Cannot enable and disable a global concurrency limit at the same time." 

269 ) 

270 

271 if enable: 

272 gcl.active = True 

273 if disable: 

274 gcl.active = False 

275 

276 if limit is not None: 

277 gcl.limit = limit 

278 

279 if active_slots is not None: 

280 gcl.active_slots = active_slots 

281 

282 if slot_decay_per_second is not None: 

283 gcl.slot_decay_per_second = slot_decay_per_second 

284 

285 if not gcl.model_dump(exclude_unset=True): 

286 exit_with_error("No update arguments provided.") 

287 

288 try: 

289 GlobalConcurrencyLimitUpdate(**gcl.model_dump()) 

290 except ValidationError as exc: 

291 exit_with_error(f"Invalid arguments provided: {exc}") 

292 except Exception as exc: 

293 exit_with_error(f"Error creating global concurrency limit: {exc}") 

294 

295 async with get_client() as client: 

296 try: 

297 await client.update_global_concurrency_limit( 

298 name=name, concurrency_limit=gcl 

299 ) 

300 except ObjectNotFound: 

301 exit_with_error(f"Global concurrency limit {name!r} not found.") 

302 except PrefectHTTPStatusError as exc: 

303 if exc.response.status_code == 422: 

304 parsed_response = exc.response.json() 

305 

306 error_message = parsed_response["exception_detail"][0]["msg"] 

307 

308 exit_with_error( 

309 f"Error updating global concurrency limit: {error_message}" 

310 ) 

311 

312 exit_with_success(f"Updated global concurrency limit with name {name!r}.") 

313 

314 

315@global_concurrency_limit_app.command("create") 1a

316async def create_global_concurrency_limit( 1a

317 name: str = typer.Argument( 

318 ..., help="The name of the global concurrency limit to create." 

319 ), 

320 limit: int = typer.Option( 

321 ..., "--limit", "-l", help="The limit of the global concurrency limit." 

322 ), 

323 disable: Optional[bool] = typer.Option( 

324 None, "--disable", help="Create an inactive global concurrency limit." 

325 ), 

326 active_slots: Optional[int] = typer.Option( 

327 0, "--active-slots", help="The number of active slots." 

328 ), 

329 slot_decay_per_second: Optional[float] = typer.Option( 

330 0.0, "--slot-decay-per-second", help="The slot decay per second." 

331 ), 

332): 

333 """ 

334 Create a global concurrency limit. 

335 

336 Arguments: 

337 

338 name (str): The name of the global concurrency limit to create. 

339 

340 limit (int): The limit of the global concurrency limit. 

341 

342 disable (Optional[bool]): Create an inactive global concurrency limit. 

343 

344 active_slots (Optional[int]): The number of active slots. 

345 

346 slot_decay_per_second (Optional[float]): The slot decay per second. 

347 

348 Examples: 

349 

350 $ prefect global-concurrency-limit create my-gcl --limit 10 

351 

352 $ prefect gcl create my-gcl --limit 5 --active-slots 3 

353 

354 $ prefect gcl create my-gcl --limit 5 --active-slots 3 --slot-decay-per-second 0.5 

355 

356 $ prefect gcl create my-gcl --limit 5 --inactive 

357 """ 

358 async with get_client() as client: 

359 try: 

360 await client.read_global_concurrency_limit_by_name(name=name) 

361 except ObjectNotFound: 

362 pass 

363 else: 

364 exit_with_error( 

365 f"Global concurrency limit {name!r} already exists. Please try creating with a different name." 

366 ) 

367 

368 try: 

369 gcl = GlobalConcurrencyLimitCreate( 

370 name=name, 

371 limit=limit, 

372 active=False if disable else True, 

373 active_slots=active_slots, 

374 slot_decay_per_second=slot_decay_per_second, 

375 ) 

376 

377 except ValidationError as exc: 

378 exit_with_error(f"Invalid arguments provided: {exc}") 

379 except Exception as exc: 

380 exit_with_error(f"Error creating global concurrency limit: {exc}") 

381 

382 async with get_client() as client: 

383 try: 

384 gcl_id = await client.create_global_concurrency_limit(concurrency_limit=gcl) 

385 except PrefectHTTPStatusError as exc: 

386 parsed_response = exc.response.json() 

387 exc = parsed_response["exception_detail"][0]["msg"] 

388 

389 exit_with_error(f"Error updating global concurrency limit: {exc}") 

390 

391 exit_with_success( 

392 f"Created global concurrency limit with name {name!r} and ID '{gcl_id}'. Run `prefect gcl inspect {name}` to view details." 

393 )