Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/block_schemas.py: 66%

191 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Functions for interacting with block schema ORM objects. 

3Intended for internal use by the Prefect REST API. 

4""" 

5 

6import json 1d

7from copy import copy 1d

8from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union 1d

9from uuid import UUID 1d

10 

11import sqlalchemy as sa 1d

12from sqlalchemy import delete, select 1d

13from sqlalchemy.ext.asyncio import AsyncSession 1d

14 

15from prefect.server import schemas 1d

16from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1d

17from prefect.server.models.block_types import read_block_type_by_slug 1d

18from prefect.server.schemas.actions import BlockSchemaCreate 1d

19from prefect.server.schemas.core import BlockSchema, BlockSchemaReference 1d

20 

21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true1d

22 from prefect.client.schemas.actions import ( 

23 BlockSchemaCreate as ClientBlockSchemaCreate, 

24 ) 

25 from prefect.client.schemas.objects import BlockSchema as ClientBlockSchema 

26 

27 

28class MissingBlockTypeException(Exception): 1d

29 """Raised when the block type corresponding to a block schema cannot be found""" 

30 

31 

32@db_injector 1d

33async def create_block_schema( 1d

34 db: PrefectDBInterface, 

35 session: AsyncSession, 

36 block_schema: Union[ 

37 schemas.actions.BlockSchemaCreate, 

38 schemas.core.BlockSchema, 

39 "ClientBlockSchemaCreate", 

40 "ClientBlockSchema", 

41 ], 

42 override: bool = False, 

43 definitions: Optional[dict[str, Any]] = None, 

44) -> Union[BlockSchema, orm_models.BlockSchema]: 

45 """ 

46 Create a new block schema. 

47 

48 Args: 

49 session: A database session 

50 block_schema: a block schema object 

51 definitions: Definitions of fields from block schema fields 

52 attribute. Used when recursively creating nested block schemas 

53 

54 Returns: 

55 block_schema: an ORM block schema model 

56 """ 

57 from prefect.blocks.core import Block, _get_non_block_reference_definitions 1ab

58 

59 # We take a shortcut in many unit tests and in block registration to pass client 

60 # models directly to this function. We will support this by converting them to 

61 # the appropriate server model. 

62 if not isinstance(block_schema, schemas.actions.BlockSchemaCreate): 1ab

63 block_schema = schemas.actions.BlockSchemaCreate.model_validate( 1ab

64 block_schema.model_dump( 

65 mode="json", 

66 exclude={"id", "created", "updated", "checksum", "block_type"}, 

67 ) 

68 ) 

69 

70 insert_values = block_schema.model_dump_for_orm( 1ab

71 exclude_unset=False, 

72 exclude={"block_type", "id", "created", "updated"}, 

73 ) 

74 

75 definitions = definitions or block_schema.fields.get("definitions") 1ab

76 fields_for_checksum = insert_values["fields"] 1ab

77 if definitions: 1ab

78 # Ensure definitions are available if this is a nested schema 

79 # that is being registered 

80 fields_for_checksum["definitions"] = definitions 1ab

81 checksum = Block._calculate_schema_checksum(fields_for_checksum) 1ab

82 

83 # Check for existing block schema based on calculated checksum 

84 existing_block_schema = await read_block_schema_by_checksum( 1ab

85 session=session, checksum=checksum, version=block_schema.version 

86 ) 

87 # Return existing block schema if it exists. Allows block schema creation to be called multiple 

88 # times for the same schema without errors. 

89 if existing_block_schema: 

90 return existing_block_schema 

91 

92 insert_values["checksum"] = checksum 

93 

94 if definitions: 

95 # Get non block definitions for saving to the DB. 

96 non_block_definitions = _get_non_block_reference_definitions( 

97 insert_values["fields"], definitions 

98 ) 

99 if non_block_definitions: 

100 insert_values["fields"]["definitions"] = ( 

101 _get_non_block_reference_definitions( 

102 insert_values["fields"], definitions 

103 ) 

104 ) 

105 else: 

106 # Prevent storing definitions for blocks. Those are reconstructed on read. 

107 insert_values["fields"].pop("definitions", None) 

108 

109 # Prevent saving block schema references in the block_schema table. They have 

110 # their own table. 

111 block_schema_references: Dict = insert_values["fields"].pop( 

112 "block_schema_references", {} 

113 ) 

114 

115 insert_stmt = db.queries.insert(db.BlockSchema).values(**insert_values) 

116 if override: 

117 insert_stmt = insert_stmt.on_conflict_do_update( 

118 index_elements=db.orm.block_schema_unique_upsert_columns, 

119 set_=insert_values, 

120 ) 

121 await session.execute(insert_stmt) 1a

122 

123 query = ( 

124 sa.select(db.BlockSchema) 

125 .where( 

126 db.BlockSchema.checksum == insert_values["checksum"], 

127 ) 

128 .order_by(db.BlockSchema.created.desc()) 

129 .limit(1) 

130 .execution_options(populate_existing=True) 

131 ) 

132 

133 if block_schema.version is not None: 

134 query = query.where(db.BlockSchema.version == block_schema.version) 

135 

136 result = await session.execute(query) 1a

137 created_block_schema = copy(result.scalar_one()) 

138 

139 await _register_nested_block_schemas( 1a

140 db, 

141 session=session, 

142 parent_block_schema_id=created_block_schema.id, 

143 block_schema_references=block_schema_references, 

144 base_fields=insert_values["fields"], 

145 definitions=definitions, 

146 override=override, 

147 ) 

148 

149 created_block_schema.fields["block_schema_references"] = block_schema_references 

150 if definitions is not None: 

151 created_block_schema.fields["definitions"] = definitions 

152 

153 return created_block_schema 

154 

155 

156async def _register_nested_block_schemas( 1d

157 db: PrefectDBInterface, 

158 session: AsyncSession, 

159 parent_block_schema_id: UUID, 

160 block_schema_references: dict[str, Union[dict[str, str], List[dict[str, str]]]], 

161 base_fields: Dict, 

162 definitions: Optional[Dict], 

163 override: bool = False, 

164) -> None: 

165 """ 

166 Iterates through each of the block schema references declared on the block schema. 

167 Attempts to register each of the nested block schemas if they have not already been 

168 registered. An error is thrown if the corresponding block type for a block schema 

169 has not been registered. 

170 

171 Args: 

172 session: A database session. 

173 parent_block_schema_id: The ID of the parent block schema. 

174 block_schema_references: A dictionary containing the block schema references for 

175 the child block schemas of the parent block schema. 

176 base_fields: The field name and type declarations for the parent block schema. 

177 definitions: A dictionary of the field name and type declarations of each 

178 child block schema. 

179 override: Flag controlling if a block schema should updated in place. 

180 """ 

181 for reference_name, reference_values in block_schema_references.items(): 1a

182 # Operate on a list so that we can share the same code paths for union cases 

183 reference_values = ( 1a

184 reference_values 

185 if isinstance(reference_values, list) 

186 else [reference_values] 

187 ) 

188 for reference_values_entry in reference_values: 188 ↛ 181line 188 didn't jump to line 181 because the loop on line 188 didn't complete1a

189 # Check to make sure that associated block type exists 

190 reference_block_type = await read_block_type_by_slug( 1a

191 session=session, 

192 block_type_slug=reference_values_entry["block_type_slug"], 

193 ) 

194 if reference_block_type is None: 

195 raise MissingBlockTypeException( 

196 "Cannot create block schema because block type" 

197 f" {reference_values_entry['block_type_slug']!r} was not found.Did" 

198 " you forget to register the block type?" 

199 ) 

200 

201 reference_block_schema: Union[BlockSchema, orm_models.BlockSchema, None] 

202 

203 # Checks to see if the visited block schema has been previously created 

204 reference_block_schema = await read_block_schema_by_checksum( 1a

205 session=session, 

206 checksum=reference_values_entry["block_schema_checksum"], 

207 ) 

208 # Attempts to create block schema since it has not already been registered 

209 if reference_block_schema is None: 

210 if definitions is None: 

211 raise ValueError( 

212 "Unable to create nested block schema due to missing" 

213 " definitions in root block schema fields" 

214 ) 

215 sub_block_schema_fields = _get_fields_for_child_schema( 

216 db, definitions, base_fields, reference_name, reference_block_type 

217 ) 

218 

219 if sub_block_schema_fields is None: 

220 raise ValueError( 

221 "Unable to create nested block schema for block type" 

222 f" {reference_block_type.name!r} due to missing definition." 

223 ) 

224 

225 reference_block_schema = await create_block_schema( 1a

226 session=session, 

227 block_schema=BlockSchemaCreate( 

228 fields=sub_block_schema_fields, 

229 block_type_id=reference_block_type.id, 

230 ), 

231 override=override, 

232 definitions=definitions, 

233 ) 

234 # Create a block schema reference linking the nested block schema to its parent. 

235 await create_block_schema_reference( 1a

236 session=session, 

237 block_schema_reference=BlockSchemaReference( 

238 parent_block_schema_id=parent_block_schema_id, 

239 reference_block_schema_id=reference_block_schema.id, 

240 name=reference_name, 

241 ), 

242 ) 

243 

244 

245def _get_fields_for_child_schema( 1d

246 db: PrefectDBInterface, 

247 definitions: Dict, 

248 base_fields: Dict, 

249 reference_name: str, 

250 reference_block_type: orm_models.BlockType, 

251) -> dict[str, Any]: 

252 """ 

253 Returns the field definitions for a child schema. The fields definitions are pulled from the provided `definitions` 

254 dictionary based on the information extracted from `base_fields` using the `reference_name`. `reference_block_type` 

255 is used to disambiguate fields that have a union type. 

256 """ 

257 from prefect.blocks.core import _collect_nested_reference_strings 1a

258 

259 spec_reference = base_fields["properties"][reference_name] 1a

260 sub_block_schema_fields = None 1a

261 reference_strings = _collect_nested_reference_strings(spec_reference) 1a

262 if len(reference_strings) == 1: 1a

263 sub_block_schema_fields = definitions.get( 1a

264 reference_strings[0].replace("#/definitions/", "") 

265 ) 

266 else: 

267 for reference_string in reference_strings: 267 ↛ 282line 267 didn't jump to line 282 because the loop on line 267 didn't complete1a

268 definition_key = reference_string.replace("#/definitions/", "") 1a

269 potential_sub_block_schema_fields = definitions[definition_key] 1a

270 # Determines the definition to use when registering a child 

271 # block schema by verifying that the block type name stored in 

272 # the definition matches the name of the block type that we're 

273 # currently trying to register a block schema for. 

274 if ( 

275 definitions[definition_key]["block_type_slug"] 

276 == reference_block_type.slug 

277 ): 

278 # Once we've found the matching definition, we no longer 

279 # need to iterate 

280 sub_block_schema_fields = potential_sub_block_schema_fields 1a

281 break 1a

282 return sub_block_schema_fields # type: ignore 1a

283 

284 

285@db_injector 1d

286async def delete_block_schema( 1d

287 db: PrefectDBInterface, session: AsyncSession, block_schema_id: UUID 

288) -> bool: 

289 """ 

290 Delete a block schema by id. 

291 

292 Args: 

293 session: A database session 

294 block_schema_id: a block schema id 

295 

296 Returns: 

297 bool: whether or not the block schema was deleted 

298 """ 

299 

300 result = await session.execute( 

301 delete(db.BlockSchema).where(db.BlockSchema.id == block_schema_id) 

302 ) 

303 return result.rowcount > 0 

304 

305 

306@db_injector 1d

307async def read_block_schema( 1d

308 db: PrefectDBInterface, 

309 session: AsyncSession, 

310 block_schema_id: UUID, 

311) -> Union[BlockSchema, None]: 

312 """ 

313 Reads a block schema by id. Will reconstruct the block schema's fields attribute 

314 to include block schema references. 

315 

316 Args: 

317 session: A database session 

318 block_schema_id: a block_schema id 

319 

320 Returns: 

321 orm_models..BlockSchema: the block_schema 

322 """ 

323 

324 # Construction of a recursive query which returns the specified block schema 

325 # along with and nested block schemas coupled with the ID of their parent schema 

326 # the key that they reside under. 

327 block_schema_references_query = ( 1bc

328 sa.select(db.BlockSchemaReference) 

329 .select_from(db.BlockSchemaReference) 

330 .filter_by(parent_block_schema_id=block_schema_id) 

331 .cte("block_schema_references", recursive=True) 

332 ) 

333 block_schema_references_join = ( 1bc

334 sa.select(db.BlockSchemaReference) 

335 .select_from(db.BlockSchemaReference) 

336 .join( 

337 block_schema_references_query, 

338 db.BlockSchemaReference.parent_block_schema_id 

339 == block_schema_references_query.c.reference_block_schema_id, 

340 ) 

341 ) 

342 recursive_block_schema_references_cte = block_schema_references_query.union_all( 1bc

343 block_schema_references_join 

344 ) 

345 nested_block_schemas_query = ( 1bc

346 sa.select( 

347 db.BlockSchema, 

348 recursive_block_schema_references_cte.c.name, 

349 recursive_block_schema_references_cte.c.parent_block_schema_id, 

350 ) 

351 .select_from(db.BlockSchema) 

352 .join( 

353 recursive_block_schema_references_cte, 

354 db.BlockSchema.id 

355 == recursive_block_schema_references_cte.c.reference_block_schema_id, 

356 isouter=True, 

357 ) 

358 .filter( 

359 sa.or_( 

360 db.BlockSchema.id == block_schema_id, 

361 recursive_block_schema_references_cte.c.parent_block_schema_id.is_not( 

362 None 

363 ), 

364 ) 

365 ) 

366 ) 

367 result = await session.execute(nested_block_schemas_query) 1bc

368 

369 return _construct_full_block_schema(result.all()) # type: ignore[arg-type] 

370 

371 

372def _construct_full_block_schema( 1d

373 block_schemas_with_references: List[ 

374 Tuple[BlockSchema, Optional[str], Optional[UUID]] 

375 ], 

376 root_block_schema: Optional[BlockSchema] = None, 

377) -> Optional[BlockSchema]: 

378 """ 

379 Takes a list of block schemas along with reference information and reconstructs 

380 the root block schema's fields attribute to contain block schema references for 

381 client consumption. 

382 

383 Args: 

384 block_schema_with_references: A list of tuples with the structure: 

385 - A block schema object 

386 - The name the block schema lives under in the parent block schema 

387 - The ID of the block schema's parent block schema 

388 root_block_schema: Optional block schema to start traversal. Will attempt to 

389 determine root block schema if not provided. 

390 

391 Returns: 

392 BlockSchema: A block schema with a fully reconstructed fields attribute 

393 """ 

394 if len(block_schemas_with_references) == 0: 1abc

395 return None 1abc

396 root_block_schema = ( 1abc

397 copy(root_block_schema) 

398 if root_block_schema is not None 

399 else _find_root_block_schema(block_schemas_with_references) 

400 ) 

401 if root_block_schema is None: 401 ↛ 402line 401 didn't jump to line 402 because the condition on line 401 was never true1abc

402 raise ValueError( 

403 "Unable to determine root block schema during schema reconstruction." 

404 ) 

405 root_block_schema.fields = _construct_block_schema_fields_with_block_references( 1abc

406 root_block_schema, block_schemas_with_references 

407 ) 

408 definitions = _construct_block_schema_spec_definitions( 1abc

409 root_block_schema, block_schemas_with_references 

410 ) 

411 # Definitions for non block object may already exist in the block schema OpenAPI 

412 # spec, so we need to combine block and non-block definitions. 

413 if definitions or root_block_schema.fields.get("definitions"): 1abc

414 root_block_schema.fields["definitions"] = { 1abc

415 **root_block_schema.fields.get("definitions", {}), 

416 **definitions, 

417 } 

418 return root_block_schema 1abc

419 

420 

421def _find_root_block_schema( 1d

422 block_schemas_with_references: List[ 

423 Tuple[BlockSchema, Optional[str], Optional[UUID]] 

424 ], 

425) -> Union[BlockSchema, None]: 

426 """ 

427 Attempts to find the root block schema from a list of block schemas 

428 with references. Returns None if a root block schema is not found. 

429 Returns only the first potential root block schema if multiple are found. 

430 """ 

431 return next( 1abc

432 ( 

433 copy(block_schema) 

434 for ( 

435 block_schema, 

436 _, 

437 parent_block_schema_id, 

438 ) in block_schemas_with_references 

439 if parent_block_schema_id is None 

440 ), 

441 None, 

442 ) 

443 

444 

445def _construct_block_schema_spec_definitions( 1d

446 root_block_schema: BlockSchema, 

447 block_schemas_with_references: List[ 

448 Tuple[BlockSchema, Optional[str], Optional[UUID]] 

449 ], 

450) -> dict[str, Any]: 

451 """ 

452 Constructs field definitions for a block schema based on the nested block schemas 

453 as defined in the block_schemas_with_references list. 

454 """ 

455 definitions: dict[str, Any] = {} 1abc

456 for _, block_schema_references in root_block_schema.fields[ 1abc

457 "block_schema_references" 

458 ].items(): 

459 block_schema_references = ( 1abc

460 block_schema_references 

461 if isinstance(block_schema_references, list) 

462 else [block_schema_references] 

463 ) 

464 for block_schema_reference in block_schema_references: 1abc

465 child_block_schema = _find_block_schema_via_checksum( 1abc

466 block_schemas_with_references, 

467 block_schema_reference["block_schema_checksum"], 

468 ) 

469 

470 if child_block_schema is not None: 470 ↛ 464line 470 didn't jump to line 464 because the condition on line 470 was always true1abc

471 child_block_schema = _construct_full_block_schema( 1abc

472 block_schemas_with_references=block_schemas_with_references, 

473 root_block_schema=child_block_schema, 

474 ) 

475 assert child_block_schema 1abc

476 definitions = _add_block_schemas_fields_to_definitions( 1abc

477 definitions, child_block_schema 

478 ) 

479 return definitions 1abc

480 

481 

482def _find_block_schema_via_checksum( 1d

483 block_schemas_with_references: List[ 

484 Tuple[BlockSchema, Optional[str], Optional[UUID]] 

485 ], 

486 checksum: str, 

487) -> Optional[BlockSchema]: 

488 """Attempt to find a block schema via a given checksum. Returns None if not found.""" 

489 return next( 1abc

490 ( 

491 block_schema 

492 for block_schema, _, _ in block_schemas_with_references 

493 if block_schema.checksum == checksum 

494 ), 

495 None, 

496 ) 

497 

498 

499def _add_block_schemas_fields_to_definitions( 1d

500 definitions: Dict, child_block_schema: BlockSchema 

501) -> dict[str, Any]: 

502 """ 

503 Returns a new definitions dict with the fields of a block schema and it's child 

504 block schemas added to the existing definitions. 

505 """ 

506 block_schema_title = child_block_schema.fields.get("title") 1abc

507 if block_schema_title is not None: 507 ↛ 517line 507 didn't jump to line 517 because the condition on line 507 was always true1abc

508 # Definitions are declared as a flat dict, so we pop off definitions 

509 # from child schemas and add them to the parent definitions dict 

510 child_definitions = child_block_schema.fields.pop("definitions", {}) 1abc

511 return { 1abc

512 **definitions, 

513 **{block_schema_title: child_block_schema.fields}, 

514 **child_definitions, 

515 } 

516 else: 

517 return definitions 

518 

519 

520def _construct_block_schema_fields_with_block_references( 1d

521 parent_block_schema: BlockSchema, 

522 block_schemas_with_references: List[ 

523 Tuple[BlockSchema, Optional[str], Optional[UUID]] 

524 ], 

525) -> dict[str, Any]: 

526 """ 

527 Constructs the block_schema_references in a block schema's fields attributes. Returns 

528 a copy of the block schema with block_schema_references added. 

529 

530 Args: 

531 parent_block_schema: The block schema that needs block references populated. 

532 block_schema_with_references: A list of tuples with the structure: 

533 - A block schema object 

534 - The name the block schema lives under in the parent block schema 

535 - The ID of the block schema's parent block schema 

536 

537 Returns: 

538 Dict: Block schema fields with block schema references added. 

539 

540 """ 

541 block_schema_fields_copy = { 1abc

542 **parent_block_schema.fields, 

543 "block_schema_references": {}, 

544 } 

545 for ( 1abc

546 nested_block_schema, 

547 name, 

548 parent_block_schema_id, 

549 ) in block_schemas_with_references: 

550 if parent_block_schema_id == parent_block_schema.id: 1abc

551 assert nested_block_schema.block_type, ( 1abc

552 f"{nested_block_schema} has no block type" 

553 ) 

554 

555 new_block_schema_reference = { 1abc

556 "block_schema_checksum": nested_block_schema.checksum, 

557 "block_type_slug": nested_block_schema.block_type.slug, 

558 } 

559 # A block reference for this key does not yet exist 

560 if name not in block_schema_fields_copy["block_schema_references"]: 1abc

561 block_schema_fields_copy["block_schema_references"][name] = ( 1abc

562 new_block_schema_reference 

563 ) 

564 else: 

565 # List of block references for this key already exist and the block 

566 # reference that we are attempting add isn't present 

567 if ( 

568 isinstance( 

569 block_schema_fields_copy["block_schema_references"][name], 

570 list, 

571 ) 

572 and new_block_schema_reference 

573 not in block_schema_fields_copy["block_schema_references"][name] 

574 ): 

575 block_schema_fields_copy["block_schema_references"][name].append( 1abc

576 new_block_schema_reference 

577 ) 

578 # A single block reference for this key already exists and it does not 

579 # match the block reference that we are attempting to add 

580 elif ( 580 ↛ 545line 580 didn't jump to line 545 because the condition on line 580 was always true

581 block_schema_fields_copy["block_schema_references"][name] 

582 != new_block_schema_reference 

583 ): 

584 block_schema_fields_copy["block_schema_references"][name] = [ 1abc

585 block_schema_fields_copy["block_schema_references"][name], 

586 new_block_schema_reference, 

587 ] 

588 return block_schema_fields_copy 1abc

589 

590 

591@db_injector 1d

592async def read_block_schemas( 1d

593 db: PrefectDBInterface, 

594 session: AsyncSession, 

595 block_schema_filter: Optional[schemas.filters.BlockSchemaFilter] = None, 

596 limit: Optional[int] = None, 

597 offset: Optional[int] = None, 

598) -> List[BlockSchema]: 

599 """ 

600 Reads block schemas, optionally filtered by type or name. 

601 

602 Args: 

603 session: A database session 

604 block_schema_filter: a block schema filter object 

605 limit (int): query limit 

606 offset (int): query offset 

607 

608 Returns: 

609 List[orm_models.BlockSchema]: the block_schemas 

610 """ 

611 # schemas are ordered by `created DESC` to get the most recently created 

612 # ones first (and to facilitate getting the newest one with `limit=1`). 

613 filtered_block_schemas_query = select(db.BlockSchema.id).order_by( 1befghc

614 db.BlockSchema.created.desc() 

615 ) 

616 

617 if block_schema_filter: 1befghc

618 filtered_block_schemas_query = filtered_block_schemas_query.where( 1befghc

619 block_schema_filter.as_sql_filter() 

620 ) 

621 

622 if offset is not None: 1befghc

623 filtered_block_schemas_query = filtered_block_schemas_query.offset(offset) 1bc

624 if limit is not None: 1befghc

625 filtered_block_schemas_query = filtered_block_schemas_query.limit(limit) 1bc

626 

627 filtered_block_schema_ids = ( 

628 (await session.execute(filtered_block_schemas_query)).scalars().unique().all() 

629 ) 

630 

631 block_schema_references_query = ( 

632 sa.select(db.BlockSchemaReference) 

633 .select_from(db.BlockSchemaReference) 

634 .filter( 

635 db.BlockSchemaReference.parent_block_schema_id.in_( 

636 filtered_block_schemas_query 

637 ) 

638 ) 

639 .cte("block_schema_references", recursive=True) 

640 ) 

641 block_schema_references_join = ( 

642 sa.select(db.BlockSchemaReference) 

643 .select_from(db.BlockSchemaReference) 

644 .join( 

645 block_schema_references_query, 

646 db.BlockSchemaReference.parent_block_schema_id 

647 == block_schema_references_query.c.reference_block_schema_id, 

648 ) 

649 ) 

650 recursive_block_schema_references_cte = block_schema_references_query.union_all( 

651 block_schema_references_join 

652 ) 

653 

654 nested_block_schemas_query = ( 

655 sa.select( 

656 db.BlockSchema, 

657 recursive_block_schema_references_cte.c.name, 

658 recursive_block_schema_references_cte.c.parent_block_schema_id, 

659 ) 

660 .select_from(db.BlockSchema) 

661 # in order to reconstruct nested block schemas efficiently, we need to visit them 

662 # in the order they were created (so that we guarantee that nested/referenced schemas) 

663 # have already been seen. Therefore this second query sorts by created ASC 

664 .order_by(db.BlockSchema.created.asc()) 

665 .join( 

666 recursive_block_schema_references_cte, 

667 db.BlockSchema.id 

668 == recursive_block_schema_references_cte.c.reference_block_schema_id, 

669 isouter=True, 

670 ) 

671 .filter( 

672 sa.or_( 

673 db.BlockSchema.id.in_(filtered_block_schemas_query), 

674 recursive_block_schema_references_cte.c.parent_block_schema_id.is_not( 

675 None 

676 ), 

677 ) 

678 ) 

679 ) 

680 

681 block_schemas_with_references = ( 

682 (await session.execute(nested_block_schemas_query)).unique().all() 

683 ) 

684 fully_constructed_block_schemas = [] 

685 visited_block_schema_ids = [] 

686 for root_block_schema, _, _ in block_schemas_with_references: 

687 if ( 

688 root_block_schema.id in filtered_block_schema_ids 

689 and root_block_schema.id not in visited_block_schema_ids 

690 ): 

691 constructed = _construct_full_block_schema( 

692 block_schemas_with_references=block_schemas_with_references, # type: ignore[arg-type] 

693 root_block_schema=root_block_schema, 

694 ) 

695 assert constructed 

696 fully_constructed_block_schemas.append(constructed) 

697 visited_block_schema_ids.append(root_block_schema.id) 

698 

699 # because we reconstructed schemas ordered by created ASC, we 

700 # reverse the final output to restore created DESC 

701 return list(reversed(fully_constructed_block_schemas)) 

702 

703 

704@db_injector 1d

705async def read_block_schema_by_checksum( 1d

706 db: PrefectDBInterface, 

707 session: AsyncSession, 

708 checksum: str, 

709 version: Optional[str] = None, 

710) -> Optional[BlockSchema]: 

711 """ 

712 Reads a block_schema by checksum. Will reconstruct the block schema's fields 

713 attribute to include block schema references. 

714 

715 Args: 

716 session: A database session 

717 checksum: a block_schema checksum 

718 version: A block_schema version 

719 

720 Returns: 

721 orm_models.BlockSchema: the block_schema 

722 """ 

723 # Construction of a recursive query which returns the specified block schema 

724 # along with and nested block schemas coupled with the ID of their parent schema 

725 # the key that they reside under. 

726 

727 # The same checksum with different versions can occur in the DB. Return only the 

728 # most recently created one. 

729 root_block_schema_query = ( 1abc

730 sa.select(db.BlockSchema) 

731 .filter_by(checksum=checksum) 

732 .order_by(db.BlockSchema.created.desc()) 

733 .limit(1) 

734 ) 

735 

736 if version is not None: 1abc

737 root_block_schema_query = root_block_schema_query.filter_by(version=version) 1abc

738 

739 root_block_schema_cte = root_block_schema_query.cte("root_block_schema") 1abc

740 

741 block_schema_references_query = ( 1abc

742 sa.select(db.BlockSchemaReference) 

743 .select_from(db.BlockSchemaReference) 

744 .filter_by(parent_block_schema_id=root_block_schema_cte.c.id) 

745 .cte("block_schema_references", recursive=True) 

746 ) 

747 block_schema_references_join = ( 1abc

748 sa.select(db.BlockSchemaReference) 

749 .select_from(db.BlockSchemaReference) 

750 .join( 

751 block_schema_references_query, 

752 db.BlockSchemaReference.parent_block_schema_id 

753 == block_schema_references_query.c.reference_block_schema_id, 

754 ) 

755 ) 

756 recursive_block_schema_references_cte = block_schema_references_query.union_all( 1abc

757 block_schema_references_join 

758 ) 

759 nested_block_schemas_query = ( 1abc

760 sa.select( 

761 db.BlockSchema, 

762 recursive_block_schema_references_cte.c.name, 

763 recursive_block_schema_references_cte.c.parent_block_schema_id, 

764 ) 

765 .select_from(db.BlockSchema) 

766 .join( 

767 recursive_block_schema_references_cte, 

768 db.BlockSchema.id 

769 == recursive_block_schema_references_cte.c.reference_block_schema_id, 

770 isouter=True, 

771 ) 

772 .filter( 

773 sa.or_( 

774 db.BlockSchema.id == root_block_schema_cte.c.id, 

775 recursive_block_schema_references_cte.c.parent_block_schema_id.is_not( 

776 None 

777 ), 

778 ) 

779 ) 

780 ) 

781 result = await session.execute(nested_block_schemas_query) 1abc

782 return _construct_full_block_schema(result.all()) # type: ignore[arg-type] 

783 

784 

785@db_injector 1d

786async def read_available_block_capabilities( 1d

787 db: PrefectDBInterface, 

788 session: AsyncSession, 

789) -> List[str]: 

790 """ 

791 Retrieves a list of all available block capabilities. 

792 

793 Args: 

794 session: A database session. 

795 

796 Returns: 

797 List[str]: List of all available block capabilities. 

798 """ 

799 query = sa.select( 1bc

800 db.queries.json_arr_agg( 

801 db.queries.cast_to_json(db.BlockSchema.capabilities.distinct()) 

802 ) 

803 ) 

804 capability_combinations = (await session.execute(query)).scalars().first() or list() 1bc

805 if db.queries.uses_json_strings and isinstance(capability_combinations, str): 

806 capability_combinations = json.loads(capability_combinations) 

807 return list({c for capabilities in capability_combinations for c in capabilities}) 

808 

809 

810@db_injector 1d

811async def create_block_schema_reference( 1d

812 db: PrefectDBInterface, 

813 session: AsyncSession, 

814 block_schema_reference: schemas.core.BlockSchemaReference, 

815) -> Union[orm_models.BlockSchemaReference, None]: 

816 """ 

817 Retrieves a list of all available block capabilities. 

818 

819 Args: 

820 session: A database session. 

821 block_schema_reference: A block schema reference object. 

822 

823 Returns: 

824 orm_models.BlockSchemaReference: The created BlockSchemaReference 

825 """ 

826 query_stmt = sa.select(db.BlockSchemaReference).where( 1a

827 db.BlockSchemaReference.name == block_schema_reference.name, 

828 db.BlockSchemaReference.parent_block_schema_id 

829 == block_schema_reference.parent_block_schema_id, 

830 db.BlockSchemaReference.reference_block_schema_id 

831 == block_schema_reference.reference_block_schema_id, 

832 ) 

833 

834 existing_reference = (await session.execute(query_stmt)).scalar() 1a

835 if existing_reference: 

836 return existing_reference 

837 

838 insert_stmt = db.queries.insert(db.BlockSchemaReference).values( 

839 **block_schema_reference.model_dump_for_orm( 

840 exclude_unset=True, exclude={"created", "updated"} 

841 ) 

842 ) 

843 await session.execute(insert_stmt) 1a

844 

845 result = await session.execute( 1a

846 sa.select(db.BlockSchemaReference).where( 

847 db.BlockSchemaReference.id == block_schema_reference.id 

848 ) 

849 ) 

850 return result.scalar()