Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/block.py: 16%
234 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Command line interface for working with blocks.
3"""
5from __future__ import annotations 1a
7import inspect 1a
8from importlib import import_module 1a
9from pathlib import Path 1a
10from types import ModuleType 1a
11from typing import TYPE_CHECKING, Any, Optional 1a
12from uuid import UUID 1a
14import typer 1a
15import yaml 1a
16from rich.table import Table 1a
18from prefect.blocks.core import Block, InvalidBlockRegistration 1a
19from prefect.cli._types import PrefectTyper 1a
20from prefect.cli._utilities import exit_with_error, exit_with_success 1a
21from prefect.cli.root import app, is_interactive 1a
22from prefect.client.base import ServerType, determine_server_type 1a
23from prefect.client.orchestration import get_client 1a
24from prefect.exceptions import ( 1a
25 ObjectNotFound,
26 PrefectHTTPStatusError,
27 ProtectedBlockError,
28 ScriptError,
29 exception_traceback,
30)
31from prefect.settings import get_current_settings 1a
32from prefect.utilities.asyncutils import run_sync_in_worker_thread 1a
33from prefect.utilities.importtools import load_script_as_module 1a
35if TYPE_CHECKING: 35 ↛ 36line 35 didn't jump to line 36 because the condition on line 35 was never true1a
36 from prefect.client.schemas.objects import BlockDocument, BlockType
# Typer application backing the `prefect block` command group.
blocks_app: PrefectTyper = PrefectTyper(help="Manage blocks.", name="block")

# Typer application backing the nested `prefect block type` command group.
blocktypes_app: PrefectTyper = PrefectTyper(
    help="Inspect and delete block types.",
    name="type",
)

# Mount both groups; the plural spellings are registered as aliases.
app.add_typer(blocks_app, aliases=["blocks"])
blocks_app.add_typer(blocktypes_app, aliases=["types"])
def display_block(block_document: "BlockDocument") -> Table:
    """
    Build a two-column table describing a single block document.

    The table is titled `<block type slug>/<block name>` and lists the block
    type name and the block id, followed by one row per entry of the
    document's `data` mapping (values are rendered with `str()`).

    Args:
        block_document: The block document to render.

    Returns:
        A headerless, footerless `Table` describing the block document.
    """
    block_type = block_document.block_type
    # `getattr` with a default falls back to an empty string when the block
    # type object does not expose the attribute.
    type_slug = getattr(block_type, "slug", "")
    type_name = getattr(block_type, "name", "")

    table = Table(
        title=f"{type_slug}/{block_document.name}",
        show_header=False,
        show_footer=False,
        expand=True,
    )
    for column_style in ("italic cyan", "blue"):
        table.add_column(style=column_style)

    table.add_row("Block Type", type_name)
    # `end_section=True` separates the identifying rows from the data rows.
    table.add_row("Block id", str(block_document.id), end_section=True)
    for field_name, field_value in block_document.data.items():
        table.add_row(field_name, str(field_value))

    return table
def display_block_type(block_type: "BlockType") -> Table:
    """
    Build a two-column table summarizing a block type.

    The table is titled with the block type's name and shows its slug, its
    id, and a short description: the text of the first description line up to
    (not including) the first period.

    Args:
        block_type: The block type to render.

    Returns:
        A headerless, footerless `Table` describing the block type.
    """
    # `str.splitlines()` returns an empty list for an empty string, so an
    # empty description must be guarded against as well as a `None` one;
    # indexing `[0]` unconditionally would raise `IndexError`.
    description_lines = (block_type.description or "").splitlines()
    summary = description_lines[0].partition(".")[0] if description_lines else ""

    block_type_table = Table(
        title=block_type.name, show_header=False, show_footer=False, expand=True
    )
    block_type_table.add_column(style="italic cyan")
    block_type_table.add_column(style="blue")

    block_type_table.add_row("Slug", block_type.slug)
    block_type_table.add_row("Block Type id", str(block_type.id))
    block_type_table.add_row("Description", summary, end_section=True)

    return block_type_table
def display_block_schema_properties(block_schema_fields: dict[str, Any]) -> Table:
    """
    Build a table listing each property of a block schema as YAML.

    Every entry under the schema's `properties` key becomes one row: the
    property name next to a block-style YAML dump of its definition.
    Properties named in the schema's `required` list are rendered with an
    extra `required: true` entry.

    Args:
        block_schema_fields: The schema's fields mapping. The `properties` and
            `required` keys are read; both are optional.

    Returns:
        A headerless `Table` titled "Schema Properties".
    """
    required = block_schema_fields.get("required", [])
    properties = block_schema_fields.get("properties", {})

    block_schema_yaml_table = Table(
        title="Schema Properties",
        show_header=False,
        show_footer=False,
        show_lines=True,
        expand=True,
    )
    block_schema_yaml_table.add_column(style="cyan")
    block_schema_yaml_table.add_column()

    for property_name, property_schema in properties.items():
        if property_name in required:
            # Annotate a shallow copy: this is a display helper and must not
            # write the `required` marker back into the caller's schema.
            property_schema = {**property_schema, "required": True}

        block_schema_yaml_table.add_row(
            property_name, yaml.dump(property_schema, default_flow_style=False)
        )

    return block_schema_yaml_table
def display_block_schema_extra_definitions(
    block_schema_definitions: dict[str, Any],
) -> Table:
    """
    Build a three-column table of the extra definitions in a block schema.

    Each definition contributes one row per entry in its `properties`
    mapping: definition name, property name, and a block-style YAML dump of
    the property's schema. Definitions without properties add no rows.

    Args:
        block_schema_definitions: Mapping of definition name to its schema.

    Returns:
        A headerless `Table` titled "Extra Definitions".
    """
    table = Table(
        title="Extra Definitions",
        show_header=False,
        show_footer=False,
        expand=True,
    )
    table.add_column(style="cyan")
    table.add_column()
    table.add_column()

    for definition_name, definition_schema in block_schema_definitions.items():
        # Only the first row of each group carries the definition name; the
        # rest leave that cell empty to give visual whitespace between groups.
        group_label = definition_name
        group_properties = definition_schema.get("properties", {})
        for property_name, property_schema in group_properties.items():
            rendered_schema = yaml.dump(property_schema, default_flow_style=False)
            table.add_row(group_label, property_name, rendered_schema)
            group_label = None

    return table
async def _register_blocks_in_module(module: ModuleType) -> list[type[Block]]:
    """
    Register every block class found among a module's members.

    Args:
        module: The imported module to scan with `inspect.getmembers`.

    Returns:
        The block classes whose type and schema were registered, in the order
        `inspect.getmembers` yielded them.
    """
    registered: list[type[Block]] = []
    for _member_name, member in inspect.getmembers(module):
        if member is None or not Block.is_block_class(member):
            continue

        try:
            registration = member.register_type_and_schema()
            if TYPE_CHECKING:
                assert inspect.isawaitable(registration)
            await registration
        except InvalidBlockRegistration:
            # Attempted to register Block base class or a Block interface
            continue

        registered.append(member)

    return registered
def _build_registered_blocks_table(registered_blocks: list[type[Block]]) -> Table:
    """
    Build a single-column table listing the type name of each registered block.

    Args:
        registered_blocks: Block classes to list, one row each.

    Returns:
        A `Table` with the single column "Registered Blocks".
    """
    summary_table = Table("Registered Blocks")
    for block_cls in registered_blocks:
        type_name = block_cls.get_block_type_name()
        summary_table.add_row(type_name)
    return summary_table
@blocks_app.command()
async def register(
    module_name: Optional[str] = typer.Option(
        None,
        "--module",
        "-m",
        help="Python module containing block types to be registered",
    ),
    file_path: Optional[Path] = typer.Option(
        None,
        "--file",
        "-f",
        help="Path to .py file containing block types to be registered",
    ),
):
    """
    Register block types within a module or file.

    This makes the blocks available for configuration via the UI.
    If a block type has already been registered, its registration will be updated to
    match the block's current definition.

    \b
    Examples:
        \b
        Register block types in a Python module:
        $ prefect block register -m prefect_aws.credentials
        \b
        Register block types in a .py file:
        $ prefect block register -f my_blocks.py
    """
    # Exactly one of --module / --file must be supplied: the XOR is false both
    # when the two options are given together and when neither is given.
    if not (bool(file_path) ^ bool(module_name)):
        exit_with_error(
            "Please specify either a module or a file containing blocks to be"
            " registered, but not both."
        )

    imported_module: Optional[ModuleType] = None
    if module_name:
        try:
            imported_module = import_module(name=module_name)
        except ModuleNotFoundError:
            exit_with_error(
                f"Unable to load {module_name}. Please make sure the module is "
                "installed in your current environment."
            )

    if file_path:
        if file_path.suffix != ".py":
            exit_with_error(
                f"{file_path} is not a .py file. Please specify a "
                ".py that contains blocks to be registered."
            )
        try:
            # Loading the script is synchronous, so it is run in a worker
            # thread rather than directly inside this coroutine.
            imported_module = await run_sync_in_worker_thread(
                load_script_as_module, str(file_path)
            )
        except ScriptError as exc:
            # Show the user's own traceback before the summary error.
            app.console.print(exc)
            app.console.print(exception_traceback(exc.user_exc))
            exit_with_error(
                f"Unable to load file at {file_path}. Please make sure the file path "
                "is correct and the file contains valid Python."
            )

    if TYPE_CHECKING:
        assert imported_module is not None
    registered_blocks = await _register_blocks_in_module(imported_module)
    number_of_registered_blocks = len(registered_blocks)

    if number_of_registered_blocks == 0:
        # Quote the path as a plain string: `repr()` of a `Path` would leak
        # the class name (e.g. "PosixPath('my_blocks.py')") into the message.
        source = (
            f"module {module_name!r}" if module_name else f"file {str(file_path)!r}"
        )
        exit_with_error(
            f"No blocks were registered from {source}.\n\nPlease make sure the {source} "
            "contains valid blocks.\n"
        )

    # At least one block was registered here, so only a count of exactly one
    # takes the singular form.
    block_text = "block" if number_of_registered_blocks == 1 else "blocks"
    app.console.print(
        f"[green]Successfully registered {number_of_registered_blocks} {block_text}\n"
    )
    app.console.print(_build_registered_blocks_table(registered_blocks))
    msg = (
        "\n To configure the newly registered blocks, "
        "go to the Blocks page in the Prefect UI.\n"
    )

    if ui_url := get_current_settings().ui_url:
        # The block catalog is served under a different path for Cloud.
        if determine_server_type() == ServerType.CLOUD:
            block_catalog_url = f"{ui_url}/settings/blocks/catalog"
        else:
            block_catalog_url = f"{ui_url}/blocks/catalog"
        # Swap the trailing period for ": <url>" so the link ends the sentence.
        msg = f"{msg.rstrip().rstrip('.')}: {block_catalog_url}\n"

    app.console.print(msg, soft_wrap=True)
@blocks_app.command("ls")
async def block_ls():
    """
    View all configured blocks.
    """

    def _slug_of(block_document: Any) -> str:
        # "<block type slug>/<block name>"; the type slug falls back to an
        # empty string when the block type does not expose one.
        type_slug = getattr(block_document.block_type, "slug", "")
        return f"{type_slug}/{block_document.name}"

    async with get_client() as client:
        block_documents = await client.read_block_documents()

    listing = Table(
        title="Blocks", caption="List Block Types using `prefect block type ls`"
    )
    listing.add_column("ID", style="cyan", no_wrap=True)
    for heading in ("Type", "Name", "Slug"):
        listing.add_column(heading, style="blue", no_wrap=True)

    # Rows are ordered by slug so blocks of the same type are listed together.
    for block_document in sorted(block_documents, key=_slug_of):
        listing.add_row(
            str(block_document.id),
            getattr(block_document.block_type, "name", ""),
            str(block_document.name),
            _slug_of(block_document),
        )

    app.console.print(listing)
@blocks_app.command("delete")
async def block_delete(
    slug: Optional[str] = typer.Argument(
        None, help="A block slug. Formatted as '<BLOCK_TYPE_SLUG>/<BLOCK_NAME>'"
    ),
    block_id: Optional[UUID] = typer.Option(None, "--id", help="A block id."),
):
    """
    Delete a configured block.
    """
    # The block is selected either by `--id` (used only when no slug is given)
    # or by slug. In interactive sessions the user must confirm the deletion.
    async with get_client() as client:
        if slug is None and block_id is not None:
            try:
                if is_interactive() and not typer.confirm(
                    (f"Are you sure you want to delete block with id {block_id!r}?"),
                    default=False,
                ):
                    exit_with_error("Deletion aborted.")
                await client.delete_block_document(block_id)
                exit_with_success(f"Deleted Block '{block_id}'.")
            except ObjectNotFound:
                exit_with_error(f"Block {block_id!r} not found!")
        elif slug is not None:
            if "/" not in slug:
                exit_with_error(
                    f"{slug!r} is not valid. Slug must contain a '/', e.g. 'json/my-json-block'"
                )
            # A slug is '<BLOCK_TYPE_SLUG>/<BLOCK_NAME>'. Split on the first
            # '/' and reject any further separator with a CLI error; unpacking
            # `slug.split("/")` would raise an unhandled ValueError instead.
            block_type_slug, _, block_document_name = slug.partition("/")
            if "/" in block_document_name:
                exit_with_error(
                    f"{slug!r} is not valid. Slug must contain exactly one '/', "
                    "e.g. 'json/my-json-block'"
                )
            try:
                # Resolve the slug to a document first so its id can be used
                # for the deletion; a missing block is reported before the
                # confirmation prompt is shown.
                block_document = await client.read_block_document_by_name(
                    block_document_name, block_type_slug, include_secrets=False
                )
                if is_interactive() and not typer.confirm(
                    (f"Are you sure you want to delete block with slug {slug!r}?"),
                    default=False,
                ):
                    exit_with_error("Deletion aborted.")
                await client.delete_block_document(block_document.id)
                exit_with_success(f"Deleted Block '{slug}'.")
            except ObjectNotFound:
                exit_with_error(f"Block {slug!r} not found!")
        else:
            exit_with_error("Must provide a block slug or id")
@blocks_app.command("create")
async def block_create(
    block_type_slug: str = typer.Argument(
        ...,
        help="A block type slug. View available types with: prefect block type ls",
        show_default=False,
    ),
):
    """
    Generate a link to the Prefect UI to create a block.
    """
    async with get_client() as client:
        try:
            block_type = await client.read_block_type_by_slug(block_type_slug)
        except ObjectNotFound:
            app.console.print(f"[red]Block type {block_type_slug!r} not found![/red]")
            available_types = await client.read_block_types()
            # De-duplicate, then sort: joining a bare set would list the
            # available slugs in an arbitrary order that can differ per run.
            slugs = sorted({available.slug for available in available_types})
            app.console.print(f"Available block types: {', '.join(slugs)}")
            raise typer.Exit(1)

        ui_url = get_current_settings().ui_url
        if not ui_url:
            exit_with_error(
                "Prefect must be configured to use a hosted Prefect server or "
                "Prefect Cloud to display the Prefect UI"
            )
        # The block catalog is served under a different path for Cloud.
        if determine_server_type() == ServerType.CLOUD:
            block_link = f"{ui_url}/settings/blocks/catalog/{block_type.slug}/create"
        else:
            block_link = f"{ui_url}/blocks/catalog/{block_type.slug}/create"
        app.console.print(
            f"Create a {block_type_slug} block: {block_link}",
        )
@blocks_app.command("inspect")
async def block_inspect(
    slug: Optional[str] = typer.Argument(
        None, help="A Block slug: <BLOCK_TYPE_SLUG>/<BLOCK_NAME>"
    ),
    block_id: Optional[UUID] = typer.Option(
        None, "--id", help="A Block id to search for if no slug is given"
    ),
):
    """
    Displays details about a configured block.
    """
    # The block is looked up by `--id` only when no slug is given; otherwise
    # by slug. Secrets are never requested from the API for display.
    async with get_client() as client:
        if slug is None and block_id is not None:
            try:
                block_document = await client.read_block_document(
                    block_id, include_secrets=False
                )
            except ObjectNotFound:
                exit_with_error(f"Block {block_id!r} not found!")
        elif slug is not None:
            if "/" not in slug:
                exit_with_error(
                    f"{slug!r} is not valid. Slug must contain a '/', e.g. 'json/my-json-block'"
                )
            # A slug is '<BLOCK_TYPE_SLUG>/<BLOCK_NAME>'. Split on the first
            # '/' and reject any further separator with a CLI error; unpacking
            # `slug.split("/")` would raise an unhandled ValueError instead.
            block_type_slug, _, block_document_name = slug.partition("/")
            if "/" in block_document_name:
                exit_with_error(
                    f"{slug!r} is not valid. Slug must contain exactly one '/', "
                    "e.g. 'json/my-json-block'"
                )
            try:
                block_document = await client.read_block_document_by_name(
                    block_document_name, block_type_slug, include_secrets=False
                )
            except ObjectNotFound:
                exit_with_error(f"Block {slug!r} not found!")
        else:
            exit_with_error("Must provide a block slug or id")
        app.console.print(display_block(block_document))
@blocktypes_app.command("ls")
async def list_types():
    """
    List all block types.
    """
    async with get_client() as client:
        block_types = await client.read_block_types()

    table = Table(
        title="Block Types",
        show_lines=True,
    )

    table.add_column("Block Type Slug", style="italic cyan", no_wrap=True)
    table.add_column("Description", style="blue", no_wrap=False, justify="left")
    table.add_column(
        "Generate creation link", style="italic cyan", no_wrap=False, justify="left"
    )

    for blocktype in sorted(block_types, key=lambda x: x.name):
        # Show only the first description line up to its first period.
        # `str.splitlines()` returns an empty list for an empty string, so an
        # empty description is guarded against as well as a `None` one;
        # indexing `[0]` unconditionally would raise `IndexError`.
        description_lines = (blocktype.description or "").splitlines()
        summary = (
            str(description_lines[0].partition(".")[0]) if description_lines else ""
        )
        table.add_row(
            str(blocktype.slug),
            summary,
            # Third column is the command that prints a creation link.
            f"prefect block create {blocktype.slug}",
        )

    app.console.print(table)
@blocktypes_app.command("inspect")
async def blocktype_inspect(
    slug: str = typer.Argument(..., help="A block type slug"),
):
    """
    Display details about a block type.
    """
    # Prints up to three tables: the block type summary, the properties of its
    # most recent schema, and (when present) the schema's extra definitions.
    async with get_client() as client:
        try:
            block_type = await client.read_block_type_by_slug(slug)
        except ObjectNotFound:
            exit_with_error(f"Block type {slug!r} not found!")

        summary_table = display_block_type(block_type)
        app.console.print(summary_table)

        try:
            latest_schema = await client.get_most_recent_block_schema_for_block_type(
                block_type.id
            )
        except Exception:
            # Any failure while fetching the schema ends the command with a
            # short error message rather than a traceback.
            exit_with_error(f"Failed to fetch latest schema for the {slug} block type")

        if latest_schema is None:
            exit_with_error(f"No schema found for the {slug} block type")

        properties_table = display_block_schema_properties(latest_schema.fields)
        app.console.print(properties_table)

        # The definitions table is shown only when the schema has any.
        if extra_definitions := latest_schema.fields.get("definitions"):
            definitions_table = display_block_schema_extra_definitions(
                extra_definitions
            )
            app.console.print(definitions_table)
@blocktypes_app.command("delete")
async def blocktype_delete(
    slug: str = typer.Argument(..., help="A Block type slug"),
):
    """
    Delete an unprotected Block Type.
    """
    async with get_client() as client:
        try:
            target_type = await client.read_block_type_by_slug(slug)

            # Only prompt in interactive sessions; declining aborts the
            # command before anything is deleted.
            if is_interactive():
                confirmed = typer.confirm(
                    (f"Are you sure you want to delete block type {target_type.slug!r}?"),
                    default=False,
                )
                if not confirmed:
                    exit_with_error("Deletion aborted.")

            await client.delete_block_type(target_type.id)
            exit_with_success(f"Deleted Block Type '{slug}'.")
        except ObjectNotFound:
            exit_with_error(f"Block Type {slug!r} not found!")
        except ProtectedBlockError:
            exit_with_error(f"Block Type {slug!r} is a protected block!")
        except PrefectHTTPStatusError:
            exit_with_error(f"Cannot delete Block Type {slug!r}!")
493 exit_with_error(f"Cannot delete Block Type {slug!r}!")