Coverage for /usr/local/lib/python3.12/site-packages/prefect/blocks/core.py: 43%
586 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from __future__ import annotations 1a
3import hashlib 1a
4import html 1a
5import inspect 1a
6import sys 1a
7import types 1a
8import uuid 1a
9import warnings 1a
10from abc import ABC 1a
11from functools import partial 1a
12from textwrap import dedent 1a
13from typing import ( 1a
14 TYPE_CHECKING,
15 Any,
16 ClassVar,
17 Coroutine,
18 FrozenSet,
19 Optional,
20 TypeVar,
21 Union,
22 cast,
23 get_origin,
24)
25from uuid import UUID, uuid4 1a
27from griffe import Docstring, DocstringSection, DocstringSectionKind, Parser, parse 1a
28from packaging.version import InvalidVersion, Version 1a
29from pydantic import ( 1a
30 BaseModel,
31 ConfigDict,
32 HttpUrl,
33 PrivateAttr,
34 SecretBytes,
35 SecretStr,
36 SerializationInfo,
37 SerializerFunctionWrapHandler,
38 ValidationError,
39 model_serializer,
40 model_validator,
41)
42from pydantic.json_schema import GenerateJsonSchema 1a
43from typing_extensions import Literal, ParamSpec, Self, TypeGuard, get_args 1a
45import prefect.exceptions 1a
46from prefect._internal.compatibility.async_dispatch import async_dispatch 1a
47from prefect.client.schemas import ( 1a
48 DEFAULT_BLOCK_SCHEMA_VERSION,
49 BlockDocument,
50 BlockSchema,
51 BlockType,
52 BlockTypeUpdate,
53)
54from prefect.client.schemas.actions import ( 1a
55 BlockDocumentCreate,
56 BlockDocumentUpdate,
57 BlockSchemaCreate,
58 BlockTypeCreate,
59)
60from prefect.client.utilities import inject_client 1a
61from prefect.events import emit_event 1a
62from prefect.logging.loggers import disable_logger 1a
63from prefect.plugins import load_prefect_collections 1a
64from prefect.types import SecretDict 1a
65from prefect.utilities.asyncutils import run_coro_as_sync, sync_compatible 1a
66from prefect.utilities.collections import listrepr, remove_nested_keys, visit_collection 1a
67from prefect.utilities.dispatch import lookup_type, register_base_type 1a
68from prefect.utilities.hashing import hash_objects 1a
69from prefect.utilities.importtools import to_qualified_name 1a
70from prefect.utilities.pydantic import handle_secret_render 1a
71from prefect.utilities.slugify import slugify 1a
73if TYPE_CHECKING: 73 ↛ 74line 73 didn't jump to line 74 because the condition on line 73 was never true1a
74 from pydantic.main import IncEx
76 from prefect.client.orchestration import PrefectClient, SyncPrefectClient
78R = TypeVar("R") 1a
79P = ParamSpec("P") 1a
81ResourceTuple = tuple[dict[str, Any], list[dict[str, Any]]] 1a
82UnionTypes: tuple[object, ...] = (Union,) 1a
83if hasattr(types, "UnionType"): 83 ↛ 86line 83 didn't jump to line 86 because the condition on line 83 was always true1a
84 # Python 3.10+ only
85 UnionTypes = (*UnionTypes, types.UnionType) 1a
86NestedTypes: tuple[type, ...] = (list, dict, tuple, *UnionTypes) 1a
89def block_schema_to_key(schema: BlockSchema) -> str: 1a
90 """
91 Defines the unique key used to lookup the Block class for a given schema.
92 """
93 if schema.block_type is None: 93 ↛ 94line 93 didn't jump to line 94 because the condition on line 93 was never true1ab
94 raise ValueError("Block type is not set")
95 return f"{schema.block_type.slug}" 1ab
98class InvalidBlockRegistration(Exception): 1a
99 """
100 Raised on attempted registration of the base Block
101 class or a Block interface class
102 """
105class UnknownBlockType(Exception): 1a
106 """
107 Raised when a block type is not found in the registry.
108 """
111def _collect_nested_reference_strings( 1a
112 obj: dict[str, Any] | list[Any],
113) -> list[dict[str, Any]]:
114 """
115 Collects all nested reference strings (e.g. #/definitions/Model) from a given object.
116 """
117 found_reference_strings: list[dict[str, Any]] = [] 1ab
118 if isinstance(obj, dict): 1ab
119 if ref := obj.get("$ref"): 1ab
120 found_reference_strings.append(ref) 1ab
121 for value in obj.values(): 1ab
122 found_reference_strings.extend(_collect_nested_reference_strings(value)) 1ab
123 if isinstance(obj, list): 1ab
124 for item in obj: 1ab
125 found_reference_strings.extend(_collect_nested_reference_strings(item)) 1ab
126 return found_reference_strings 1ab
129def _get_non_block_reference_definitions( 1a
130 object_definition: dict[str, Any], definitions: dict[str, Any]
131) -> dict[str, Any]:
132 """
133 Given a definition of an object in a block schema OpenAPI spec and the dictionary
134 of all reference definitions in that same block schema OpenAPI spec, return the
135 definitions for objects that are referenced from the object or any children of
136 the object that do not reference a block.
137 """
138 non_block_definitions: dict[str, Any] = {} 1ab
139 reference_strings = _collect_nested_reference_strings(object_definition) 1ab
140 for reference_string in reference_strings: 1ab
141 if isinstance(reference_string, str): 141 ↛ 140line 141 didn't jump to line 140 because the condition on line 141 was always true1ab
142 definition_key = reference_string.replace("#/definitions/", "") 1ab
143 definition = definitions.get(definition_key) 1ab
144 if definition and definition.get("block_type_slug") is None: 1ab
145 non_block_definitions = { 1b
146 **non_block_definitions,
147 definition_key: definition,
148 **_get_non_block_reference_definitions(definition, definitions),
149 }
150 return non_block_definitions 1ab
153def _is_subclass(cls: type, parent_cls: type) -> TypeGuard[type[BaseModel]]: 1a
154 """
155 Checks if a given class is a subclass of another class. Unlike issubclass,
156 this will not throw an exception if cls is an instance instead of a type.
157 """
158 # For python<=3.11 inspect.isclass() will return True for parametrized types (e.g. list[str])
159 # so we need to check for get_origin() to avoid TypeError for issubclass.
160 return inspect.isclass(cls) and not get_origin(cls) and issubclass(cls, parent_cls) 1ab
163def _collect_secret_fields( 1a
164 name: str,
165 type_: type[BaseModel] | type[SecretStr] | type[SecretBytes] | type[SecretDict],
166 secrets: list[str],
167) -> None:
168 """
169 Recursively collects all secret fields from a given type and adds them to the
170 secrets list, supporting nested Union / Dict / Tuple / List / BaseModel fields.
171 Also, note, this function mutates the input secrets list, thus does not return anything.
172 """
173 if get_origin(type_) in NestedTypes: 1ab
174 for nested_type in get_args(type_): 1ab
175 _collect_secret_fields(name, nested_type, secrets) 1ab
176 return 1ab
177 elif _is_subclass(type_, BaseModel): 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true1ab
178 for field_name, field in type_.model_fields.items():
179 if field.annotation is not None:
180 _collect_secret_fields(
181 f"{name}.{field_name}", field.annotation, secrets
182 )
183 return
185 # Check if this is a pydantic Secret type (including generic Secret[T])
186 is_pydantic_secret = False 1ab
188 # Direct check for SecretStr, SecretBytes
189 if type_ in (SecretStr, SecretBytes): 1ab
190 is_pydantic_secret = True 1ab
191 # Check for base Secret class
192 elif ( 192 ↛ 197line 192 didn't jump to line 197 because the condition on line 192 was never true
193 isinstance(type_, type) # type: ignore[unnecessaryIsInstance]
194 and getattr(type_, "__module__", None) == "pydantic.types"
195 and getattr(type_, "__name__", None) == "Secret"
196 ):
197 is_pydantic_secret = True
198 # Check for generic Secret[T] (e.g., Secret[str], Secret[int])
199 elif get_origin(type_) is not None: 1ab
200 origin = get_origin(type_) 1ab
201 if (
202 getattr(origin, "__module__", None) == "pydantic.types"
203 and getattr(origin, "__name__", None) == "Secret"
204 ):
205 is_pydantic_secret = True 1ab
207 if is_pydantic_secret: 1ab
208 secrets.append(name) 1ab
209 elif type_ == SecretDict: 1ab
210 # Append .* to field name to signify that all values under a given key are secret and should be obfuscated.
211 secrets.append(f"{name}.*") 1ab
212 elif Block.is_block_class(type_): 212 ↛ 213line 212 didn't jump to line 213 because the condition on line 212 was never true1ab
213 secrets.extend(
214 f"{name}.{s}" for s in type_.model_json_schema()["secret_fields"]
215 )
218def _should_update_block_type( 1a
219 local_block_type: BlockType, server_block_type: BlockType
220) -> bool:
221 """
222 Compares the fields of `local_block_type` and `server_block_type`.
223 Only compare the possible updatable fields as defined by `BlockTypeUpdate.updatable_fields`
224 Returns True if they are different, otherwise False.
225 """
226 fields = BlockTypeUpdate.updatable_fields()
228 local_block_fields = local_block_type.model_dump(include=fields, exclude_unset=True)
229 server_block_fields = server_block_type.model_dump(
230 include=fields, exclude_unset=True
231 )
233 if local_block_fields.get("description") is not None:
234 local_block_fields["description"] = html.unescape(
235 local_block_fields["description"]
236 )
237 if local_block_fields.get("code_example") is not None:
238 local_block_fields["code_example"] = html.unescape(
239 local_block_fields["code_example"]
240 )
242 if server_block_fields.get("description") is not None:
243 server_block_fields["description"] = html.unescape(
244 server_block_fields["description"]
245 )
246 if server_block_fields.get("code_example") is not None:
247 server_block_fields["code_example"] = html.unescape(
248 server_block_fields["code_example"]
249 )
251 return server_block_fields != local_block_fields
254class BlockNotSavedError(RuntimeError): 1a
255 """
256 Raised when a given block is not saved and an operation that requires
257 the block to be saved is attempted.
258 """
260 pass 1a
263def schema_extra(schema: dict[str, Any], model: type["Block"]) -> None: 1a
264 """
265 Customizes Pydantic's schema generation feature to add blocks related information.
266 """
267 schema["block_type_slug"] = model.get_block_type_slug() 1ab
268 # Ensures args and code examples aren't included in the schema
269 description = model.get_description() 1ab
270 if description: 270 ↛ 274line 270 didn't jump to line 274 because the condition on line 270 was always true1ab
271 schema["description"] = description 1ab
272 else:
273 # Prevent the description of the base class from being included in the schema
274 schema.pop("description", None)
276 # create a list of secret field names
277 # secret fields include both top-level keys and dot-delimited nested secret keys
278 # A wildcard (*) means that all fields under a given key are secret.
279 # for example: ["x", "y", "z.*", "child.a"]
280 # means the top-level keys "x" and "y", all keys under "z", and the key "a" of a block
281 # nested under the "child" key are all secret. There is no limit to nesting.
282 secrets: list[str] = [] 1ab
283 for name, field in model.model_fields.items(): 1ab
284 if field.annotation is not None: 284 ↛ 283line 284 didn't jump to line 283 because the condition on line 284 was always true1ab
285 _collect_secret_fields(name, field.annotation, secrets) 1ab
286 schema["secret_fields"] = secrets 1ab
288 # create block schema references
289 refs: dict[str, Any] = {} 1ab
291 def collect_block_schema_references(field_name: str, annotation: type) -> None: 1ab
292 """Walk through the annotation and collect block schemas for any nested blocks."""
293 if Block.is_block_class(annotation): 293 ↛ 294line 293 didn't jump to line 294 because the condition on line 293 was never true1ab
294 if isinstance(refs.get(field_name), list):
295 refs[field_name].append(annotation._to_block_schema_reference_dict()) # pyright: ignore[reportPrivateUsage]
296 elif isinstance(refs.get(field_name), dict):
297 refs[field_name] = [
298 refs[field_name],
299 annotation._to_block_schema_reference_dict(), # pyright: ignore[reportPrivateUsage]
300 ]
301 else:
302 refs[field_name] = annotation._to_block_schema_reference_dict() # pyright: ignore[reportPrivateUsage]
304 if get_origin(annotation) in NestedTypes: 1ab
305 for type_ in get_args(annotation): 1ab
306 collect_block_schema_references(field_name, type_) 1ab
308 for name, field in model.model_fields.items(): 1ab
309 if field.annotation is not None: 309 ↛ 308line 309 didn't jump to line 308 because the condition on line 309 was always true1ab
310 collect_block_schema_references(name, field.annotation) 1ab
311 schema["block_schema_references"] = refs 1ab
314@register_base_type 1a
315class Block(BaseModel, ABC): 1a
316 """
317 A base class for implementing a block that wraps an external service.
319 This class can be defined with an arbitrary set of fields and methods, and
320 couples business logic with data contained in an block document.
321 `_block_document_name`, `_block_document_id`, `_block_schema_id`, and
322 `_block_type_id` are reserved by Prefect as Block metadata fields, but
323 otherwise a Block can implement arbitrary logic. Blocks can be instantiated
324 without populating these metadata fields, but can only be used interactively,
325 not with the Prefect API.
327 Instead of the __init__ method, a block implementation allows the
328 definition of a `block_initialization` method that is called after
329 initialization.
330 """
332 model_config: ClassVar[ConfigDict] = ConfigDict( 1a
333 extra="allow",
334 json_schema_extra=schema_extra,
335 )
337 def __new__(cls: type[Self], **kwargs: Any) -> Self: 1a
338 """
339 Create an instance of the Block subclass type if a `block_type_slug` is
340 present in the data payload.
341 """
342 if block_type_slug := kwargs.pop("block_type_slug", None):
343 subcls = cls.get_block_class_from_key(block_type_slug)
344 if cls is not subcls and issubclass(subcls, cls):
345 return super().__new__(subcls)
346 return super().__new__(cls)
348 def __init__(self, *args: Any, **kwargs: Any): 1a
349 super().__init__(*args, **kwargs)
350 self.block_initialization()
352 def __str__(self) -> str: 1a
353 return self.__repr__()
355 def __repr_args__(self) -> list[tuple[str | None, Any]]: 1a
356 repr_args = super().__repr_args__()
357 data_keys = self.model_json_schema()["properties"].keys()
358 return [
359 (key, value) for key, value in repr_args if key is None or key in data_keys
360 ]
362 @model_validator(mode="before") 1a
363 @classmethod 1a
364 def validate_block_type_slug(cls, values: Any) -> Any: 1a
365 """
366 Validates that the `block_type_slug` in the input values matches the expected
367 block type slug for the class. This helps pydantic to correctly discriminate
368 between different Block subclasses when validating Union types of Blocks.
369 """
370 if isinstance(values, dict):
371 if "block_type_slug" in values:
372 expected_slug = cls.get_block_type_slug()
373 slug = values["block_type_slug"]
374 if slug and slug != expected_slug:
375 raise ValueError(
376 f"Invalid block_type_slug: expected '{expected_slug}', got '{values['block_type_slug']}'"
377 )
378 return values
380 def block_initialization(self) -> None: 1a
381 pass
383 # -- private class variables
384 # set by the class itself
386 # Attribute to customize the name of the block type created
387 # when the block is registered with the API. If not set, block
388 # type name will default to the class name.
389 _block_type_name: ClassVar[Optional[str]] = None 1a
390 _block_type_slug: ClassVar[Optional[str]] = None 1a
392 # Attributes used to set properties on a block type when registered
393 # with the API.
394 _logo_url: ClassVar[Optional[HttpUrl]] = None 1a
395 _documentation_url: ClassVar[Optional[HttpUrl]] = None 1a
396 _description: ClassVar[Optional[str]] = None 1a
397 _code_example: ClassVar[Optional[str]] = None 1a
398 _block_type_id: ClassVar[Optional[UUID]] = None 1a
399 _block_schema_id: ClassVar[Optional[UUID]] = None 1a
400 _block_schema_capabilities: ClassVar[Optional[list[str]]] = None 1a
401 _block_schema_version: ClassVar[Optional[str]] = None 1a
403 # -- private instance variables
404 # these are set when blocks are loaded from the API
406 _block_document_id: Optional[UUID] = PrivateAttr(None) 1a
407 _block_document_name: Optional[str] = PrivateAttr(None) 1a
408 _is_anonymous: Optional[bool] = PrivateAttr(None) 1a
410 # Exclude `save` as it uses the `sync_compatible` decorator and needs to be
411 # decorated directly.
412 _events_excluded_methods: ClassVar[list[str]] = PrivateAttr( 1a
413 default=["block_initialization", "save", "dict"]
414 )
416 @classmethod 1a
417 def __dispatch_key__(cls) -> str | None: 1a
418 if cls.__name__ == "Block": 1ab
419 return None # The base class is abstract 1a
420 return block_schema_to_key(cls._to_block_schema()) 1ab
422 @model_serializer(mode="wrap") 1a
423 def ser_model( 1a
424 self, handler: SerializerFunctionWrapHandler, info: SerializationInfo
425 ) -> Any:
426 jsonable_self = handler(self)
427 if (ctx := info.context) and ctx.get("include_secrets") is True:
428 # Add serialization mode to context so handle_secret_render knows how to process nested models
429 ctx["serialization_mode"] = info.mode
431 for field_name in type(self).model_fields:
432 field_value = getattr(self, field_name)
434 # In JSON mode, skip fields that don't contain secrets
435 # as they're already properly serialized by the handler
436 if (
437 info.mode == "json"
438 and field_name in jsonable_self
439 and not self._field_has_secrets(field_name)
440 ):
441 continue
443 # For all other fields, use visit_collection with handle_secret_render
444 jsonable_self[field_name] = visit_collection(
445 expr=field_value,
446 visit_fn=partial(handle_secret_render, context=ctx),
447 return_data=True,
448 )
449 extra_fields = {
450 "block_type_slug": self.get_block_type_slug(),
451 "_block_document_id": self._block_document_id,
452 "_block_document_name": self._block_document_name,
453 "_is_anonymous": self._is_anonymous,
454 }
455 jsonable_self |= {
456 key: value for key, value in extra_fields.items() if value is not None
457 }
458 return jsonable_self
460 @classmethod 1a
461 def get_block_type_name(cls) -> str: 1a
462 return cls._block_type_name or cls.__name__ 1ab
464 @classmethod 1a
465 def get_block_type_slug(cls) -> str: 1a
466 return slugify(cls._block_type_slug or cls.get_block_type_name()) 1ab
468 @classmethod 1a
469 def get_block_capabilities(cls) -> FrozenSet[str]: 1a
470 """
471 Returns the block capabilities for this Block. Recursively collects all block
472 capabilities of all parent classes into a single frozenset.
473 """
474 return frozenset( 1ab
475 {
476 c
477 for base in (cls,) + cls.__mro__
478 for c in getattr(base, "_block_schema_capabilities", []) or []
479 }
480 )
482 @classmethod 1a
483 def _get_current_package_version(cls): 1a
484 current_module = inspect.getmodule(cls) 1ab
485 if current_module: 485 ↛ 496line 485 didn't jump to line 496 because the condition on line 485 was always true1ab
486 top_level_module = sys.modules[ 1ab
487 current_module.__name__.split(".")[0] or "__main__"
488 ]
489 try: 1ab
490 version = Version(top_level_module.__version__) 1ab
491 # Strips off any local version information
492 return version.base_version 1ab
493 except (AttributeError, InvalidVersion): 1b
494 # Module does not have a __version__ attribute or is not a parsable format
495 pass 1b
496 return DEFAULT_BLOCK_SCHEMA_VERSION 1b
498 @classmethod 1a
499 def get_block_schema_version(cls) -> str: 1a
500 return cls._block_schema_version or cls._get_current_package_version() 1ab
502 @classmethod 1a
503 def _to_block_schema_reference_dict(cls): 1a
504 return dict(
505 block_type_slug=cls.get_block_type_slug(),
506 block_schema_checksum=cls._calculate_schema_checksum(),
507 )
509 @classmethod 1a
510 def _calculate_schema_checksum( 1a
511 cls, block_schema_fields: dict[str, Any] | None = None
512 ):
513 """
514 Generates a unique hash for the underlying schema of block.
516 Args:
517 block_schema_fields: Dictionary detailing block schema fields to generate a
518 checksum for. The fields of the current class is used if this parameter
519 is not provided.
521 Returns:
522 str: The calculated checksum prefixed with the hashing algorithm used.
523 """
524 block_schema_fields = ( 1ab
525 cls.model_json_schema()
526 if block_schema_fields is None
527 else block_schema_fields
528 )
529 fields_for_checksum = remove_nested_keys(["secret_fields"], block_schema_fields) 1ab
530 if fields_for_checksum.get("definitions"): 1ab
531 non_block_definitions = _get_non_block_reference_definitions( 1ab
532 fields_for_checksum, fields_for_checksum["definitions"]
533 )
534 if non_block_definitions: 1ab
535 fields_for_checksum["definitions"] = non_block_definitions 1b
536 else:
537 # Pop off definitions entirely instead of empty dict for consistency
538 # with the OpenAPI specification
539 fields_for_checksum.pop("definitions") 1ab
540 checksum = hash_objects(fields_for_checksum, hash_algo=hashlib.sha256) 1ab
541 if checksum is None: 541 ↛ 542line 541 didn't jump to line 542 because the condition on line 541 was never true1ab
542 raise ValueError("Unable to compute checksum for block schema")
543 else:
544 return f"sha256:{checksum}" 1ab
546 def _field_has_secrets(self, field_name: str) -> bool: 1a
547 """Check if a field contains secrets based on the schema's secret_fields."""
548 secret_fields = self.model_json_schema().get("secret_fields", [])
550 # Check if field_name matches any secret field pattern
551 for secret_field in secret_fields:
552 if secret_field == field_name:
553 return True
554 elif secret_field.startswith(f"{field_name}."):
555 # This field contains nested secrets
556 return True
557 elif secret_field.endswith(".*"):
558 # Handle wildcard patterns like "field.*"
559 prefix = secret_field[:-2] # Remove .*
560 if field_name == prefix:
561 return True
563 return False
565 def _to_block_document( 1a
566 self,
567 name: Optional[str] = None,
568 block_schema_id: Optional[UUID] = None,
569 block_type_id: Optional[UUID] = None,
570 is_anonymous: Optional[bool] = None,
571 include_secrets: bool = False,
572 ) -> BlockDocument:
573 """
574 Creates the corresponding block document based on the data stored in a block.
575 The corresponding block document name, block type ID, and block schema ID must
576 either be passed into the method or configured on the block.
578 Args:
579 name: The name of the created block document. Not required if anonymous.
580 block_schema_id: UUID of the corresponding block schema.
581 block_type_id: UUID of the corresponding block type.
582 is_anonymous: if True, an anonymous block is created. Anonymous
583 blocks are not displayed in the UI and used primarily for system
584 operations and features that need to automatically generate blocks.
586 Returns:
587 BlockDocument: Corresponding block document
588 populated with the block's configured data.
589 """
590 if is_anonymous is None:
591 is_anonymous = self._is_anonymous or False
593 # name must be present if not anonymous
594 if not is_anonymous and not name and not self._block_document_name:
595 raise ValueError("No name provided, either as an argument or on the block.")
597 if not block_schema_id and not self._block_schema_id:
598 raise ValueError(
599 "No block schema ID provided, either as an argument or on the block."
600 )
601 if not block_type_id and not self._block_type_id:
602 raise ValueError(
603 "No block type ID provided, either as an argument or on the block."
604 )
606 # The keys passed to `include` must NOT be aliases, else some items will be missed
607 # i.e. must do `self.schema_` vs `self.schema` to get a `schema_ = Field(alias="schema")`
608 # reported from https://github.com/PrefectHQ/prefect-dbt/issues/54
609 data_keys = self.model_json_schema(by_alias=False)["properties"].keys()
611 # `block_document_data`` must return the aliased version for it to show in the UI
612 block_document_data = self.model_dump(
613 by_alias=True,
614 include=data_keys,
615 context={"include_secrets": include_secrets},
616 )
618 # Ensure non-secret fields are JSON-serializable to avoid issues with types
619 # like SemanticVersion when the BlockDocument is later serialized
620 try:
621 json_data = self.model_dump(
622 mode="json",
623 by_alias=True,
624 include=data_keys,
625 context={"include_secrets": include_secrets},
626 )
627 # Replace non-secret, non-Block fields with their JSON representation
628 # We need to check the original field to determine if it's a secret or Block
629 for key in data_keys:
630 if key in block_document_data and key in json_data:
631 field_value = getattr(self, key)
632 # Only replace if the field doesn't contain secrets and is not a Block
633 if not self._field_has_secrets(key) and not isinstance(
634 field_value, Block
635 ):
636 block_document_data[key] = json_data[key]
637 except Exception:
638 # If JSON serialization fails, we'll handle it later
639 pass
641 # Iterate through and find blocks that already have saved block documents to
642 # create references to those saved block documents.
643 for key in data_keys:
644 field_value = getattr(self, key)
645 if (
646 isinstance(field_value, Block)
647 and field_value._block_document_id is not None
648 ):
649 block_document_data[key] = {
650 "$ref": {"block_document_id": field_value._block_document_id}
651 }
653 block_schema_id = block_schema_id or self._block_schema_id
654 block_type_id = block_type_id or self._block_type_id
656 if block_schema_id is None:
657 raise ValueError(
658 "No block schema ID provided, either as an argument or on the block."
659 )
660 if block_type_id is None:
661 raise ValueError(
662 "No block type ID provided, either as an argument or on the block."
663 )
665 return BlockDocument(
666 id=self._block_document_id or uuid4(),
667 name=(name or self._block_document_name) if not is_anonymous else None,
668 block_schema_id=block_schema_id,
669 block_type_id=block_type_id,
670 block_type_name=self._block_type_name,
671 data=block_document_data,
672 block_schema=self._to_block_schema(
673 block_type_id=block_type_id or self._block_type_id,
674 ),
675 block_type=self._to_block_type(),
676 is_anonymous=is_anonymous,
677 )
679 @classmethod 1a
680 def _to_block_schema(cls, block_type_id: Optional[UUID] = None) -> BlockSchema: 1a
681 """
682 Creates the corresponding block schema of the block.
683 The corresponding block_type_id must either be passed into
684 the method or configured on the block.
686 Args:
687 block_type_id: UUID of the corresponding block type.
689 Returns:
690 BlockSchema: The corresponding block schema.
691 """
692 fields = cls.model_json_schema() 1ab
693 return BlockSchema( 1ab
694 id=cls._block_schema_id if cls._block_schema_id is not None else uuid4(),
695 checksum=cls._calculate_schema_checksum(),
696 fields=fields,
697 block_type_id=block_type_id or cls._block_type_id,
698 block_type=cls._to_block_type(),
699 capabilities=list(cls.get_block_capabilities()),
700 version=cls.get_block_schema_version(),
701 )
703 @classmethod 1a
704 def _parse_docstring(cls) -> list[DocstringSection]: 1a
705 """
706 Parses the docstring into list of DocstringSection objects.
707 Helper method used primarily to suppress irrelevant logs, e.g.
708 `<module>:11: No type or annotation for parameter 'write_json'`
709 because griffe is unable to parse the types from pydantic.BaseModel.
710 """
711 if cls.__doc__ is None: 711 ↛ 712line 711 didn't jump to line 712 because the condition on line 711 was never true1ab
712 return []
713 with disable_logger("griffe"): 1ab
714 docstring = Docstring(cls.__doc__) 1ab
715 parsed = parse(docstring, Parser.google) 1ab
716 return parsed 1ab
718 @classmethod 1a
719 def get_description(cls) -> Optional[str]: 1a
720 """
721 Returns the description for the current block. Attempts to parse
722 description from class docstring if an override is not defined.
723 """
724 description = cls._description 1ab
725 # If no description override has been provided, find the first text section
726 # and use that as the description
727 if description is None and cls.__doc__ is not None: 1ab
728 parsed = cls._parse_docstring() 1ab
729 parsed_description = next( 1ab
730 (
731 section.as_dict().get("value")
732 for section in parsed
733 if section.kind == DocstringSectionKind.text
734 ),
735 None,
736 )
737 if isinstance(parsed_description, str): 737 ↛ 739line 737 didn't jump to line 739 because the condition on line 737 was always true1ab
738 description = parsed_description.strip() 1ab
739 return description 1ab
741 @classmethod 1a
742 def get_code_example(cls) -> Optional[str]: 1a
743 """
744 Returns the code example for the given block. Attempts to parse
745 code example from the class docstring if an override is not provided.
746 """
747 code_example = ( 1ab
748 dedent(text=cls._code_example) if cls._code_example is not None else None
749 )
750 # If no code example override has been provided, attempt to find a examples
751 # section or an admonition with the annotation "example" and use that as the
752 # code example
753 if code_example is None and cls.__doc__ is not None: 753 ↛ 772line 753 didn't jump to line 772 because the condition on line 753 was always true1ab
754 parsed = cls._parse_docstring() 1ab
755 for section in parsed: 1ab
756 # Section kind will be "examples" if Examples section heading is used.
757 if section.kind == DocstringSectionKind.examples: 1ab
758 # Examples sections are made up of smaller sections that need to be
759 # joined with newlines. Smaller sections are represented as tuples
760 # with shape (DocstringSectionKind, str)
761 code_example = "\n".join( 1ab
762 (part[1] for part in section.as_dict().get("value", []))
763 )
764 break 1ab
765 # Section kind will be "admonition" if Example section heading is used.
766 if section.kind == DocstringSectionKind.admonition: 1ab
767 value = section.as_dict().get("value", {}) 1ab
768 if value.get("annotation") == "example": 768 ↛ 755line 768 didn't jump to line 755 because the condition on line 768 was always true1ab
769 code_example = value.get("description") 1ab
770 break 1ab
772 if code_example is None: 1ab
773 # If no code example has been specified or extracted from the class
774 # docstring, generate a sensible default
775 code_example = cls._generate_code_example() 1ab
777 return code_example 1ab
779 @classmethod 1a
780 def _generate_code_example(cls) -> str: 1a
781 """Generates a default code example for the current class"""
782 qualified_name = to_qualified_name(cls) 1ab
783 module_str = ".".join(qualified_name.split(".")[:-1]) 1ab
784 origin = cls.__pydantic_generic_metadata__.get("origin") or cls 1ab
785 class_name = origin.__name__ 1ab
786 block_variable_name = f"{cls.get_block_type_slug().replace('-', '_')}_block" 1ab
788 return dedent( 1ab
789 f"""\
790 ```python
791 from {module_str} import {class_name}
793 {block_variable_name} = {class_name}.load("BLOCK_NAME")
794 ```"""
795 )
797 @classmethod 1a
798 def _to_block_type(cls) -> BlockType: 1a
799 """
800 Creates the corresponding block type of the block.
802 Returns:
803 BlockType: The corresponding block type.
804 """
805 return BlockType( 1ab
806 id=cls._block_type_id or uuid4(),
807 slug=cls.get_block_type_slug(),
808 name=cls.get_block_type_name(),
809 logo_url=cls._logo_url,
810 documentation_url=cls._documentation_url,
811 description=cls.get_description(),
812 code_example=cls.get_code_example(),
813 )
815 @classmethod 1a
816 def _from_block_document(cls, block_document: BlockDocument) -> Self: 1a
817 """
818 Instantiates a block from a given block document. The corresponding block class
819 will be looked up in the block registry based on the corresponding block schema
820 of the provided block document.
822 Args:
823 block_document: The block document used to instantiate a block.
825 Raises:
826 ValueError: If the provided block document doesn't have a corresponding block
827 schema.
829 Returns:
830 Block: Hydrated block with data from block document.
831 """
832 if block_document.block_schema is None:
833 raise ValueError(
834 "Unable to determine block schema for provided block document"
835 )
837 block_cls = (
838 cls
839 if cls.__name__ != "Block"
840 # Look up the block class by dispatch
841 else cls.get_block_class_from_schema(block_document.block_schema)
842 )
844 block = block_cls.model_validate(block_document.data)
845 block._block_document_id = block_document.id
846 block.__class__._block_schema_id = block_document.block_schema_id
847 block.__class__._block_type_id = block_document.block_type_id
848 block._block_document_name = block_document.name
849 block._is_anonymous = block_document.is_anonymous
850 block._define_metadata_on_nested_blocks(
851 block_document.block_document_references
852 )
854 resources: Optional[ResourceTuple] = block._event_method_called_resources()
855 if resources:
856 kind = block._event_kind()
857 resource, related = resources
858 emit_event(event=f"{kind}.loaded", resource=resource, related=related)
860 return block
862 def _event_kind(self) -> str: 1a
863 return f"prefect.block.{self.get_block_type_slug()}"
865 def _event_method_called_resources(self) -> ResourceTuple | None: 1a
866 if not (self._block_document_id and self._block_document_name):
867 return None
869 return (
870 {
871 "prefect.resource.id": (
872 f"prefect.block-document.{self._block_document_id}"
873 ),
874 "prefect.resource.name": self._block_document_name,
875 },
876 [
877 {
878 "prefect.resource.id": (
879 f"prefect.block-type.{self.get_block_type_slug()}"
880 ),
881 "prefect.resource.role": "block-type",
882 }
883 ],
884 )
886 @classmethod 1a
887 def get_block_class_from_schema(cls: type[Self], schema: BlockSchema) -> type[Self]: 1a
888 """
889 Retrieve the block class implementation given a schema.
890 """
891 return cls.get_block_class_from_key(block_schema_to_key(schema))
893 @classmethod 1a
894 def get_block_class_from_key(cls: type[Self], key: str) -> type[Self]: 1a
895 """
896 Retrieve the block class implementation given a key.
897 """
899 # Ensure collections are imported and have the opportunity to register types
900 # before looking up the block class, but only do this once
901 load_prefect_collections()
903 try:
904 return lookup_type(cls, key)
905 except KeyError:
906 message = f"No block class found for slug {key!r}."
907 # Handle common blocks types used for storage, which is the primary use case for looking up blocks by key
908 if key == "s3-bucket":
909 message += " Please ensure that `prefect-aws` is installed."
910 elif key == "gcs-bucket":
911 message += " Please ensure that `prefect-gcp` is installed."
912 elif key == "azure-blob-storage-container":
913 message += " Please ensure that `prefect-azure` is installed."
914 else:
915 message += " Please ensure that the block class is available in the current environment."
916 raise UnknownBlockType(message)
918 def _define_metadata_on_nested_blocks( 1a
919 self, block_document_references: dict[str, dict[str, Any]]
920 ):
921 """
922 Recursively populates metadata fields on nested blocks based on the
923 provided block document references.
924 """
925 for item in block_document_references.items():
926 field_name, block_document_reference = item
927 nested_block = getattr(self, field_name)
928 if isinstance(nested_block, Block):
929 nested_block_document_info = block_document_reference.get(
930 "block_document", {}
931 )
932 nested_block._define_metadata_on_nested_blocks(
933 nested_block_document_info.get("block_document_references", {})
934 )
935 nested_block_document_id = nested_block_document_info.get("id")
936 nested_block._block_document_id = (
937 UUID(nested_block_document_id) if nested_block_document_id else None
938 )
939 nested_block._block_document_name = nested_block_document_info.get(
940 "name"
941 )
942 nested_block._is_anonymous = nested_block_document_info.get(
943 "is_anonymous"
944 )
946 @classmethod 1a
947 async def _aget_block_document( 1a
948 cls,
949 name: str,
950 client: "PrefectClient",
951 ) -> tuple[BlockDocument, str]:
952 if cls.__name__ == "Block":
953 block_type_slug, block_document_name = name.split("/", 1)
954 else:
955 block_type_slug = cls.get_block_type_slug()
956 block_document_name = name
958 try:
959 block_document = await client.read_block_document_by_name(
960 name=block_document_name, block_type_slug=block_type_slug
961 )
962 except prefect.exceptions.ObjectNotFound as e:
963 raise ValueError(
964 f"Unable to find block document named {block_document_name} for block"
965 f" type {block_type_slug}"
966 ) from e
968 return block_document, block_document_name
970 @classmethod 1a
971 def _get_block_document( 1a
972 cls,
973 name: str,
974 client: "SyncPrefectClient",
975 ) -> tuple[BlockDocument, str]:
976 if cls.__name__ == "Block":
977 block_type_slug, block_document_name = name.split("/", 1)
978 else:
979 block_type_slug = cls.get_block_type_slug()
980 block_document_name = name
982 try:
983 block_document = client.read_block_document_by_name(
984 name=block_document_name, block_type_slug=block_type_slug
985 )
986 except prefect.exceptions.ObjectNotFound as e:
987 raise ValueError(
988 f"Unable to find block document named {block_document_name} for block"
989 f" type {block_type_slug}"
990 ) from e
992 return block_document, block_document_name
994 @classmethod 1a
995 @inject_client 1a
996 async def _get_block_document_by_id( 1a
997 cls,
998 block_document_id: Union[str, uuid.UUID],
999 client: "PrefectClient | None" = None,
1000 ):
1001 if TYPE_CHECKING:
1002 assert isinstance(client, PrefectClient)
1003 if isinstance(block_document_id, str):
1004 try:
1005 block_document_id = UUID(block_document_id)
1006 except ValueError:
1007 raise ValueError(
1008 f"Block document ID {block_document_id!r} is not a valid UUID"
1009 )
1011 try:
1012 block_document = await client.read_block_document(
1013 block_document_id=block_document_id
1014 )
1015 except prefect.exceptions.ObjectNotFound:
1016 raise ValueError(
1017 f"Unable to find block document with ID {block_document_id!r}"
1018 )
1020 return block_document, block_document.name
1022 @classmethod 1a
1023 @inject_client 1a
1024 async def aload( 1a
1025 cls,
1026 name: str,
1027 validate: bool = True,
1028 client: Optional["PrefectClient"] = None,
1029 ) -> "Self":
1030 """
1031 Retrieves data from the block document with the given name for the block type
1032 that corresponds with the current class and returns an instantiated version of
1033 the current class with the data stored in the block document.
1035 If a block document for a given block type is saved with a different schema
1036 than the current class calling `aload`, a warning will be raised.
1038 If the current class schema is a subset of the block document schema, the block
1039 can be loaded as normal using the default `validate = True`.
1041 If the current class schema is a superset of the block document schema, `aload`
1042 must be called with `validate` set to False to prevent a validation error. In
1043 this case, the block attributes will default to `None` and must be set manually
1044 and saved to a new block document before the block can be used as expected.
1046 Args:
1047 name: The name or slug of the block document. A block document slug is a
1048 string with the format `<block_type_slug>/<block_document_name>`
1049 validate: If False, the block document will be loaded without Pydantic
1050 validating the block schema. This is useful if the block schema has
1051 changed client-side since the block document referred to by `name` was saved.
1052 client: The client to use to load the block document. If not provided, the
1053 default client will be injected.
1055 Raises:
1056 ValueError: If the requested block document is not found.
1058 Returns:
1059 An instance of the current class hydrated with the data stored in the
1060 block document with the specified name.
1062 Examples:
1063 Load from a Block subclass with a block document name:
1064 ```python
1065 class Custom(Block):
1066 message: str
1068 Custom(message="Hello!").save("my-custom-message")
1070 loaded_block = await Custom.aload("my-custom-message")
1071 ```
1073 Load from Block with a block document slug:
1074 ```python
1075 class Custom(Block):
1076 message: str
1078 Custom(message="Hello!").save("my-custom-message")
1080 loaded_block = await Block.aload("custom/my-custom-message")
1081 ```
1083 Migrate a block document to a new schema:
1084 ```python
1085 # original class
1086 class Custom(Block):
1087 message: str
1089 Custom(message="Hello!").save("my-custom-message")
1091 # Updated class with new required field
1092 class Custom(Block):
1093 message: str
1094 number_of_ducks: int
1096 loaded_block = await Custom.aload("my-custom-message", validate=False)
1098 # Prints UserWarning about schema mismatch
1100 loaded_block.number_of_ducks = 42
1102 loaded_block.save("my-custom-message", overwrite=True)
1103 ```
1104 """
1105 if TYPE_CHECKING:
1106 assert isinstance(client, PrefectClient)
1107 block_document, _ = await cls._aget_block_document(name, client=client)
1109 return cls._load_from_block_document(block_document, validate=validate)
1111 @classmethod 1a
1112 @async_dispatch(aload) 1a
1113 def load( 1a
1114 cls,
1115 name: str,
1116 validate: bool = True,
1117 client: Optional["PrefectClient"] = None,
1118 ) -> "Self":
1119 """
1120 Retrieves data from the block document with the given name for the block type
1121 that corresponds with the current class and returns an instantiated version of
1122 the current class with the data stored in the block document.
1124 If a block document for a given block type is saved with a different schema
1125 than the current class calling `load`, a warning will be raised.
1127 If the current class schema is a subset of the block document schema, the block
1128 can be loaded as normal using the default `validate = True`.
1130 If the current class schema is a superset of the block document schema, `load`
1131 must be called with `validate` set to False to prevent a validation error. In
1132 this case, the block attributes will default to `None` and must be set manually
1133 and saved to a new block document before the block can be used as expected.
1135 Args:
1136 name: The name or slug of the block document. A block document slug is a
1137 string with the format `<block_type_slug>/<block_document_name>`
1138 validate: If False, the block document will be loaded without Pydantic
1139 validating the block schema. This is useful if the block schema has
1140 changed client-side since the block document referred to by `name` was saved.
1141 client: The client to use to load the block document. If not provided, the
1142 default client will be injected.
1144 Raises:
1145 ValueError: If the requested block document is not found.
1147 Returns:
1148 An instance of the current class hydrated with the data stored in the
1149 block document with the specified name.
1151 Examples:
1152 Load from a Block subclass with a block document name:
1153 ```python
1154 class Custom(Block):
1155 message: str
1157 Custom(message="Hello!").save("my-custom-message")
1159 loaded_block = Custom.load("my-custom-message")
1160 ```
1162 Load from Block with a block document slug:
1163 ```python
1164 class Custom(Block):
1165 message: str
1167 Custom(message="Hello!").save("my-custom-message")
1169 loaded_block = Block.load("custom/my-custom-message")
1170 ```
1172 Migrate a block document to a new schema:
1173 ```python
1174 # original class
1175 class Custom(Block):
1176 message: str
1178 Custom(message="Hello!").save("my-custom-message")
1180 # Updated class with new required field
1181 class Custom(Block):
1182 message: str
1183 number_of_ducks: int
1185 loaded_block = Custom.load("my-custom-message", validate=False)
1187 # Prints UserWarning about schema mismatch
1189 loaded_block.number_of_ducks = 42
1191 loaded_block.save("my-custom-message", overwrite=True)
1192 ```
1193 """
1194 # Need to use a `PrefectClient` here to ensure `Block.load` and `Block.aload` signatures match
1195 # TODO: replace with only sync client once all internal calls are updated to use `Block.aload` and `@async_dispatch` is removed
1196 if client is None:
1197 # If a client wasn't provided, we get to use a sync client
1198 from prefect.client.orchestration import get_client
1200 with get_client(sync_client=True) as sync_client:
1201 block_document, _ = cls._get_block_document(name, client=sync_client)
1202 else:
1203 # If a client was provided, reuse it, even though it's async, to avoid excessive client creation
1204 block_document, _ = run_coro_as_sync(
1205 cls._aget_block_document(name, client=client)
1206 )
1207 return cls._load_from_block_document(block_document, validate=validate)
1209 @classmethod 1a
1210 @sync_compatible 1a
1211 @inject_client 1a
1212 async def load_from_ref( 1a
1213 cls,
1214 ref: Union[str, UUID, dict[str, Any]],
1215 validate: bool = True,
1216 client: "PrefectClient | None" = None,
1217 ) -> Self:
1218 """
1219 Retrieves data from the block document by given reference for the block type
1220 that corresponds with the current class and returns an instantiated version of
1221 the current class with the data stored in the block document.
1223 Provided reference can be a block document ID, or a reference data in dictionary format.
1224 Supported dictionary reference formats are:
1225 - `{"block_document_id": <block_document_id>}`
1226 - `{"block_document_slug": <block_document_slug>}`
1228 If a block document for a given block type is saved with a different schema
1229 than the current class calling `load`, a warning will be raised.
1231 If the current class schema is a subset of the block document schema, the block
1232 can be loaded as normal using the default `validate = True`.
1234 If the current class schema is a superset of the block document schema, `load`
1235 must be called with `validate` set to False to prevent a validation error. In
1236 this case, the block attributes will default to `None` and must be set manually
1237 and saved to a new block document before the block can be used as expected.
1239 Args:
1240 ref: The reference to the block document. This can be a block document ID,
1241 or one of supported dictionary reference formats.
1242 validate: If False, the block document will be loaded without Pydantic
1243 validating the block schema. This is useful if the block schema has
1244 changed client-side since the block document referred to by `name` was saved.
1245 client: The client to use to load the block document. If not provided, the
1246 default client will be injected.
1248 Raises:
1249 ValueError: If invalid reference format is provided.
1250 ValueError: If the requested block document is not found.
1252 Returns:
1253 An instance of the current class hydrated with the data stored in the
1254 block document with the specified name.
1256 """
1257 if TYPE_CHECKING:
1258 assert isinstance(client, PrefectClient)
1259 block_document = None
1260 if isinstance(ref, (str, UUID)):
1261 block_document, _ = await cls._get_block_document_by_id(ref, client=client)
1262 else:
1263 if block_document_id := ref.get("block_document_id"):
1264 block_document, _ = await cls._get_block_document_by_id(
1265 block_document_id, client=client
1266 )
1267 elif block_document_slug := ref.get("block_document_slug"):
1268 block_document, _ = await cls._aget_block_document(
1269 block_document_slug, client=client
1270 )
1272 if not block_document:
1273 raise ValueError(f"Invalid reference format {ref!r}.")
1275 return cls._load_from_block_document(block_document, validate=validate)
1277 @classmethod 1a
1278 def _load_from_block_document( 1a
1279 cls, block_document: BlockDocument, validate: bool = True
1280 ) -> Self:
1281 """
1282 Loads a block from a given block document.
1284 If a block document for a given block type is saved with a different schema
1285 than the current class calling `load`, a warning will be raised.
1287 If the current class schema is a subset of the block document schema, the block
1288 can be loaded as normal using the default `validate = True`.
1290 If the current class schema is a superset of the block document schema, `load`
1291 must be called with `validate` set to False to prevent a validation error. In
1292 this case, the block attributes will default to `None` and must be set manually
1293 and saved to a new block document before the block can be used as expected.
1295 Args:
1296 block_document: The block document used to instantiate a block.
1297 validate: If False, the block document will be loaded without Pydantic
1298 validating the block schema. This is useful if the block schema has
1299 changed client-side since the block document referred to by `name` was saved.
1301 Raises:
1302 ValueError: If the requested block document is not found.
1304 Returns:
1305 An instance of the current class hydrated with the data stored in the
1306 block document with the specified name.
1308 """
1309 try:
1310 return cls._from_block_document(block_document)
1311 except ValidationError as e:
1312 if not validate:
1313 missing_fields = tuple(err["loc"][0] for err in e.errors())
1314 missing_block_data: dict[str, None] = {
1315 field: None for field in missing_fields if isinstance(field, str)
1316 }
1317 warnings.warn(
1318 f"Could not fully load {block_document.name!r} of block type"
1319 f" {cls.get_block_type_slug()!r} - this is likely because one or more"
1320 " required fields were added to the schema for"
1321 f" {cls.__name__!r} that did not exist on the class when this block"
1322 " was last saved. Please specify values for new field(s):"
1323 f" {listrepr(missing_fields)}, then run"
1324 f' `{cls.__name__}.save("{block_document.name}", overwrite=True)`,'
1325 " and load this block again before attempting to use it."
1326 )
1327 return cls.model_construct(**block_document.data, **missing_block_data)
1328 raise RuntimeError(
1329 f"Unable to load {block_document.name!r} of block type"
1330 f" {cls.get_block_type_slug()!r} due to failed validation. To load without"
1331 " validation, try loading again with `validate=False`."
1332 ) from e
1334 @staticmethod 1a
1335 def is_block_class(block: Any) -> TypeGuard[type["Block"]]: 1a
1336 return _is_subclass(block, Block) 1ab
1338 @staticmethod 1a
1339 def annotation_refers_to_block_class(annotation: Any) -> bool: 1a
1340 if Block.is_block_class(annotation):
1341 return True
1343 if get_origin(annotation) in UnionTypes:
1344 for annotation in get_args(annotation):
1345 if Block.is_block_class(annotation):
1346 return True
1348 return False
1350 @classmethod 1a
1351 @sync_compatible 1a
1352 @inject_client 1a
1353 async def register_type_and_schema(cls, client: Optional["PrefectClient"] = None): 1a
1354 """
1355 Makes block available for configuration with current Prefect API.
1356 Recursively registers all nested blocks. Registration is idempotent.
1358 Args:
1359 client: Optional client to use for registering type and schema with the
1360 Prefect API. A new client will be created and used if one is not
1361 provided.
1362 """
1363 if TYPE_CHECKING:
1364 assert isinstance(client, PrefectClient)
1365 if cls.__name__ == "Block":
1366 raise InvalidBlockRegistration(
1367 "`register_type_and_schema` should be called on a Block "
1368 "subclass and not on the Block class directly."
1369 )
1370 if ABC in getattr(cls, "__bases__", []):
1371 raise InvalidBlockRegistration(
1372 "`register_type_and_schema` should be called on a Block "
1373 "subclass and not on a Block interface class directly."
1374 )
1376 async def register_blocks_in_annotation(annotation: type) -> None:
1377 """Walk through the annotation and register any nested blocks."""
1378 if Block.is_block_class(annotation):
1379 coro = annotation.register_type_and_schema(client=client)
1380 if TYPE_CHECKING:
1381 assert isinstance(coro, Coroutine)
1382 await coro
1383 elif get_origin(annotation) in NestedTypes:
1384 for inner_annotation in get_args(annotation):
1385 await register_blocks_in_annotation(inner_annotation)
1387 for field in cls.model_fields.values():
1388 if field.annotation is not None:
1389 await register_blocks_in_annotation(field.annotation)
1391 try:
1392 block_type = await client.read_block_type_by_slug(
1393 slug=cls.get_block_type_slug()
1394 )
1396 cls._block_type_id = block_type.id
1398 local_block_type = cls._to_block_type()
1399 if _should_update_block_type(
1400 local_block_type=local_block_type, server_block_type=block_type
1401 ):
1402 await client.update_block_type(
1403 block_type_id=block_type.id,
1404 block_type=BlockTypeUpdate(
1405 **local_block_type.model_dump(
1406 include={
1407 "logo_url",
1408 "documentation_url",
1409 "description",
1410 "code_example",
1411 }
1412 )
1413 ),
1414 )
1415 except prefect.exceptions.ObjectNotFound:
1416 block_type_create = BlockTypeCreate(
1417 **cls._to_block_type().model_dump(
1418 include={
1419 "name",
1420 "slug",
1421 "logo_url",
1422 "documentation_url",
1423 "description",
1424 "code_example",
1425 }
1426 )
1427 )
1428 block_type = await client.create_block_type(block_type=block_type_create)
1429 cls._block_type_id = block_type.id
1431 try:
1432 block_schema = await client.read_block_schema_by_checksum(
1433 checksum=cls._calculate_schema_checksum(),
1434 version=cls.get_block_schema_version(),
1435 )
1436 except prefect.exceptions.ObjectNotFound:
1437 block_schema_create = BlockSchemaCreate(
1438 **cls._to_block_schema(block_type_id=block_type.id).model_dump(
1439 include={"fields", "block_type_id", "capabilities", "version"}
1440 )
1441 )
1442 block_schema = await client.create_block_schema(
1443 block_schema=block_schema_create
1444 )
1446 cls._block_schema_id = block_schema.id
1448 @inject_client 1a
1449 async def _save( 1a
1450 self,
1451 name: Optional[str] = None,
1452 is_anonymous: bool = False,
1453 overwrite: bool = False,
1454 client: Optional["PrefectClient"] = None,
1455 ) -> UUID:
1456 """
1457 Saves the values of a block as a block document with an option to save as an
1458 anonymous block document.
1460 Args:
1461 name: User specified name to give saved block document which can later be used to load the
1462 block document.
1463 is_anonymous: Boolean value specifying whether the block document is anonymous. Anonymous
1464 blocks are intended for system use and are not shown in the UI. Anonymous blocks do not
1465 require a user-supplied name.
1466 overwrite: Boolean value specifying if values should be overwritten if a block document with
1467 the specified name already exists.
1469 Raises:
1470 ValueError: If a name is not given and `is_anonymous` is `False` or a name is given and
1471 `is_anonymous` is `True`.
1472 """
1473 if TYPE_CHECKING:
1474 assert isinstance(client, PrefectClient)
1475 if name is None and not is_anonymous:
1476 if self._block_document_name is None:
1477 raise ValueError(
1478 "You're attempting to save a block document without a name."
1479 " Please either call `save` with a `name` or pass"
1480 " `is_anonymous=True` to save an anonymous block."
1481 )
1482 else:
1483 name = self._block_document_name
1485 self._is_anonymous = is_anonymous
1487 # Ensure block type and schema are registered before saving block document.
1488 coro = self.register_type_and_schema(client=client)
1489 if TYPE_CHECKING:
1490 assert isinstance(coro, Coroutine)
1491 await coro
1493 block_document = None
1494 try:
1495 block_document_create = BlockDocumentCreate(
1496 **self._to_block_document(name=name, include_secrets=True).model_dump(
1497 include={
1498 "name",
1499 "block_schema_id",
1500 "block_type_id",
1501 "data",
1502 "is_anonymous",
1503 }
1504 )
1505 )
1506 block_document = await client.create_block_document(
1507 block_document=block_document_create
1508 )
1509 except prefect.exceptions.ObjectAlreadyExists as err:
1510 if overwrite:
1511 block_document_id = self._block_document_id
1512 if block_document_id is None and name is not None:
1513 existing_block_document = await client.read_block_document_by_name(
1514 name=name, block_type_slug=self.get_block_type_slug()
1515 )
1516 block_document_id = existing_block_document.id
1517 if TYPE_CHECKING:
1518 # We know that the block document id is not None here because we
1519 # only get here if the block document already exists
1520 assert isinstance(block_document_id, UUID)
1521 block_document_update = BlockDocumentUpdate(
1522 **self._to_block_document(
1523 name=name, include_secrets=True
1524 ).model_dump(include={"block_schema_id", "data"})
1525 )
1526 await client.update_block_document(
1527 block_document_id=block_document_id,
1528 block_document=block_document_update,
1529 )
1530 block_document = await client.read_block_document(
1531 block_document_id=block_document_id
1532 )
1533 else:
1534 raise ValueError(
1535 "You are attempting to save values with a name that is already in"
1536 " use for this block type. If you would like to overwrite the"
1537 " values that are saved, then save with `overwrite=True`."
1538 ) from err
1540 # Update metadata on block instance for later use.
1541 self._block_document_name = block_document.name
1542 self._block_document_id = block_document.id
1543 return self._block_document_id
1545 @sync_compatible 1a
1546 async def save( 1a
1547 self,
1548 name: Optional[str] = None,
1549 overwrite: bool = False,
1550 client: Optional["PrefectClient"] = None,
1551 ):
1552 """
1553 Saves the values of a block as a block document.
1555 Args:
1556 name: User specified name to give saved block document which can later be used to load the
1557 block document.
1558 overwrite: Boolean value specifying if values should be overwritten if a block document with
1559 the specified name already exists.
1561 """
1562 document_id = await self._save(name=name, overwrite=overwrite, client=client)
1564 return document_id
1566 @classmethod 1a
1567 @sync_compatible 1a
1568 @inject_client 1a
1569 async def delete( 1a
1570 cls,
1571 name: str,
1572 client: Optional["PrefectClient"] = None,
1573 ):
1574 if TYPE_CHECKING:
1575 assert isinstance(client, PrefectClient)
1576 block_document, _ = await cls._aget_block_document(name, client=client)
1578 await client.delete_block_document(block_document.id)
1580 def get_block_placeholder(self) -> str: 1a
1581 """
1582 Returns the block placeholder for the current block which can be used for
1583 templating.
1585 Returns:
1586 str: The block placeholder for the current block in the format
1587 `prefect.blocks.{block_type_name}.{block_document_name}`
1589 Raises:
1590 BlockNotSavedError: Raised if the block has not been saved.
1592 If a block has not been saved, the return value will be `None`.
1593 """
1594 block_document_name = self._block_document_name
1595 if not block_document_name:
1596 raise BlockNotSavedError(
1597 "Could not generate block placeholder for unsaved block."
1598 )
1600 return f"prefect.blocks.{self.get_block_type_slug()}.{block_document_name}"
1602 @classmethod 1a
1603 def model_json_schema( 1a
1604 cls,
1605 by_alias: bool = True,
1606 ref_template: str = "#/definitions/{model}",
1607 schema_generator: type[GenerateJsonSchema] = GenerateJsonSchema,
1608 mode: Literal["validation", "serialization"] = "validation",
1609 ) -> dict[str, Any]:
1610 """TODO: stop overriding this method - use GenerateSchema in ConfigDict instead?"""
1611 schema = super().model_json_schema( 1ab
1612 by_alias, ref_template, schema_generator, mode
1613 )
1615 # ensure backwards compatibility by copying $defs into definitions
1616 if "$defs" in schema: 1ab
1617 schema["definitions"] = schema.pop("$defs") 1ab
1619 schema = remove_nested_keys(["additionalProperties"], schema) 1ab
1620 return schema 1ab
1622 @classmethod 1a
1623 def model_validate( 1a
1624 cls: type[Self],
1625 obj: dict[str, Any] | Any,
1626 *,
1627 strict: bool | None = None,
1628 from_attributes: bool | None = None,
1629 context: dict[str, Any] | None = None,
1630 ) -> Self:
1631 if isinstance(obj, dict):
1632 obj = cast(dict[str, Any], obj)
1633 extra_serializer_fields = {
1634 "_block_document_id",
1635 "_block_document_name",
1636 "_is_anonymous",
1637 }.intersection(obj.keys())
1638 for field in extra_serializer_fields:
1639 obj.pop(field, None)
1641 return super().model_validate(
1642 obj,
1643 strict=strict,
1644 from_attributes=from_attributes,
1645 context=context,
1646 )
1648 def model_dump( 1a
1649 self,
1650 *,
1651 mode: Literal["json", "python"] | str = "python",
1652 include: "IncEx | None" = None,
1653 exclude: "IncEx | None" = None,
1654 context: dict[str, Any] | None = None,
1655 by_alias: bool = False,
1656 exclude_unset: bool = False,
1657 exclude_defaults: bool = False,
1658 exclude_none: bool = False,
1659 round_trip: bool = False,
1660 warnings: bool | Literal["none", "warn", "error"] = True,
1661 serialize_as_any: bool = False,
1662 ) -> dict[str, Any]:
1663 d = super().model_dump(
1664 mode=mode,
1665 include=include,
1666 exclude=exclude,
1667 context=context,
1668 by_alias=by_alias,
1669 exclude_unset=exclude_unset,
1670 exclude_defaults=exclude_defaults,
1671 exclude_none=exclude_none,
1672 round_trip=round_trip,
1673 warnings=warnings,
1674 serialize_as_any=serialize_as_any,
1675 )
1677 extra_serializer_fields = {
1678 "block_type_slug",
1679 "_block_document_id",
1680 "_block_document_name",
1681 "_is_anonymous",
1682 }.intersection(d.keys())
1684 for field in extra_serializer_fields:
1685 if (include and field not in include) or (exclude and field in exclude):
1686 d.pop(field)
1688 return d