Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/block.py: 16%

234 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Command line interface for working with blocks. 

3""" 

4 

5from __future__ import annotations 1a

6 

7import inspect 1a

8from importlib import import_module 1a

9from pathlib import Path 1a

10from types import ModuleType 1a

11from typing import TYPE_CHECKING, Any, Optional 1a

12from uuid import UUID 1a

13 

14import typer 1a

15import yaml 1a

16from rich.table import Table 1a

17 

18from prefect.blocks.core import Block, InvalidBlockRegistration 1a

19from prefect.cli._types import PrefectTyper 1a

20from prefect.cli._utilities import exit_with_error, exit_with_success 1a

21from prefect.cli.root import app, is_interactive 1a

22from prefect.client.base import ServerType, determine_server_type 1a

23from prefect.client.orchestration import get_client 1a

24from prefect.exceptions import ( 1a

25 ObjectNotFound, 

26 PrefectHTTPStatusError, 

27 ProtectedBlockError, 

28 ScriptError, 

29 exception_traceback, 

30) 

31from prefect.settings import get_current_settings 1a

32from prefect.utilities.asyncutils import run_sync_in_worker_thread 1a

33from prefect.utilities.importtools import load_script_as_module 1a

34 

35if TYPE_CHECKING: 35 ↛ 36line 35 didn't jump to line 36 because the condition on line 35 was never true1a

36 from prefect.client.schemas.objects import BlockDocument, BlockType 

37 

# Top-level `block` command group.
blocks_app: PrefectTyper = PrefectTyper(
    name="block",
    help="Manage blocks.",
)
# `type` command group, mounted under `blocks_app` below.
blocktypes_app: PrefectTyper = PrefectTyper(
    name="type",
    help="Inspect and delete block types.",
)
# Mount the groups; the plural spellings are passed as aliases.
app.add_typer(blocks_app, aliases=["blocks"])
blocks_app.add_typer(blocktypes_app, aliases=["types"])

44 

45 

def display_block(block_document: "BlockDocument") -> Table:
    """
    Render a block document as a headerless, two-column table.

    The table is titled "<block type slug>/<block name>". It lists the block
    type name and the block id, then one row per item in the document's
    `data` mapping with each value converted via `str`.
    """
    block_type = block_document.block_type
    # `getattr` with a default tolerates a block type without these attributes.
    type_slug = getattr(block_type, "slug", "")
    type_name = getattr(block_type, "name", "")

    table = Table(
        title=f"{type_slug}/{block_document.name}",
        show_header=False,
        show_footer=False,
        expand=True,
    )
    table.add_column(style="italic cyan")
    table.add_column(style="blue")

    table.add_row("Block Type", type_name)
    table.add_row("Block id", str(block_document.id), end_section=True)
    for key, value in block_document.data.items():
        table.add_row(key, str(value))

    return table

62 

63 

def display_block_type(block_type: "BlockType") -> Table:
    """
    Render a block type as a headerless, two-column table.

    The table is titled with the block type's name and contains its slug,
    its id, and a short description: the text before the first "." on the
    first line of `block_type.description`. A missing or empty description
    is shown as an empty string.
    """
    description = block_type.description
    # Truthiness (not `is not None`) also covers "", for which `splitlines()`
    # returns an empty list and indexing `[0]` would raise IndexError.
    summary = description.splitlines()[0].partition(".")[0] if description else ""

    block_type_table = Table(
        title=block_type.name, show_header=False, show_footer=False, expand=True
    )
    block_type_table.add_column(style="italic cyan")
    block_type_table.add_column(style="blue")

    block_type_table.add_row("Slug", block_type.slug)
    block_type_table.add_row("Block Type id", str(block_type.id))
    block_type_table.add_row("Description", summary, end_section=True)

    return block_type_table

84 

85 

def display_block_schema_properties(block_schema_fields: dict[str, Any]) -> Table:
    """
    Render the properties of a block schema as a table of YAML snippets.

    Args:
        block_schema_fields: Schema mapping; its "properties" entry maps
            property names to property schemas and its "required" entry lists
            the names of required properties. Both entries are optional.

    Returns:
        A two-column table with one row per property: the property name and
        its schema dumped as block-style YAML. Required properties are shown
        with an extra `required: true` entry.
    """
    required = block_schema_fields.get("required", [])
    properties = block_schema_fields.get("properties", {})

    block_schema_yaml_table = Table(
        title="Schema Properties",
        show_header=False,
        show_footer=False,
        show_lines=True,
        expand=True,
    )
    block_schema_yaml_table.add_column(style="cyan")
    block_schema_yaml_table.add_column()

    for property_name, property_schema in properties.items():
        if property_name in required:
            # Annotate a shallow copy so the caller's schema is not mutated
            # as a side effect of rendering it.
            property_schema = {**property_schema, "required": True}

        block_schema_yaml_table.add_row(
            property_name, yaml.dump(property_schema, default_flow_style=False)
        )

    return block_schema_yaml_table

109 

110 

def display_block_schema_extra_definitions(
    block_schema_definitions: dict[str, Any],
) -> Table:
    """
    Render the extra definitions of a block schema as a three-column table.

    Each definition contributes one row per entry in its "properties" mapping:
    the definition name, the property name, and the property schema dumped as
    block-style YAML. Definitions without properties contribute no rows.
    """
    table = Table(
        title="Extra Definitions",
        show_header=False,
        show_footer=False,
        expand=True,
    )
    table.add_column(style="cyan")
    table.add_column()
    table.add_column()

    for definition_name, definition_schema in block_schema_definitions.items():
        # Only the first row of a group carries the definition name, which
        # leaves visual whitespace between the groups.
        group_label = definition_name
        definition_properties = definition_schema.get("properties", {})
        for property_name, property_schema in definition_properties.items():
            table.add_row(
                group_label,
                property_name,
                yaml.dump(property_schema, default_flow_style=False),
            )
            group_label = None

    return table

134 

135 

async def _register_blocks_in_module(module: ModuleType) -> list[type[Block]]:
    """
    Register every block class found among the members of `module`.

    Returns the classes whose `register_type_and_schema()` call completed.
    Classes that raise `InvalidBlockRegistration` are skipped.
    """
    registered: list[type[Block]] = []
    for _name, member in inspect.getmembers(module):
        if member is None or not Block.is_block_class(member):
            continue
        try:
            registration = member.register_type_and_schema()
            if TYPE_CHECKING:
                assert inspect.isawaitable(registration)
            await registration
        except InvalidBlockRegistration:
            # Attempted to register the Block base class or a Block interface.
            continue
        registered.append(member)
    return registered

150 

151 

def _build_registered_blocks_table(registered_blocks: list[type[Block]]) -> Table:
    """
    Build a single-column "Registered Blocks" table with one row per block
    class, showing the value of its `get_block_type_name()`.
    """
    summary = Table("Registered Blocks")
    for block_cls in registered_blocks:
        block_type_name = block_cls.get_block_type_name()
        summary.add_row(block_type_name)
    return summary

157 

158 

@blocks_app.command()
async def register(
    module_name: Optional[str] = typer.Option(
        None,
        "--module",
        "-m",
        help="Python module containing block types to be registered",
    ),
    file_path: Optional[Path] = typer.Option(
        None,
        "--file",
        "-f",
        help="Path to .py file containing block types to be registered",
    ),
):
    """
    Register blocks types within a module or file.

    This makes the blocks available for configuration via the UI.
    If a block type has already been registered, its registration will be updated to
    match the block's current definition.

    \b
    Examples:
        \b
        Register block types in a Python module:
        $ prefect block register -m prefect_aws.credentials
        \b
        Register block types in a .py file:
        $ prefect block register -f my_blocks.py
    """
    # Exactly one of --module / --file is accepted: equal truthiness means
    # that either both or neither were supplied.
    if bool(file_path) == bool(module_name):
        exit_with_error(
            "Please specify either a module or a file containing blocks to be"
            " registered, but not both."
        )

    imported_module: Optional[ModuleType] = None
    if module_name:
        try:
            imported_module = import_module(name=module_name)
        except ModuleNotFoundError:
            exit_with_error(
                f"Unable to load {module_name}. Please make sure the module is "
                "installed in your current environment."
            )

    if file_path:
        if file_path.suffix != ".py":
            exit_with_error(
                f"{file_path} is not a .py file. Please specify a "
                ".py that contains blocks to be registered."
            )
        try:
            # The script is loaded in a worker thread rather than on the
            # event loop.
            imported_module = await run_sync_in_worker_thread(
                load_script_as_module, str(file_path)
            )
        except ScriptError as exc:
            # Show the script error and the user's traceback before exiting.
            app.console.print(exc)
            app.console.print(exception_traceback(exc.user_exc))
            exit_with_error(
                f"Unable to load file at {file_path}. Please make sure the file path "
                "is correct and the file contains valid Python."
            )

    if TYPE_CHECKING:
        assert imported_module is not None
    registered_blocks = await _register_blocks_in_module(imported_module)
    registered_count = len(registered_blocks)

    if registered_count == 0:
        if module_name:
            source = f"module {module_name!r}"
        else:
            source = f"file {file_path!r}"
        exit_with_error(
            f"No blocks were registered from {source}.\n\nPlease make sure the {source} "
            "contains valid blocks.\n"
        )

    block_text = "block" if registered_count == 1 else "blocks"
    app.console.print(
        f"[green]Successfully registered {registered_count} {block_text}\n"
    )
    app.console.print(_build_registered_blocks_table(registered_blocks))
    msg = (
        "\n To configure the newly registered blocks, "
        "go to the Blocks page in the Prefect UI.\n"
    )

    # When a UI URL is configured, replace the trailing period with a direct
    # link to the block catalog (the path differs for Cloud).
    ui_url = get_current_settings().ui_url
    if ui_url:
        on_cloud = determine_server_type() == ServerType.CLOUD
        catalog_path = "/settings/blocks/catalog" if on_cloud else "/blocks/catalog"
        block_catalog_url = f"{ui_url}{catalog_path}"
        msg = f"{msg.rstrip().rstrip('.')}: {block_catalog_url}\n"

    app.console.print(msg, soft_wrap=True)

255 

256 

@blocks_app.command("ls")
async def block_ls():
    """
    View all configured blocks.
    """
    async with get_client() as client:
        block_documents = await client.read_block_documents()

    def _slug(document: Any) -> str:
        # "<block type slug>/<block name>"; used both as the sort key and as
        # the value of the Slug column.
        return f"{getattr(document.block_type, 'slug', '')}/{document.name}"

    table = Table(
        title="Blocks", caption="List Block Types using `prefect block type ls`"
    )
    column_specs = (
        ("ID", "cyan"),
        ("Type", "blue"),
        ("Name", "blue"),
        ("Slug", "blue"),
    )
    for header, style in column_specs:
        table.add_column(header, style=style, no_wrap=True)

    for document in sorted(block_documents, key=_slug):
        table.add_row(
            str(document.id),
            getattr(document.block_type, "name", ""),
            str(document.name),
            _slug(document),
        )

    app.console.print(table)

284 

285 

@blocks_app.command("delete")
async def block_delete(
    slug: Optional[str] = typer.Argument(
        None, help="A block slug. Formatted as '<BLOCK_TYPE_SLUG>/<BLOCK_NAME>'"
    ),
    block_id: Optional[UUID] = typer.Option(None, "--id", help="A block id."),
):
    """
    Delete a configured block.
    """
    # The block is addressed by slug when one is given, otherwise by --id.
    # In an interactive session the deletion must be confirmed first.
    async with get_client() as client:
        if slug is None and block_id is not None:
            try:
                if is_interactive() and not typer.confirm(
                    (f"Are you sure you want to delete block with id {block_id!r}?"),
                    default=False,
                ):
                    exit_with_error("Deletion aborted.")
                await client.delete_block_document(block_id)
                exit_with_success(f"Deleted Block '{block_id}'.")
            except ObjectNotFound:
                exit_with_error(f"Block {block_id!r} not found!")
        elif slug is not None:
            # Validate the shape up front: unpacking `slug.split("/")` into
            # two names raised an unhandled ValueError for slugs containing
            # more than one '/'.
            block_type_slug, separator, block_document_name = slug.partition("/")
            if not separator:
                exit_with_error(
                    f"{slug!r} is not valid. Slug must contain a '/', e.g. 'json/my-json-block'"
                )
            if "/" in block_document_name:
                exit_with_error(
                    f"{slug!r} is not valid. Slug must contain exactly one '/', "
                    "e.g. 'json/my-json-block'"
                )
            try:
                # Look the document up first: deletion requires its id.
                block_document = await client.read_block_document_by_name(
                    block_document_name, block_type_slug, include_secrets=False
                )
                if is_interactive() and not typer.confirm(
                    (f"Are you sure you want to delete block with slug {slug!r}?"),
                    default=False,
                ):
                    exit_with_error("Deletion aborted.")
                await client.delete_block_document(block_document.id)
                exit_with_success(f"Deleted Block '{slug}'.")
            except ObjectNotFound:
                exit_with_error(f"Block {slug!r} not found!")
        else:
            exit_with_error("Must provide a block slug or id")

329 

330 

@blocks_app.command("create")
async def block_create(
    block_type_slug: str = typer.Argument(
        ...,
        help="A block type slug. View available types with: prefect block type ls",
        show_default=False,
    ),
):
    """
    Generate a link to the Prefect UI to create a block.
    """
    async with get_client() as client:
        try:
            block_type = await client.read_block_type_by_slug(block_type_slug)
        except ObjectNotFound:
            app.console.print(f"[red]Block type {block_type_slug!r} not found![/red]")
            block_types = await client.read_block_types()
            # De-duplicate, then sort: joining a bare set listed the slugs in
            # an arbitrary order that could differ between runs.
            available_slugs = sorted({known.slug for known in block_types})
            app.console.print(f"Available block types: {', '.join(available_slugs)}")
            raise typer.Exit(1)

        ui_url = get_current_settings().ui_url
        if not ui_url:
            exit_with_error(
                "Prefect must be configured to use a hosted Prefect server or "
                "Prefect Cloud to display the Prefect UI"
            )
        # The block catalog path differs for Cloud.
        if determine_server_type() == ServerType.CLOUD:
            block_link = f"{ui_url}/settings/blocks/catalog/{block_type.slug}/create"
        else:
            block_link = f"{ui_url}/blocks/catalog/{block_type.slug}/create"
        app.console.print(
            f"Create a {block_type_slug} block: {block_link}",
        )

365 

366 

@blocks_app.command("inspect")
async def block_inspect(
    slug: Optional[str] = typer.Argument(
        None, help="A Block slug: <BLOCK_TYPE_SLUG>/<BLOCK_NAME>"
    ),
    block_id: Optional[UUID] = typer.Option(
        None, "--id", help="A Block id to search for if no slug is given"
    ),
):
    """
    Displays details about a configured block.
    """
    # The block is addressed by slug when one is given, otherwise by --id.
    # Documents are read with `include_secrets=False`.
    async with get_client() as client:
        if slug is None and block_id is not None:
            try:
                block_document = await client.read_block_document(
                    block_id, include_secrets=False
                )
            except ObjectNotFound:
                exit_with_error(f"Block {block_id!r} not found!")
        elif slug is not None:
            # Validate the shape up front: unpacking `slug.split("/")` into
            # two names raised an unhandled ValueError for slugs containing
            # more than one '/'.
            block_type_slug, separator, block_document_name = slug.partition("/")
            if not separator:
                exit_with_error(
                    f"{slug!r} is not valid. Slug must contain a '/', e.g. 'json/my-json-block'"
                )
            if "/" in block_document_name:
                exit_with_error(
                    f"{slug!r} is not valid. Slug must contain exactly one '/', "
                    "e.g. 'json/my-json-block'"
                )
            try:
                block_document = await client.read_block_document_by_name(
                    block_document_name, block_type_slug, include_secrets=False
                )
            except ObjectNotFound:
                exit_with_error(f"Block {slug!r} not found!")
        else:
            exit_with_error("Must provide a block slug or id")
        app.console.print(display_block(block_document))

402 

403 

@blocktypes_app.command("ls")
async def list_types():
    """
    List all block types.
    """
    async with get_client() as client:
        block_types = await client.read_block_types()

    table = Table(
        title="Block Types",
        show_lines=True,
    )

    table.add_column("Block Type Slug", style="italic cyan", no_wrap=True)
    table.add_column("Description", style="blue", no_wrap=False, justify="left")
    table.add_column(
        "Generate creation link", style="italic cyan", no_wrap=False, justify="left"
    )

    for blocktype in sorted(block_types, key=lambda x: x.name):
        description = blocktype.description
        # Show only the text before the first "." on the first line.
        # Truthiness (not `is not None`) also covers "", for which
        # `splitlines()` returns an empty list and `[0]` would raise IndexError.
        summary = description.splitlines()[0].partition(".")[0] if description else ""
        table.add_row(
            str(blocktype.slug),
            summary,
            f"prefect block create {blocktype.slug}",
        )

    app.console.print(table)

435 

436 

@blocktypes_app.command("inspect")
async def blocktype_inspect(
    slug: str = typer.Argument(..., help="A block type slug"),
):
    """
    Display details about a block type.
    """
    async with get_client() as client:
        try:
            block_type = await client.read_block_type_by_slug(slug)
        except ObjectNotFound:
            exit_with_error(f"Block type {slug!r} not found!")

        # Summary table first, then the most recent schema of the block type.
        summary_table = display_block_type(block_type)
        app.console.print(summary_table)

        try:
            latest_schema = await client.get_most_recent_block_schema_for_block_type(
                block_type.id
            )
        except Exception:
            exit_with_error(f"Failed to fetch latest schema for the {slug} block type")

        if latest_schema is None:
            exit_with_error(f"No schema found for the {slug} block type")

        schema_fields = latest_schema.fields
        app.console.print(display_block_schema_properties(schema_fields))

        # The definitions table is printed only when the schema has a
        # non-empty "definitions" entry.
        if extra_definitions := latest_schema.fields.get("definitions"):
            app.console.print(
                display_block_schema_extra_definitions(extra_definitions)
            )

469 

470 

@blocktypes_app.command("delete")
async def blocktype_delete(
    slug: str = typer.Argument(..., help="A Block type slug"),
):
    """
    Delete an unprotected Block Type.
    """
    async with get_client() as client:
        try:
            block_type = await client.read_block_type_by_slug(slug)
            # Ask for confirmation only in an interactive session.
            if is_interactive():
                prompt = (
                    f"Are you sure you want to delete block type {block_type.slug!r}?"
                )
                confirmed = typer.confirm(prompt, default=False)
                if not confirmed:
                    exit_with_error("Deletion aborted.")
            await client.delete_block_type(block_type.id)
            exit_with_success(f"Deleted Block Type '{slug}'.")
        except ObjectNotFound:
            exit_with_error(f"Block Type {slug!r} not found!")
        except ProtectedBlockError:
            exit_with_error(f"Block Type {slug!r} is a protected block!")
        except PrefectHTTPStatusError:
            exit_with_error(f"Cannot delete Block Type {slug!r}!")