Coverage for /usr/local/lib/python3.12/site-packages/prefect/blocks/core.py: 43%

586 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3import hashlib 1a

4import html 1a

5import inspect 1a

6import sys 1a

7import types 1a

8import uuid 1a

9import warnings 1a

10from abc import ABC 1a

11from functools import partial 1a

12from textwrap import dedent 1a

13from typing import ( 1a

14 TYPE_CHECKING, 

15 Any, 

16 ClassVar, 

17 Coroutine, 

18 FrozenSet, 

19 Optional, 

20 TypeVar, 

21 Union, 

22 cast, 

23 get_origin, 

24) 

25from uuid import UUID, uuid4 1a

26 

27from griffe import Docstring, DocstringSection, DocstringSectionKind, Parser, parse 1a

28from packaging.version import InvalidVersion, Version 1a

29from pydantic import ( 1a

30 BaseModel, 

31 ConfigDict, 

32 HttpUrl, 

33 PrivateAttr, 

34 SecretBytes, 

35 SecretStr, 

36 SerializationInfo, 

37 SerializerFunctionWrapHandler, 

38 ValidationError, 

39 model_serializer, 

40 model_validator, 

41) 

42from pydantic.json_schema import GenerateJsonSchema 1a

43from typing_extensions import Literal, ParamSpec, Self, TypeGuard, get_args 1a

44 

45import prefect.exceptions 1a

46from prefect._internal.compatibility.async_dispatch import async_dispatch 1a

47from prefect.client.schemas import ( 1a

48 DEFAULT_BLOCK_SCHEMA_VERSION, 

49 BlockDocument, 

50 BlockSchema, 

51 BlockType, 

52 BlockTypeUpdate, 

53) 

54from prefect.client.schemas.actions import ( 1a

55 BlockDocumentCreate, 

56 BlockDocumentUpdate, 

57 BlockSchemaCreate, 

58 BlockTypeCreate, 

59) 

60from prefect.client.utilities import inject_client 1a

61from prefect.events import emit_event 1a

62from prefect.logging.loggers import disable_logger 1a

63from prefect.plugins import load_prefect_collections 1a

64from prefect.types import SecretDict 1a

65from prefect.utilities.asyncutils import run_coro_as_sync, sync_compatible 1a

66from prefect.utilities.collections import listrepr, remove_nested_keys, visit_collection 1a

67from prefect.utilities.dispatch import lookup_type, register_base_type 1a

68from prefect.utilities.hashing import hash_objects 1a

69from prefect.utilities.importtools import to_qualified_name 1a

70from prefect.utilities.pydantic import handle_secret_render 1a

71from prefect.utilities.slugify import slugify 1a

72 

73if TYPE_CHECKING: 73 ↛ 74line 73 didn't jump to line 74 because the condition on line 73 was never true1a

74 from pydantic.main import IncEx 

75 

76 from prefect.client.orchestration import PrefectClient, SyncPrefectClient 

77 

78R = TypeVar("R") 1a

79P = ParamSpec("P") 1a

80 

81ResourceTuple = tuple[dict[str, Any], list[dict[str, Any]]] 1a

82UnionTypes: tuple[object, ...] = (Union,) 1a

83if hasattr(types, "UnionType"): 83 ↛ 86line 83 didn't jump to line 86 because the condition on line 83 was always true1a

84 # Python 3.10+ only 

85 UnionTypes = (*UnionTypes, types.UnionType) 1a

86NestedTypes: tuple[type, ...] = (list, dict, tuple, *UnionTypes) 1a

87 

88 

89def block_schema_to_key(schema: BlockSchema) -> str: 1a

90 """ 

91 Defines the unique key used to lookup the Block class for a given schema. 

92 """ 

93 if schema.block_type is None: 93 ↛ 94line 93 didn't jump to line 94 because the condition on line 93 was never true1ab

94 raise ValueError("Block type is not set") 

95 return f"{schema.block_type.slug}" 1ab

96 

97 

98class InvalidBlockRegistration(Exception): 1a

99 """ 

100 Raised on attempted registration of the base Block 

101 class or a Block interface class 

102 """ 

103 

104 

105class UnknownBlockType(Exception): 1a

106 """ 

107 Raised when a block type is not found in the registry. 

108 """ 

109 

110 

111def _collect_nested_reference_strings( 1a

112 obj: dict[str, Any] | list[Any], 

113) -> list[dict[str, Any]]: 

114 """ 

115 Collects all nested reference strings (e.g. #/definitions/Model) from a given object. 

116 """ 

117 found_reference_strings: list[dict[str, Any]] = [] 1ab

118 if isinstance(obj, dict): 1ab

119 if ref := obj.get("$ref"): 1ab

120 found_reference_strings.append(ref) 1ab

121 for value in obj.values(): 1ab

122 found_reference_strings.extend(_collect_nested_reference_strings(value)) 1ab

123 if isinstance(obj, list): 1ab

124 for item in obj: 1ab

125 found_reference_strings.extend(_collect_nested_reference_strings(item)) 1ab

126 return found_reference_strings 1ab

127 

128 

129def _get_non_block_reference_definitions( 1a

130 object_definition: dict[str, Any], definitions: dict[str, Any] 

131) -> dict[str, Any]: 

132 """ 

133 Given a definition of an object in a block schema OpenAPI spec and the dictionary 

134 of all reference definitions in that same block schema OpenAPI spec, return the 

135 definitions for objects that are referenced from the object or any children of 

136 the object that do not reference a block. 

137 """ 

138 non_block_definitions: dict[str, Any] = {} 1ab

139 reference_strings = _collect_nested_reference_strings(object_definition) 1ab

140 for reference_string in reference_strings: 1ab

141 if isinstance(reference_string, str): 141 ↛ 140line 141 didn't jump to line 140 because the condition on line 141 was always true1ab

142 definition_key = reference_string.replace("#/definitions/", "") 1ab

143 definition = definitions.get(definition_key) 1ab

144 if definition and definition.get("block_type_slug") is None: 1ab

145 non_block_definitions = { 1b

146 **non_block_definitions, 

147 definition_key: definition, 

148 **_get_non_block_reference_definitions(definition, definitions), 

149 } 

150 return non_block_definitions 1ab

151 

152 

153def _is_subclass(cls: type, parent_cls: type) -> TypeGuard[type[BaseModel]]: 1a

154 """ 

155 Checks if a given class is a subclass of another class. Unlike issubclass, 

156 this will not throw an exception if cls is an instance instead of a type. 

157 """ 

158 # For python<=3.11 inspect.isclass() will return True for parametrized types (e.g. list[str]) 

159 # so we need to check for get_origin() to avoid TypeError for issubclass. 

160 return inspect.isclass(cls) and not get_origin(cls) and issubclass(cls, parent_cls) 1ab

161 

162 

163def _collect_secret_fields( 1a

164 name: str, 

165 type_: type[BaseModel] | type[SecretStr] | type[SecretBytes] | type[SecretDict], 

166 secrets: list[str], 

167) -> None: 

168 """ 

169 Recursively collects all secret fields from a given type and adds them to the 

170 secrets list, supporting nested Union / Dict / Tuple / List / BaseModel fields. 

171 Also, note, this function mutates the input secrets list, thus does not return anything. 

172 """ 

173 if get_origin(type_) in NestedTypes: 1ab

174 for nested_type in get_args(type_): 1ab

175 _collect_secret_fields(name, nested_type, secrets) 1ab

176 return 1ab

177 elif _is_subclass(type_, BaseModel): 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true1ab

178 for field_name, field in type_.model_fields.items(): 

179 if field.annotation is not None: 

180 _collect_secret_fields( 

181 f"{name}.{field_name}", field.annotation, secrets 

182 ) 

183 return 

184 

185 # Check if this is a pydantic Secret type (including generic Secret[T]) 

186 is_pydantic_secret = False 1ab

187 

188 # Direct check for SecretStr, SecretBytes 

189 if type_ in (SecretStr, SecretBytes): 1ab

190 is_pydantic_secret = True 1ab

191 # Check for base Secret class 

192 elif ( 192 ↛ 197line 192 didn't jump to line 197 because the condition on line 192 was never true

193 isinstance(type_, type) # type: ignore[unnecessaryIsInstance] 

194 and getattr(type_, "__module__", None) == "pydantic.types" 

195 and getattr(type_, "__name__", None) == "Secret" 

196 ): 

197 is_pydantic_secret = True 

198 # Check for generic Secret[T] (e.g., Secret[str], Secret[int]) 

199 elif get_origin(type_) is not None: 1ab

200 origin = get_origin(type_) 1ab

201 if ( 

202 getattr(origin, "__module__", None) == "pydantic.types" 

203 and getattr(origin, "__name__", None) == "Secret" 

204 ): 

205 is_pydantic_secret = True 1ab

206 

207 if is_pydantic_secret: 1ab

208 secrets.append(name) 1ab

209 elif type_ == SecretDict: 1ab

210 # Append .* to field name to signify that all values under a given key are secret and should be obfuscated. 

211 secrets.append(f"{name}.*") 1ab

212 elif Block.is_block_class(type_): 212 ↛ 213line 212 didn't jump to line 213 because the condition on line 212 was never true1ab

213 secrets.extend( 

214 f"{name}.{s}" for s in type_.model_json_schema()["secret_fields"] 

215 ) 

216 

217 

218def _should_update_block_type( 1a

219 local_block_type: BlockType, server_block_type: BlockType 

220) -> bool: 

221 """ 

222 Compares the fields of `local_block_type` and `server_block_type`. 

223 Only compare the possible updatable fields as defined by `BlockTypeUpdate.updatable_fields` 

224 Returns True if they are different, otherwise False. 

225 """ 

226 fields = BlockTypeUpdate.updatable_fields() 

227 

228 local_block_fields = local_block_type.model_dump(include=fields, exclude_unset=True) 

229 server_block_fields = server_block_type.model_dump( 

230 include=fields, exclude_unset=True 

231 ) 

232 

233 if local_block_fields.get("description") is not None: 

234 local_block_fields["description"] = html.unescape( 

235 local_block_fields["description"] 

236 ) 

237 if local_block_fields.get("code_example") is not None: 

238 local_block_fields["code_example"] = html.unescape( 

239 local_block_fields["code_example"] 

240 ) 

241 

242 if server_block_fields.get("description") is not None: 

243 server_block_fields["description"] = html.unescape( 

244 server_block_fields["description"] 

245 ) 

246 if server_block_fields.get("code_example") is not None: 

247 server_block_fields["code_example"] = html.unescape( 

248 server_block_fields["code_example"] 

249 ) 

250 

251 return server_block_fields != local_block_fields 

252 

253 

254class BlockNotSavedError(RuntimeError): 1a

255 """ 

256 Raised when a given block is not saved and an operation that requires 

257 the block to be saved is attempted. 

258 """ 

259 

260 pass 1a

261 

262 

263def schema_extra(schema: dict[str, Any], model: type["Block"]) -> None: 1a

264 """ 

265 Customizes Pydantic's schema generation feature to add blocks related information. 

266 """ 

267 schema["block_type_slug"] = model.get_block_type_slug() 1ab

268 # Ensures args and code examples aren't included in the schema 

269 description = model.get_description() 1ab

270 if description: 270 ↛ 274line 270 didn't jump to line 274 because the condition on line 270 was always true1ab

271 schema["description"] = description 1ab

272 else: 

273 # Prevent the description of the base class from being included in the schema 

274 schema.pop("description", None) 

275 

276 # create a list of secret field names 

277 # secret fields include both top-level keys and dot-delimited nested secret keys 

278 # A wildcard (*) means that all fields under a given key are secret. 

279 # for example: ["x", "y", "z.*", "child.a"] 

280 # means the top-level keys "x" and "y", all keys under "z", and the key "a" of a block 

281 # nested under the "child" key are all secret. There is no limit to nesting. 

282 secrets: list[str] = [] 1ab

283 for name, field in model.model_fields.items(): 1ab

284 if field.annotation is not None: 284 ↛ 283line 284 didn't jump to line 283 because the condition on line 284 was always true1ab

285 _collect_secret_fields(name, field.annotation, secrets) 1ab

286 schema["secret_fields"] = secrets 1ab

287 

288 # create block schema references 

289 refs: dict[str, Any] = {} 1ab

290 

291 def collect_block_schema_references(field_name: str, annotation: type) -> None: 1ab

292 """Walk through the annotation and collect block schemas for any nested blocks.""" 

293 if Block.is_block_class(annotation): 293 ↛ 294line 293 didn't jump to line 294 because the condition on line 293 was never true1ab

294 if isinstance(refs.get(field_name), list): 

295 refs[field_name].append(annotation._to_block_schema_reference_dict()) # pyright: ignore[reportPrivateUsage] 

296 elif isinstance(refs.get(field_name), dict): 

297 refs[field_name] = [ 

298 refs[field_name], 

299 annotation._to_block_schema_reference_dict(), # pyright: ignore[reportPrivateUsage] 

300 ] 

301 else: 

302 refs[field_name] = annotation._to_block_schema_reference_dict() # pyright: ignore[reportPrivateUsage] 

303 

304 if get_origin(annotation) in NestedTypes: 1ab

305 for type_ in get_args(annotation): 1ab

306 collect_block_schema_references(field_name, type_) 1ab

307 

308 for name, field in model.model_fields.items(): 1ab

309 if field.annotation is not None: 309 ↛ 308line 309 didn't jump to line 308 because the condition on line 309 was always true1ab

310 collect_block_schema_references(name, field.annotation) 1ab

311 schema["block_schema_references"] = refs 1ab

312 

313 

314@register_base_type 1a

315class Block(BaseModel, ABC): 1a

316 """ 

317 A base class for implementing a block that wraps an external service. 

318 

319 This class can be defined with an arbitrary set of fields and methods, and 

320 couples business logic with data contained in an block document. 

321 `_block_document_name`, `_block_document_id`, `_block_schema_id`, and 

322 `_block_type_id` are reserved by Prefect as Block metadata fields, but 

323 otherwise a Block can implement arbitrary logic. Blocks can be instantiated 

324 without populating these metadata fields, but can only be used interactively, 

325 not with the Prefect API. 

326 

327 Instead of the __init__ method, a block implementation allows the 

328 definition of a `block_initialization` method that is called after 

329 initialization. 

330 """ 

331 

332 model_config: ClassVar[ConfigDict] = ConfigDict( 1a

333 extra="allow", 

334 json_schema_extra=schema_extra, 

335 ) 

336 

337 def __new__(cls: type[Self], **kwargs: Any) -> Self: 1a

338 """ 

339 Create an instance of the Block subclass type if a `block_type_slug` is 

340 present in the data payload. 

341 """ 

342 if block_type_slug := kwargs.pop("block_type_slug", None): 

343 subcls = cls.get_block_class_from_key(block_type_slug) 

344 if cls is not subcls and issubclass(subcls, cls): 

345 return super().__new__(subcls) 

346 return super().__new__(cls) 

347 

348 def __init__(self, *args: Any, **kwargs: Any): 1a

349 super().__init__(*args, **kwargs) 

350 self.block_initialization() 

351 

352 def __str__(self) -> str: 1a

353 return self.__repr__() 

354 

355 def __repr_args__(self) -> list[tuple[str | None, Any]]: 1a

356 repr_args = super().__repr_args__() 

357 data_keys = self.model_json_schema()["properties"].keys() 

358 return [ 

359 (key, value) for key, value in repr_args if key is None or key in data_keys 

360 ] 

361 

362 @model_validator(mode="before") 1a

363 @classmethod 1a

364 def validate_block_type_slug(cls, values: Any) -> Any: 1a

365 """ 

366 Validates that the `block_type_slug` in the input values matches the expected 

367 block type slug for the class. This helps pydantic to correctly discriminate 

368 between different Block subclasses when validating Union types of Blocks. 

369 """ 

370 if isinstance(values, dict): 

371 if "block_type_slug" in values: 

372 expected_slug = cls.get_block_type_slug() 

373 slug = values["block_type_slug"] 

374 if slug and slug != expected_slug: 

375 raise ValueError( 

376 f"Invalid block_type_slug: expected '{expected_slug}', got '{values['block_type_slug']}'" 

377 ) 

378 return values 

379 

380 def block_initialization(self) -> None: 1a

381 pass 

382 

383 # -- private class variables 

384 # set by the class itself 

385 

386 # Attribute to customize the name of the block type created 

387 # when the block is registered with the API. If not set, block 

388 # type name will default to the class name. 

389 _block_type_name: ClassVar[Optional[str]] = None 1a

390 _block_type_slug: ClassVar[Optional[str]] = None 1a

391 

392 # Attributes used to set properties on a block type when registered 

393 # with the API. 

394 _logo_url: ClassVar[Optional[HttpUrl]] = None 1a

395 _documentation_url: ClassVar[Optional[HttpUrl]] = None 1a

396 _description: ClassVar[Optional[str]] = None 1a

397 _code_example: ClassVar[Optional[str]] = None 1a

398 _block_type_id: ClassVar[Optional[UUID]] = None 1a

399 _block_schema_id: ClassVar[Optional[UUID]] = None 1a

400 _block_schema_capabilities: ClassVar[Optional[list[str]]] = None 1a

401 _block_schema_version: ClassVar[Optional[str]] = None 1a

402 

403 # -- private instance variables 

404 # these are set when blocks are loaded from the API 

405 

406 _block_document_id: Optional[UUID] = PrivateAttr(None) 1a

407 _block_document_name: Optional[str] = PrivateAttr(None) 1a

408 _is_anonymous: Optional[bool] = PrivateAttr(None) 1a

409 

410 # Exclude `save` as it uses the `sync_compatible` decorator and needs to be 

411 # decorated directly. 

412 _events_excluded_methods: ClassVar[list[str]] = PrivateAttr( 1a

413 default=["block_initialization", "save", "dict"] 

414 ) 

415 

416 @classmethod 1a

417 def __dispatch_key__(cls) -> str | None: 1a

418 if cls.__name__ == "Block": 1ab

419 return None # The base class is abstract 1a

420 return block_schema_to_key(cls._to_block_schema()) 1ab

421 

422 @model_serializer(mode="wrap") 1a

423 def ser_model( 1a

424 self, handler: SerializerFunctionWrapHandler, info: SerializationInfo 

425 ) -> Any: 

426 jsonable_self = handler(self) 

427 if (ctx := info.context) and ctx.get("include_secrets") is True: 

428 # Add serialization mode to context so handle_secret_render knows how to process nested models 

429 ctx["serialization_mode"] = info.mode 

430 

431 for field_name in type(self).model_fields: 

432 field_value = getattr(self, field_name) 

433 

434 # In JSON mode, skip fields that don't contain secrets 

435 # as they're already properly serialized by the handler 

436 if ( 

437 info.mode == "json" 

438 and field_name in jsonable_self 

439 and not self._field_has_secrets(field_name) 

440 ): 

441 continue 

442 

443 # For all other fields, use visit_collection with handle_secret_render 

444 jsonable_self[field_name] = visit_collection( 

445 expr=field_value, 

446 visit_fn=partial(handle_secret_render, context=ctx), 

447 return_data=True, 

448 ) 

449 extra_fields = { 

450 "block_type_slug": self.get_block_type_slug(), 

451 "_block_document_id": self._block_document_id, 

452 "_block_document_name": self._block_document_name, 

453 "_is_anonymous": self._is_anonymous, 

454 } 

455 jsonable_self |= { 

456 key: value for key, value in extra_fields.items() if value is not None 

457 } 

458 return jsonable_self 

459 

460 @classmethod 1a

461 def get_block_type_name(cls) -> str: 1a

462 return cls._block_type_name or cls.__name__ 1ab

463 

464 @classmethod 1a

465 def get_block_type_slug(cls) -> str: 1a

466 return slugify(cls._block_type_slug or cls.get_block_type_name()) 1ab

467 

468 @classmethod 1a

469 def get_block_capabilities(cls) -> FrozenSet[str]: 1a

470 """ 

471 Returns the block capabilities for this Block. Recursively collects all block 

472 capabilities of all parent classes into a single frozenset. 

473 """ 

474 return frozenset( 1ab

475 { 

476 c 

477 for base in (cls,) + cls.__mro__ 

478 for c in getattr(base, "_block_schema_capabilities", []) or [] 

479 } 

480 ) 

481 

482 @classmethod 1a

483 def _get_current_package_version(cls): 1a

484 current_module = inspect.getmodule(cls) 1ab

485 if current_module: 485 ↛ 496line 485 didn't jump to line 496 because the condition on line 485 was always true1ab

486 top_level_module = sys.modules[ 1ab

487 current_module.__name__.split(".")[0] or "__main__" 

488 ] 

489 try: 1ab

490 version = Version(top_level_module.__version__) 1ab

491 # Strips off any local version information 

492 return version.base_version 1ab

493 except (AttributeError, InvalidVersion): 1b

494 # Module does not have a __version__ attribute or is not a parsable format 

495 pass 1b

496 return DEFAULT_BLOCK_SCHEMA_VERSION 1b

497 

498 @classmethod 1a

499 def get_block_schema_version(cls) -> str: 1a

500 return cls._block_schema_version or cls._get_current_package_version() 1ab

501 

502 @classmethod 1a

503 def _to_block_schema_reference_dict(cls): 1a

504 return dict( 

505 block_type_slug=cls.get_block_type_slug(), 

506 block_schema_checksum=cls._calculate_schema_checksum(), 

507 ) 

508 

509 @classmethod 1a

510 def _calculate_schema_checksum( 1a

511 cls, block_schema_fields: dict[str, Any] | None = None 

512 ): 

513 """ 

514 Generates a unique hash for the underlying schema of block. 

515 

516 Args: 

517 block_schema_fields: Dictionary detailing block schema fields to generate a 

518 checksum for. The fields of the current class is used if this parameter 

519 is not provided. 

520 

521 Returns: 

522 str: The calculated checksum prefixed with the hashing algorithm used. 

523 """ 

524 block_schema_fields = ( 1ab

525 cls.model_json_schema() 

526 if block_schema_fields is None 

527 else block_schema_fields 

528 ) 

529 fields_for_checksum = remove_nested_keys(["secret_fields"], block_schema_fields) 1ab

530 if fields_for_checksum.get("definitions"): 1ab

531 non_block_definitions = _get_non_block_reference_definitions( 1ab

532 fields_for_checksum, fields_for_checksum["definitions"] 

533 ) 

534 if non_block_definitions: 1ab

535 fields_for_checksum["definitions"] = non_block_definitions 1b

536 else: 

537 # Pop off definitions entirely instead of empty dict for consistency 

538 # with the OpenAPI specification 

539 fields_for_checksum.pop("definitions") 1ab

540 checksum = hash_objects(fields_for_checksum, hash_algo=hashlib.sha256) 1ab

541 if checksum is None: 541 ↛ 542line 541 didn't jump to line 542 because the condition on line 541 was never true1ab

542 raise ValueError("Unable to compute checksum for block schema") 

543 else: 

544 return f"sha256:{checksum}" 1ab

545 

546 def _field_has_secrets(self, field_name: str) -> bool: 1a

547 """Check if a field contains secrets based on the schema's secret_fields.""" 

548 secret_fields = self.model_json_schema().get("secret_fields", []) 

549 

550 # Check if field_name matches any secret field pattern 

551 for secret_field in secret_fields: 

552 if secret_field == field_name: 

553 return True 

554 elif secret_field.startswith(f"{field_name}."): 

555 # This field contains nested secrets 

556 return True 

557 elif secret_field.endswith(".*"): 

558 # Handle wildcard patterns like "field.*" 

559 prefix = secret_field[:-2] # Remove .* 

560 if field_name == prefix: 

561 return True 

562 

563 return False 

564 

565 def _to_block_document( 1a

566 self, 

567 name: Optional[str] = None, 

568 block_schema_id: Optional[UUID] = None, 

569 block_type_id: Optional[UUID] = None, 

570 is_anonymous: Optional[bool] = None, 

571 include_secrets: bool = False, 

572 ) -> BlockDocument: 

573 """ 

574 Creates the corresponding block document based on the data stored in a block. 

575 The corresponding block document name, block type ID, and block schema ID must 

576 either be passed into the method or configured on the block. 

577 

578 Args: 

579 name: The name of the created block document. Not required if anonymous. 

580 block_schema_id: UUID of the corresponding block schema. 

581 block_type_id: UUID of the corresponding block type. 

582 is_anonymous: if True, an anonymous block is created. Anonymous 

583 blocks are not displayed in the UI and used primarily for system 

584 operations and features that need to automatically generate blocks. 

585 

586 Returns: 

587 BlockDocument: Corresponding block document 

588 populated with the block's configured data. 

589 """ 

590 if is_anonymous is None: 

591 is_anonymous = self._is_anonymous or False 

592 

593 # name must be present if not anonymous 

594 if not is_anonymous and not name and not self._block_document_name: 

595 raise ValueError("No name provided, either as an argument or on the block.") 

596 

597 if not block_schema_id and not self._block_schema_id: 

598 raise ValueError( 

599 "No block schema ID provided, either as an argument or on the block." 

600 ) 

601 if not block_type_id and not self._block_type_id: 

602 raise ValueError( 

603 "No block type ID provided, either as an argument or on the block." 

604 ) 

605 

606 # The keys passed to `include` must NOT be aliases, else some items will be missed 

607 # i.e. must do `self.schema_` vs `self.schema` to get a `schema_ = Field(alias="schema")` 

608 # reported from https://github.com/PrefectHQ/prefect-dbt/issues/54 

609 data_keys = self.model_json_schema(by_alias=False)["properties"].keys() 

610 

611 # `block_document_data`` must return the aliased version for it to show in the UI 

612 block_document_data = self.model_dump( 

613 by_alias=True, 

614 include=data_keys, 

615 context={"include_secrets": include_secrets}, 

616 ) 

617 

618 # Ensure non-secret fields are JSON-serializable to avoid issues with types 

619 # like SemanticVersion when the BlockDocument is later serialized 

620 try: 

621 json_data = self.model_dump( 

622 mode="json", 

623 by_alias=True, 

624 include=data_keys, 

625 context={"include_secrets": include_secrets}, 

626 ) 

627 # Replace non-secret, non-Block fields with their JSON representation 

628 # We need to check the original field to determine if it's a secret or Block 

629 for key in data_keys: 

630 if key in block_document_data and key in json_data: 

631 field_value = getattr(self, key) 

632 # Only replace if the field doesn't contain secrets and is not a Block 

633 if not self._field_has_secrets(key) and not isinstance( 

634 field_value, Block 

635 ): 

636 block_document_data[key] = json_data[key] 

637 except Exception: 

638 # If JSON serialization fails, we'll handle it later 

639 pass 

640 

641 # Iterate through and find blocks that already have saved block documents to 

642 # create references to those saved block documents. 

643 for key in data_keys: 

644 field_value = getattr(self, key) 

645 if ( 

646 isinstance(field_value, Block) 

647 and field_value._block_document_id is not None 

648 ): 

649 block_document_data[key] = { 

650 "$ref": {"block_document_id": field_value._block_document_id} 

651 } 

652 

653 block_schema_id = block_schema_id or self._block_schema_id 

654 block_type_id = block_type_id or self._block_type_id 

655 

656 if block_schema_id is None: 

657 raise ValueError( 

658 "No block schema ID provided, either as an argument or on the block." 

659 ) 

660 if block_type_id is None: 

661 raise ValueError( 

662 "No block type ID provided, either as an argument or on the block." 

663 ) 

664 

665 return BlockDocument( 

666 id=self._block_document_id or uuid4(), 

667 name=(name or self._block_document_name) if not is_anonymous else None, 

668 block_schema_id=block_schema_id, 

669 block_type_id=block_type_id, 

670 block_type_name=self._block_type_name, 

671 data=block_document_data, 

672 block_schema=self._to_block_schema( 

673 block_type_id=block_type_id or self._block_type_id, 

674 ), 

675 block_type=self._to_block_type(), 

676 is_anonymous=is_anonymous, 

677 ) 

678 

679 @classmethod 1a

680 def _to_block_schema(cls, block_type_id: Optional[UUID] = None) -> BlockSchema: 1a

681 """ 

682 Creates the corresponding block schema of the block. 

683 The corresponding block_type_id must either be passed into 

684 the method or configured on the block. 

685 

686 Args: 

687 block_type_id: UUID of the corresponding block type. 

688 

689 Returns: 

690 BlockSchema: The corresponding block schema. 

691 """ 

692 fields = cls.model_json_schema() 1ab

693 return BlockSchema( 1ab

694 id=cls._block_schema_id if cls._block_schema_id is not None else uuid4(), 

695 checksum=cls._calculate_schema_checksum(), 

696 fields=fields, 

697 block_type_id=block_type_id or cls._block_type_id, 

698 block_type=cls._to_block_type(), 

699 capabilities=list(cls.get_block_capabilities()), 

700 version=cls.get_block_schema_version(), 

701 ) 

702 

703 @classmethod 1a

704 def _parse_docstring(cls) -> list[DocstringSection]: 1a

705 """ 

706 Parses the docstring into list of DocstringSection objects. 

707 Helper method used primarily to suppress irrelevant logs, e.g. 

708 `<module>:11: No type or annotation for parameter 'write_json'` 

709 because griffe is unable to parse the types from pydantic.BaseModel. 

710 """ 

711 if cls.__doc__ is None: 711 ↛ 712line 711 didn't jump to line 712 because the condition on line 711 was never true1ab

712 return [] 

713 with disable_logger("griffe"): 1ab

714 docstring = Docstring(cls.__doc__) 1ab

715 parsed = parse(docstring, Parser.google) 1ab

716 return parsed 1ab

717 

718 @classmethod 1a

719 def get_description(cls) -> Optional[str]: 1a

720 """ 

721 Returns the description for the current block. Attempts to parse 

722 description from class docstring if an override is not defined. 

723 """ 

724 description = cls._description 1ab

725 # If no description override has been provided, find the first text section 

726 # and use that as the description 

727 if description is None and cls.__doc__ is not None: 1ab

728 parsed = cls._parse_docstring() 1ab

729 parsed_description = next( 1ab

730 ( 

731 section.as_dict().get("value") 

732 for section in parsed 

733 if section.kind == DocstringSectionKind.text 

734 ), 

735 None, 

736 ) 

737 if isinstance(parsed_description, str): 737 ↛ 739line 737 didn't jump to line 739 because the condition on line 737 was always true1ab

738 description = parsed_description.strip() 1ab

739 return description 1ab

740 

741 @classmethod 1a

742 def get_code_example(cls) -> Optional[str]: 1a

743 """ 

744 Returns the code example for the given block. Attempts to parse 

745 code example from the class docstring if an override is not provided. 

746 """ 

747 code_example = ( 1ab

748 dedent(text=cls._code_example) if cls._code_example is not None else None 

749 ) 

750 # If no code example override has been provided, attempt to find a examples 

751 # section or an admonition with the annotation "example" and use that as the 

752 # code example 

753 if code_example is None and cls.__doc__ is not None: 753 ↛ 772line 753 didn't jump to line 772 because the condition on line 753 was always true1ab

754 parsed = cls._parse_docstring() 1ab

755 for section in parsed: 1ab

756 # Section kind will be "examples" if Examples section heading is used. 

757 if section.kind == DocstringSectionKind.examples: 1ab

758 # Examples sections are made up of smaller sections that need to be 

759 # joined with newlines. Smaller sections are represented as tuples 

760 # with shape (DocstringSectionKind, str) 

761 code_example = "\n".join( 1ab

762 (part[1] for part in section.as_dict().get("value", [])) 

763 ) 

764 break 1ab

765 # Section kind will be "admonition" if Example section heading is used. 

766 if section.kind == DocstringSectionKind.admonition: 1ab

767 value = section.as_dict().get("value", {}) 1ab

768 if value.get("annotation") == "example": 768 ↛ 755line 768 didn't jump to line 755 because the condition on line 768 was always true1ab

769 code_example = value.get("description") 1ab

770 break 1ab

771 

772 if code_example is None: 1ab

773 # If no code example has been specified or extracted from the class 

774 # docstring, generate a sensible default 

775 code_example = cls._generate_code_example() 1ab

776 

777 return code_example 1ab

778 

779 @classmethod 1a

780 def _generate_code_example(cls) -> str: 1a

781 """Generates a default code example for the current class""" 

782 qualified_name = to_qualified_name(cls) 1ab

783 module_str = ".".join(qualified_name.split(".")[:-1]) 1ab

784 origin = cls.__pydantic_generic_metadata__.get("origin") or cls 1ab

785 class_name = origin.__name__ 1ab

786 block_variable_name = f"{cls.get_block_type_slug().replace('-', '_')}_block" 1ab

787 

788 return dedent( 1ab

789 f"""\ 

790 ```python 

791 from {module_str} import {class_name} 

792 

793 {block_variable_name} = {class_name}.load("BLOCK_NAME") 

794 ```""" 

795 ) 

796 

797 @classmethod 1a

798 def _to_block_type(cls) -> BlockType: 1a

799 """ 

800 Creates the corresponding block type of the block. 

801 

802 Returns: 

803 BlockType: The corresponding block type. 

804 """ 

805 return BlockType( 1ab

806 id=cls._block_type_id or uuid4(), 

807 slug=cls.get_block_type_slug(), 

808 name=cls.get_block_type_name(), 

809 logo_url=cls._logo_url, 

810 documentation_url=cls._documentation_url, 

811 description=cls.get_description(), 

812 code_example=cls.get_code_example(), 

813 ) 

814 

815 @classmethod 1a

816 def _from_block_document(cls, block_document: BlockDocument) -> Self: 1a

817 """ 

818 Instantiates a block from a given block document. The corresponding block class 

819 will be looked up in the block registry based on the corresponding block schema 

820 of the provided block document. 

821 

822 Args: 

823 block_document: The block document used to instantiate a block. 

824 

825 Raises: 

826 ValueError: If the provided block document doesn't have a corresponding block 

827 schema. 

828 

829 Returns: 

830 Block: Hydrated block with data from block document. 

831 """ 

832 if block_document.block_schema is None: 

833 raise ValueError( 

834 "Unable to determine block schema for provided block document" 

835 ) 

836 

837 block_cls = ( 

838 cls 

839 if cls.__name__ != "Block" 

840 # Look up the block class by dispatch 

841 else cls.get_block_class_from_schema(block_document.block_schema) 

842 ) 

843 

844 block = block_cls.model_validate(block_document.data) 

845 block._block_document_id = block_document.id 

846 block.__class__._block_schema_id = block_document.block_schema_id 

847 block.__class__._block_type_id = block_document.block_type_id 

848 block._block_document_name = block_document.name 

849 block._is_anonymous = block_document.is_anonymous 

850 block._define_metadata_on_nested_blocks( 

851 block_document.block_document_references 

852 ) 

853 

854 resources: Optional[ResourceTuple] = block._event_method_called_resources() 

855 if resources: 

856 kind = block._event_kind() 

857 resource, related = resources 

858 emit_event(event=f"{kind}.loaded", resource=resource, related=related) 

859 

860 return block 

861 

862 def _event_kind(self) -> str: 1a

863 return f"prefect.block.{self.get_block_type_slug()}" 

864 

865 def _event_method_called_resources(self) -> ResourceTuple | None: 1a

866 if not (self._block_document_id and self._block_document_name): 

867 return None 

868 

869 return ( 

870 { 

871 "prefect.resource.id": ( 

872 f"prefect.block-document.{self._block_document_id}" 

873 ), 

874 "prefect.resource.name": self._block_document_name, 

875 }, 

876 [ 

877 { 

878 "prefect.resource.id": ( 

879 f"prefect.block-type.{self.get_block_type_slug()}" 

880 ), 

881 "prefect.resource.role": "block-type", 

882 } 

883 ], 

884 ) 

885 

886 @classmethod 1a

887 def get_block_class_from_schema(cls: type[Self], schema: BlockSchema) -> type[Self]: 1a

888 """ 

889 Retrieve the block class implementation given a schema. 

890 """ 

891 return cls.get_block_class_from_key(block_schema_to_key(schema)) 

892 

893 @classmethod 1a

894 def get_block_class_from_key(cls: type[Self], key: str) -> type[Self]: 1a

895 """ 

896 Retrieve the block class implementation given a key. 

897 """ 

898 

899 # Ensure collections are imported and have the opportunity to register types 

900 # before looking up the block class, but only do this once 

901 load_prefect_collections() 

902 

903 try: 

904 return lookup_type(cls, key) 

905 except KeyError: 

906 message = f"No block class found for slug {key!r}." 

907 # Handle common blocks types used for storage, which is the primary use case for looking up blocks by key 

908 if key == "s3-bucket": 

909 message += " Please ensure that `prefect-aws` is installed." 

910 elif key == "gcs-bucket": 

911 message += " Please ensure that `prefect-gcp` is installed." 

912 elif key == "azure-blob-storage-container": 

913 message += " Please ensure that `prefect-azure` is installed." 

914 else: 

915 message += " Please ensure that the block class is available in the current environment." 

916 raise UnknownBlockType(message) 

917 

918 def _define_metadata_on_nested_blocks( 1a

919 self, block_document_references: dict[str, dict[str, Any]] 

920 ): 

921 """ 

922 Recursively populates metadata fields on nested blocks based on the 

923 provided block document references. 

924 """ 

925 for item in block_document_references.items(): 

926 field_name, block_document_reference = item 

927 nested_block = getattr(self, field_name) 

928 if isinstance(nested_block, Block): 

929 nested_block_document_info = block_document_reference.get( 

930 "block_document", {} 

931 ) 

932 nested_block._define_metadata_on_nested_blocks( 

933 nested_block_document_info.get("block_document_references", {}) 

934 ) 

935 nested_block_document_id = nested_block_document_info.get("id") 

936 nested_block._block_document_id = ( 

937 UUID(nested_block_document_id) if nested_block_document_id else None 

938 ) 

939 nested_block._block_document_name = nested_block_document_info.get( 

940 "name" 

941 ) 

942 nested_block._is_anonymous = nested_block_document_info.get( 

943 "is_anonymous" 

944 ) 

945 

946 @classmethod 1a

947 async def _aget_block_document( 1a

948 cls, 

949 name: str, 

950 client: "PrefectClient", 

951 ) -> tuple[BlockDocument, str]: 

952 if cls.__name__ == "Block": 

953 block_type_slug, block_document_name = name.split("/", 1) 

954 else: 

955 block_type_slug = cls.get_block_type_slug() 

956 block_document_name = name 

957 

958 try: 

959 block_document = await client.read_block_document_by_name( 

960 name=block_document_name, block_type_slug=block_type_slug 

961 ) 

962 except prefect.exceptions.ObjectNotFound as e: 

963 raise ValueError( 

964 f"Unable to find block document named {block_document_name} for block" 

965 f" type {block_type_slug}" 

966 ) from e 

967 

968 return block_document, block_document_name 

969 

970 @classmethod 1a

971 def _get_block_document( 1a

972 cls, 

973 name: str, 

974 client: "SyncPrefectClient", 

975 ) -> tuple[BlockDocument, str]: 

976 if cls.__name__ == "Block": 

977 block_type_slug, block_document_name = name.split("/", 1) 

978 else: 

979 block_type_slug = cls.get_block_type_slug() 

980 block_document_name = name 

981 

982 try: 

983 block_document = client.read_block_document_by_name( 

984 name=block_document_name, block_type_slug=block_type_slug 

985 ) 

986 except prefect.exceptions.ObjectNotFound as e: 

987 raise ValueError( 

988 f"Unable to find block document named {block_document_name} for block" 

989 f" type {block_type_slug}" 

990 ) from e 

991 

992 return block_document, block_document_name 

993 

994 @classmethod 1a

995 @inject_client 1a

996 async def _get_block_document_by_id( 1a

997 cls, 

998 block_document_id: Union[str, uuid.UUID], 

999 client: "PrefectClient | None" = None, 

1000 ): 

1001 if TYPE_CHECKING: 

1002 assert isinstance(client, PrefectClient) 

1003 if isinstance(block_document_id, str): 

1004 try: 

1005 block_document_id = UUID(block_document_id) 

1006 except ValueError: 

1007 raise ValueError( 

1008 f"Block document ID {block_document_id!r} is not a valid UUID" 

1009 ) 

1010 

1011 try: 

1012 block_document = await client.read_block_document( 

1013 block_document_id=block_document_id 

1014 ) 

1015 except prefect.exceptions.ObjectNotFound: 

1016 raise ValueError( 

1017 f"Unable to find block document with ID {block_document_id!r}" 

1018 ) 

1019 

1020 return block_document, block_document.name 

1021 

1022 @classmethod 1a

1023 @inject_client 1a

1024 async def aload( 1a

1025 cls, 

1026 name: str, 

1027 validate: bool = True, 

1028 client: Optional["PrefectClient"] = None, 

1029 ) -> "Self": 

1030 """ 

1031 Retrieves data from the block document with the given name for the block type 

1032 that corresponds with the current class and returns an instantiated version of 

1033 the current class with the data stored in the block document. 

1034 

1035 If a block document for a given block type is saved with a different schema 

1036 than the current class calling `aload`, a warning will be raised. 

1037 

1038 If the current class schema is a subset of the block document schema, the block 

1039 can be loaded as normal using the default `validate = True`. 

1040 

1041 If the current class schema is a superset of the block document schema, `aload` 

1042 must be called with `validate` set to False to prevent a validation error. In 

1043 this case, the block attributes will default to `None` and must be set manually 

1044 and saved to a new block document before the block can be used as expected. 

1045 

1046 Args: 

1047 name: The name or slug of the block document. A block document slug is a 

1048 string with the format `<block_type_slug>/<block_document_name>` 

1049 validate: If False, the block document will be loaded without Pydantic 

1050 validating the block schema. This is useful if the block schema has 

1051 changed client-side since the block document referred to by `name` was saved. 

1052 client: The client to use to load the block document. If not provided, the 

1053 default client will be injected. 

1054 

1055 Raises: 

1056 ValueError: If the requested block document is not found. 

1057 

1058 Returns: 

1059 An instance of the current class hydrated with the data stored in the 

1060 block document with the specified name. 

1061 

1062 Examples: 

1063 Load from a Block subclass with a block document name: 

1064 ```python 

1065 class Custom(Block): 

1066 message: str 

1067 

1068 Custom(message="Hello!").save("my-custom-message") 

1069 

1070 loaded_block = await Custom.aload("my-custom-message") 

1071 ``` 

1072 

1073 Load from Block with a block document slug: 

1074 ```python 

1075 class Custom(Block): 

1076 message: str 

1077 

1078 Custom(message="Hello!").save("my-custom-message") 

1079 

1080 loaded_block = await Block.aload("custom/my-custom-message") 

1081 ``` 

1082 

1083 Migrate a block document to a new schema: 

1084 ```python 

1085 # original class 

1086 class Custom(Block): 

1087 message: str 

1088 

1089 Custom(message="Hello!").save("my-custom-message") 

1090 

1091 # Updated class with new required field 

1092 class Custom(Block): 

1093 message: str 

1094 number_of_ducks: int 

1095 

1096 loaded_block = await Custom.aload("my-custom-message", validate=False) 

1097 

1098 # Prints UserWarning about schema mismatch 

1099 

1100 loaded_block.number_of_ducks = 42 

1101 

1102 loaded_block.save("my-custom-message", overwrite=True) 

1103 ``` 

1104 """ 

1105 if TYPE_CHECKING: 

1106 assert isinstance(client, PrefectClient) 

1107 block_document, _ = await cls._aget_block_document(name, client=client) 

1108 

1109 return cls._load_from_block_document(block_document, validate=validate) 

1110 

1111 @classmethod 1a

1112 @async_dispatch(aload) 1a

1113 def load( 1a

1114 cls, 

1115 name: str, 

1116 validate: bool = True, 

1117 client: Optional["PrefectClient"] = None, 

1118 ) -> "Self": 

1119 """ 

1120 Retrieves data from the block document with the given name for the block type 

1121 that corresponds with the current class and returns an instantiated version of 

1122 the current class with the data stored in the block document. 

1123 

1124 If a block document for a given block type is saved with a different schema 

1125 than the current class calling `load`, a warning will be raised. 

1126 

1127 If the current class schema is a subset of the block document schema, the block 

1128 can be loaded as normal using the default `validate = True`. 

1129 

1130 If the current class schema is a superset of the block document schema, `load` 

1131 must be called with `validate` set to False to prevent a validation error. In 

1132 this case, the block attributes will default to `None` and must be set manually 

1133 and saved to a new block document before the block can be used as expected. 

1134 

1135 Args: 

1136 name: The name or slug of the block document. A block document slug is a 

1137 string with the format `<block_type_slug>/<block_document_name>` 

1138 validate: If False, the block document will be loaded without Pydantic 

1139 validating the block schema. This is useful if the block schema has 

1140 changed client-side since the block document referred to by `name` was saved. 

1141 client: The client to use to load the block document. If not provided, the 

1142 default client will be injected. 

1143 

1144 Raises: 

1145 ValueError: If the requested block document is not found. 

1146 

1147 Returns: 

1148 An instance of the current class hydrated with the data stored in the 

1149 block document with the specified name. 

1150 

1151 Examples: 

1152 Load from a Block subclass with a block document name: 

1153 ```python 

1154 class Custom(Block): 

1155 message: str 

1156 

1157 Custom(message="Hello!").save("my-custom-message") 

1158 

1159 loaded_block = Custom.load("my-custom-message") 

1160 ``` 

1161 

1162 Load from Block with a block document slug: 

1163 ```python 

1164 class Custom(Block): 

1165 message: str 

1166 

1167 Custom(message="Hello!").save("my-custom-message") 

1168 

1169 loaded_block = Block.load("custom/my-custom-message") 

1170 ``` 

1171 

1172 Migrate a block document to a new schema: 

1173 ```python 

1174 # original class 

1175 class Custom(Block): 

1176 message: str 

1177 

1178 Custom(message="Hello!").save("my-custom-message") 

1179 

1180 # Updated class with new required field 

1181 class Custom(Block): 

1182 message: str 

1183 number_of_ducks: int 

1184 

1185 loaded_block = Custom.load("my-custom-message", validate=False) 

1186 

1187 # Prints UserWarning about schema mismatch 

1188 

1189 loaded_block.number_of_ducks = 42 

1190 

1191 loaded_block.save("my-custom-message", overwrite=True) 

1192 ``` 

1193 """ 

1194 # Need to use a `PrefectClient` here to ensure `Block.load` and `Block.aload` signatures match 

1195 # TODO: replace with only sync client once all internal calls are updated to use `Block.aload` and `@async_dispatch` is removed 

1196 if client is None: 

1197 # If a client wasn't provided, we get to use a sync client 

1198 from prefect.client.orchestration import get_client 

1199 

1200 with get_client(sync_client=True) as sync_client: 

1201 block_document, _ = cls._get_block_document(name, client=sync_client) 

1202 else: 

1203 # If a client was provided, reuse it, even though it's async, to avoid excessive client creation 

1204 block_document, _ = run_coro_as_sync( 

1205 cls._aget_block_document(name, client=client) 

1206 ) 

1207 return cls._load_from_block_document(block_document, validate=validate) 

1208 

1209 @classmethod 1a

1210 @sync_compatible 1a

1211 @inject_client 1a

1212 async def load_from_ref( 1a

1213 cls, 

1214 ref: Union[str, UUID, dict[str, Any]], 

1215 validate: bool = True, 

1216 client: "PrefectClient | None" = None, 

1217 ) -> Self: 

1218 """ 

1219 Retrieves data from the block document by given reference for the block type 

1220 that corresponds with the current class and returns an instantiated version of 

1221 the current class with the data stored in the block document. 

1222 

1223 Provided reference can be a block document ID, or a reference data in dictionary format. 

1224 Supported dictionary reference formats are: 

1225 - `{"block_document_id": <block_document_id>}` 

1226 - `{"block_document_slug": <block_document_slug>}` 

1227 

1228 If a block document for a given block type is saved with a different schema 

1229 than the current class calling `load`, a warning will be raised. 

1230 

1231 If the current class schema is a subset of the block document schema, the block 

1232 can be loaded as normal using the default `validate = True`. 

1233 

1234 If the current class schema is a superset of the block document schema, `load` 

1235 must be called with `validate` set to False to prevent a validation error. In 

1236 this case, the block attributes will default to `None` and must be set manually 

1237 and saved to a new block document before the block can be used as expected. 

1238 

1239 Args: 

1240 ref: The reference to the block document. This can be a block document ID, 

1241 or one of supported dictionary reference formats. 

1242 validate: If False, the block document will be loaded without Pydantic 

1243 validating the block schema. This is useful if the block schema has 

1244 changed client-side since the block document referred to by `name` was saved. 

1245 client: The client to use to load the block document. If not provided, the 

1246 default client will be injected. 

1247 

1248 Raises: 

1249 ValueError: If invalid reference format is provided. 

1250 ValueError: If the requested block document is not found. 

1251 

1252 Returns: 

1253 An instance of the current class hydrated with the data stored in the 

1254 block document with the specified name. 

1255 

1256 """ 

1257 if TYPE_CHECKING: 

1258 assert isinstance(client, PrefectClient) 

1259 block_document = None 

1260 if isinstance(ref, (str, UUID)): 

1261 block_document, _ = await cls._get_block_document_by_id(ref, client=client) 

1262 else: 

1263 if block_document_id := ref.get("block_document_id"): 

1264 block_document, _ = await cls._get_block_document_by_id( 

1265 block_document_id, client=client 

1266 ) 

1267 elif block_document_slug := ref.get("block_document_slug"): 

1268 block_document, _ = await cls._aget_block_document( 

1269 block_document_slug, client=client 

1270 ) 

1271 

1272 if not block_document: 

1273 raise ValueError(f"Invalid reference format {ref!r}.") 

1274 

1275 return cls._load_from_block_document(block_document, validate=validate) 

1276 

1277 @classmethod 1a

1278 def _load_from_block_document( 1a

1279 cls, block_document: BlockDocument, validate: bool = True 

1280 ) -> Self: 

1281 """ 

1282 Loads a block from a given block document. 

1283 

1284 If a block document for a given block type is saved with a different schema 

1285 than the current class calling `load`, a warning will be raised. 

1286 

1287 If the current class schema is a subset of the block document schema, the block 

1288 can be loaded as normal using the default `validate = True`. 

1289 

1290 If the current class schema is a superset of the block document schema, `load` 

1291 must be called with `validate` set to False to prevent a validation error. In 

1292 this case, the block attributes will default to `None` and must be set manually 

1293 and saved to a new block document before the block can be used as expected. 

1294 

1295 Args: 

1296 block_document: The block document used to instantiate a block. 

1297 validate: If False, the block document will be loaded without Pydantic 

1298 validating the block schema. This is useful if the block schema has 

1299 changed client-side since the block document referred to by `name` was saved. 

1300 

1301 Raises: 

1302 ValueError: If the requested block document is not found. 

1303 

1304 Returns: 

1305 An instance of the current class hydrated with the data stored in the 

1306 block document with the specified name. 

1307 

1308 """ 

1309 try: 

1310 return cls._from_block_document(block_document) 

1311 except ValidationError as e: 

1312 if not validate: 

1313 missing_fields = tuple(err["loc"][0] for err in e.errors()) 

1314 missing_block_data: dict[str, None] = { 

1315 field: None for field in missing_fields if isinstance(field, str) 

1316 } 

1317 warnings.warn( 

1318 f"Could not fully load {block_document.name!r} of block type" 

1319 f" {cls.get_block_type_slug()!r} - this is likely because one or more" 

1320 " required fields were added to the schema for" 

1321 f" {cls.__name__!r} that did not exist on the class when this block" 

1322 " was last saved. Please specify values for new field(s):" 

1323 f" {listrepr(missing_fields)}, then run" 

1324 f' `{cls.__name__}.save("{block_document.name}", overwrite=True)`,' 

1325 " and load this block again before attempting to use it." 

1326 ) 

1327 return cls.model_construct(**block_document.data, **missing_block_data) 

1328 raise RuntimeError( 

1329 f"Unable to load {block_document.name!r} of block type" 

1330 f" {cls.get_block_type_slug()!r} due to failed validation. To load without" 

1331 " validation, try loading again with `validate=False`." 

1332 ) from e 

1333 

1334 @staticmethod 1a

1335 def is_block_class(block: Any) -> TypeGuard[type["Block"]]: 1a

1336 return _is_subclass(block, Block) 1ab

1337 

1338 @staticmethod 1a

1339 def annotation_refers_to_block_class(annotation: Any) -> bool: 1a

1340 if Block.is_block_class(annotation): 

1341 return True 

1342 

1343 if get_origin(annotation) in UnionTypes: 

1344 for annotation in get_args(annotation): 

1345 if Block.is_block_class(annotation): 

1346 return True 

1347 

1348 return False 

1349 

1350 @classmethod 1a

1351 @sync_compatible 1a

1352 @inject_client 1a

1353 async def register_type_and_schema(cls, client: Optional["PrefectClient"] = None): 1a

1354 """ 

1355 Makes block available for configuration with current Prefect API. 

1356 Recursively registers all nested blocks. Registration is idempotent. 

1357 

1358 Args: 

1359 client: Optional client to use for registering type and schema with the 

1360 Prefect API. A new client will be created and used if one is not 

1361 provided. 

1362 """ 

1363 if TYPE_CHECKING: 

1364 assert isinstance(client, PrefectClient) 

1365 if cls.__name__ == "Block": 

1366 raise InvalidBlockRegistration( 

1367 "`register_type_and_schema` should be called on a Block " 

1368 "subclass and not on the Block class directly." 

1369 ) 

1370 if ABC in getattr(cls, "__bases__", []): 

1371 raise InvalidBlockRegistration( 

1372 "`register_type_and_schema` should be called on a Block " 

1373 "subclass and not on a Block interface class directly." 

1374 ) 

1375 

1376 async def register_blocks_in_annotation(annotation: type) -> None: 

1377 """Walk through the annotation and register any nested blocks.""" 

1378 if Block.is_block_class(annotation): 

1379 coro = annotation.register_type_and_schema(client=client) 

1380 if TYPE_CHECKING: 

1381 assert isinstance(coro, Coroutine) 

1382 await coro 

1383 elif get_origin(annotation) in NestedTypes: 

1384 for inner_annotation in get_args(annotation): 

1385 await register_blocks_in_annotation(inner_annotation) 

1386 

1387 for field in cls.model_fields.values(): 

1388 if field.annotation is not None: 

1389 await register_blocks_in_annotation(field.annotation) 

1390 

1391 try: 

1392 block_type = await client.read_block_type_by_slug( 

1393 slug=cls.get_block_type_slug() 

1394 ) 

1395 

1396 cls._block_type_id = block_type.id 

1397 

1398 local_block_type = cls._to_block_type() 

1399 if _should_update_block_type( 

1400 local_block_type=local_block_type, server_block_type=block_type 

1401 ): 

1402 await client.update_block_type( 

1403 block_type_id=block_type.id, 

1404 block_type=BlockTypeUpdate( 

1405 **local_block_type.model_dump( 

1406 include={ 

1407 "logo_url", 

1408 "documentation_url", 

1409 "description", 

1410 "code_example", 

1411 } 

1412 ) 

1413 ), 

1414 ) 

1415 except prefect.exceptions.ObjectNotFound: 

1416 block_type_create = BlockTypeCreate( 

1417 **cls._to_block_type().model_dump( 

1418 include={ 

1419 "name", 

1420 "slug", 

1421 "logo_url", 

1422 "documentation_url", 

1423 "description", 

1424 "code_example", 

1425 } 

1426 ) 

1427 ) 

1428 block_type = await client.create_block_type(block_type=block_type_create) 

1429 cls._block_type_id = block_type.id 

1430 

1431 try: 

1432 block_schema = await client.read_block_schema_by_checksum( 

1433 checksum=cls._calculate_schema_checksum(), 

1434 version=cls.get_block_schema_version(), 

1435 ) 

1436 except prefect.exceptions.ObjectNotFound: 

1437 block_schema_create = BlockSchemaCreate( 

1438 **cls._to_block_schema(block_type_id=block_type.id).model_dump( 

1439 include={"fields", "block_type_id", "capabilities", "version"} 

1440 ) 

1441 ) 

1442 block_schema = await client.create_block_schema( 

1443 block_schema=block_schema_create 

1444 ) 

1445 

1446 cls._block_schema_id = block_schema.id 

1447 

1448 @inject_client 1a

1449 async def _save( 1a

1450 self, 

1451 name: Optional[str] = None, 

1452 is_anonymous: bool = False, 

1453 overwrite: bool = False, 

1454 client: Optional["PrefectClient"] = None, 

1455 ) -> UUID: 

1456 """ 

1457 Saves the values of a block as a block document with an option to save as an 

1458 anonymous block document. 

1459 

1460 Args: 

1461 name: User specified name to give saved block document which can later be used to load the 

1462 block document. 

1463 is_anonymous: Boolean value specifying whether the block document is anonymous. Anonymous 

1464 blocks are intended for system use and are not shown in the UI. Anonymous blocks do not 

1465 require a user-supplied name. 

1466 overwrite: Boolean value specifying if values should be overwritten if a block document with 

1467 the specified name already exists. 

1468 

1469 Raises: 

1470 ValueError: If a name is not given and `is_anonymous` is `False` or a name is given and 

1471 `is_anonymous` is `True`. 

1472 """ 

1473 if TYPE_CHECKING: 

1474 assert isinstance(client, PrefectClient) 

1475 if name is None and not is_anonymous: 

1476 if self._block_document_name is None: 

1477 raise ValueError( 

1478 "You're attempting to save a block document without a name." 

1479 " Please either call `save` with a `name` or pass" 

1480 " `is_anonymous=True` to save an anonymous block." 

1481 ) 

1482 else: 

1483 name = self._block_document_name 

1484 

1485 self._is_anonymous = is_anonymous 

1486 

1487 # Ensure block type and schema are registered before saving block document. 

1488 coro = self.register_type_and_schema(client=client) 

1489 if TYPE_CHECKING: 

1490 assert isinstance(coro, Coroutine) 

1491 await coro 

1492 

1493 block_document = None 

1494 try: 

1495 block_document_create = BlockDocumentCreate( 

1496 **self._to_block_document(name=name, include_secrets=True).model_dump( 

1497 include={ 

1498 "name", 

1499 "block_schema_id", 

1500 "block_type_id", 

1501 "data", 

1502 "is_anonymous", 

1503 } 

1504 ) 

1505 ) 

1506 block_document = await client.create_block_document( 

1507 block_document=block_document_create 

1508 ) 

1509 except prefect.exceptions.ObjectAlreadyExists as err: 

1510 if overwrite: 

1511 block_document_id = self._block_document_id 

1512 if block_document_id is None and name is not None: 

1513 existing_block_document = await client.read_block_document_by_name( 

1514 name=name, block_type_slug=self.get_block_type_slug() 

1515 ) 

1516 block_document_id = existing_block_document.id 

1517 if TYPE_CHECKING: 

1518 # We know that the block document id is not None here because we 

1519 # only get here if the block document already exists 

1520 assert isinstance(block_document_id, UUID) 

1521 block_document_update = BlockDocumentUpdate( 

1522 **self._to_block_document( 

1523 name=name, include_secrets=True 

1524 ).model_dump(include={"block_schema_id", "data"}) 

1525 ) 

1526 await client.update_block_document( 

1527 block_document_id=block_document_id, 

1528 block_document=block_document_update, 

1529 ) 

1530 block_document = await client.read_block_document( 

1531 block_document_id=block_document_id 

1532 ) 

1533 else: 

1534 raise ValueError( 

1535 "You are attempting to save values with a name that is already in" 

1536 " use for this block type. If you would like to overwrite the" 

1537 " values that are saved, then save with `overwrite=True`." 

1538 ) from err 

1539 

1540 # Update metadata on block instance for later use. 

1541 self._block_document_name = block_document.name 

1542 self._block_document_id = block_document.id 

1543 return self._block_document_id 

1544 

1545 @sync_compatible 1a

1546 async def save( 1a

1547 self, 

1548 name: Optional[str] = None, 

1549 overwrite: bool = False, 

1550 client: Optional["PrefectClient"] = None, 

1551 ): 

1552 """ 

1553 Saves the values of a block as a block document. 

1554 

1555 Args: 

1556 name: User specified name to give saved block document which can later be used to load the 

1557 block document. 

1558 overwrite: Boolean value specifying if values should be overwritten if a block document with 

1559 the specified name already exists. 

1560 

1561 """ 

1562 document_id = await self._save(name=name, overwrite=overwrite, client=client) 

1563 

1564 return document_id 

1565 

1566 @classmethod 1a

1567 @sync_compatible 1a

1568 @inject_client 1a

1569 async def delete( 1a

1570 cls, 

1571 name: str, 

1572 client: Optional["PrefectClient"] = None, 

1573 ): 

1574 if TYPE_CHECKING: 

1575 assert isinstance(client, PrefectClient) 

1576 block_document, _ = await cls._aget_block_document(name, client=client) 

1577 

1578 await client.delete_block_document(block_document.id) 

1579 

1580 def get_block_placeholder(self) -> str: 1a

1581 """ 

1582 Returns the block placeholder for the current block which can be used for 

1583 templating. 

1584 

1585 Returns: 

1586 str: The block placeholder for the current block in the format 

1587 `prefect.blocks.{block_type_name}.{block_document_name}` 

1588 

1589 Raises: 

1590 BlockNotSavedError: Raised if the block has not been saved. 

1591 

1592 If a block has not been saved, the return value will be `None`. 

1593 """ 

1594 block_document_name = self._block_document_name 

1595 if not block_document_name: 

1596 raise BlockNotSavedError( 

1597 "Could not generate block placeholder for unsaved block." 

1598 ) 

1599 

1600 return f"prefect.blocks.{self.get_block_type_slug()}.{block_document_name}" 

1601 

1602 @classmethod 1a

1603 def model_json_schema( 1a

1604 cls, 

1605 by_alias: bool = True, 

1606 ref_template: str = "#/definitions/{model}", 

1607 schema_generator: type[GenerateJsonSchema] = GenerateJsonSchema, 

1608 mode: Literal["validation", "serialization"] = "validation", 

1609 ) -> dict[str, Any]: 

1610 """TODO: stop overriding this method - use GenerateSchema in ConfigDict instead?""" 

1611 schema = super().model_json_schema( 1ab

1612 by_alias, ref_template, schema_generator, mode 

1613 ) 

1614 

1615 # ensure backwards compatibility by copying $defs into definitions 

1616 if "$defs" in schema: 1ab

1617 schema["definitions"] = schema.pop("$defs") 1ab

1618 

1619 schema = remove_nested_keys(["additionalProperties"], schema) 1ab

1620 return schema 1ab

1621 

1622 @classmethod 1a

1623 def model_validate( 1a

1624 cls: type[Self], 

1625 obj: dict[str, Any] | Any, 

1626 *, 

1627 strict: bool | None = None, 

1628 from_attributes: bool | None = None, 

1629 context: dict[str, Any] | None = None, 

1630 ) -> Self: 

1631 if isinstance(obj, dict): 

1632 obj = cast(dict[str, Any], obj) 

1633 extra_serializer_fields = { 

1634 "_block_document_id", 

1635 "_block_document_name", 

1636 "_is_anonymous", 

1637 }.intersection(obj.keys()) 

1638 for field in extra_serializer_fields: 

1639 obj.pop(field, None) 

1640 

1641 return super().model_validate( 

1642 obj, 

1643 strict=strict, 

1644 from_attributes=from_attributes, 

1645 context=context, 

1646 ) 

1647 

1648 def model_dump( 1a

1649 self, 

1650 *, 

1651 mode: Literal["json", "python"] | str = "python", 

1652 include: "IncEx | None" = None, 

1653 exclude: "IncEx | None" = None, 

1654 context: dict[str, Any] | None = None, 

1655 by_alias: bool = False, 

1656 exclude_unset: bool = False, 

1657 exclude_defaults: bool = False, 

1658 exclude_none: bool = False, 

1659 round_trip: bool = False, 

1660 warnings: bool | Literal["none", "warn", "error"] = True, 

1661 serialize_as_any: bool = False, 

1662 ) -> dict[str, Any]: 

1663 d = super().model_dump( 

1664 mode=mode, 

1665 include=include, 

1666 exclude=exclude, 

1667 context=context, 

1668 by_alias=by_alias, 

1669 exclude_unset=exclude_unset, 

1670 exclude_defaults=exclude_defaults, 

1671 exclude_none=exclude_none, 

1672 round_trip=round_trip, 

1673 warnings=warnings, 

1674 serialize_as_any=serialize_as_any, 

1675 ) 

1676 

1677 extra_serializer_fields = { 

1678 "block_type_slug", 

1679 "_block_document_id", 

1680 "_block_document_name", 

1681 "_is_anonymous", 

1682 }.intersection(d.keys()) 

1683 

1684 for field in extra_serializer_fields: 

1685 if (include and field not in include) or (exclude and field in exclude): 

1686 d.pop(field) 

1687 

1688 return d