Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/block_schemas.py: 58%
191 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Functions for interacting with block schema ORM objects.
3Intended for internal use by the Prefect REST API.
4"""
6import json 1b
7from copy import copy 1b
8from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union 1b
9from uuid import UUID 1b
11import sqlalchemy as sa 1b
12from sqlalchemy import delete, select 1b
13from sqlalchemy.ext.asyncio import AsyncSession 1b
15from prefect.server import schemas 1b
16from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1b
17from prefect.server.models.block_types import read_block_type_by_slug 1b
18from prefect.server.schemas.actions import BlockSchemaCreate 1b
19from prefect.server.schemas.core import BlockSchema, BlockSchemaReference 1b
21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true1b
22 from prefect.client.schemas.actions import (
23 BlockSchemaCreate as ClientBlockSchemaCreate,
24 )
25 from prefect.client.schemas.objects import BlockSchema as ClientBlockSchema
class MissingBlockTypeException(Exception):
    """Raised when the block type corresponding to a block schema cannot be found.

    Within this module it is raised by `_register_nested_block_schemas` when
    the block type slug named in a block schema reference cannot be looked up.
    """
@db_injector
async def create_block_schema(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_schema: Union[
        schemas.actions.BlockSchemaCreate,
        schemas.core.BlockSchema,
        "ClientBlockSchemaCreate",
        "ClientBlockSchema",
    ],
    override: bool = False,
    definitions: Optional[dict[str, Any]] = None,
) -> Union[BlockSchema, orm_models.BlockSchema]:
    """
    Create a new block schema.

    If a block schema with the same checksum (and version, when one is set)
    already exists, that schema is returned and nothing is inserted.

    Args:
        session: A database session
        block_schema: a block schema object
        override: When True, the insert is issued with an
            ON CONFLICT DO UPDATE clause on the block schema upsert columns.
            The flag is also forwarded when registering nested block schemas.
        definitions: Definitions of fields from block schema fields
            attribute. Used when recursively creating nested block schemas

    Returns:
        block_schema: the existing block schema if one matched the checksum,
            otherwise a copy of the newly inserted ORM block schema model with
            `block_schema_references` (and `definitions`, when available)
            added back to its fields.
    """
    from prefect.blocks.core import Block, _get_non_block_reference_definitions

    # We take a shortcut in many unit tests and in block registration to pass client
    # models directly to this function. We will support this by converting them to
    # the appropriate server model.
    if not isinstance(block_schema, schemas.actions.BlockSchemaCreate):
        block_schema = schemas.actions.BlockSchemaCreate.model_validate(
            block_schema.model_dump(
                mode="json",
                exclude={"id", "created", "updated", "checksum", "block_type"},
            )
        )

    insert_values = block_schema.model_dump_for_orm(
        exclude_unset=False,
        exclude={"block_type", "id", "created", "updated"},
    )

    definitions = definitions or block_schema.fields.get("definitions")
    # NOTE: this is the same dict object as insert_values["fields"], so adding
    # definitions here also makes them visible through insert_values below.
    fields_for_checksum = insert_values["fields"]
    if definitions:
        # Ensure definitions are available if this is a nested schema
        # that is being registered
        fields_for_checksum["definitions"] = definitions
    checksum = Block._calculate_schema_checksum(fields_for_checksum)

    # Check for existing block schema based on calculated checksum
    existing_block_schema = await read_block_schema_by_checksum(
        session=session, checksum=checksum, version=block_schema.version
    )
    # Return existing block schema if it exists. Allows block schema creation to be
    # called multiple times for the same schema without errors.
    if existing_block_schema:
        return existing_block_schema

    insert_values["checksum"] = checksum

    if definitions:
        # Get non block definitions for saving to the DB. The result is computed
        # once and reused instead of being recomputed for the assignment.
        non_block_definitions = _get_non_block_reference_definitions(
            insert_values["fields"], definitions
        )
        if non_block_definitions:
            insert_values["fields"]["definitions"] = non_block_definitions
        else:
            # Prevent storing definitions for blocks. Those are reconstructed on read.
            insert_values["fields"].pop("definitions", None)

    # Prevent saving block schema references in the block_schema table. They have
    # their own table.
    block_schema_references: Dict = insert_values["fields"].pop(
        "block_schema_references", {}
    )

    insert_stmt = db.queries.insert(db.BlockSchema).values(**insert_values)
    if override:
        insert_stmt = insert_stmt.on_conflict_do_update(
            index_elements=db.orm.block_schema_unique_upsert_columns,
            set_=insert_values,
        )
    await session.execute(insert_stmt)

    # Read the row back by checksum (newest first) so the returned object
    # reflects what is stored in the database.
    query = (
        sa.select(db.BlockSchema)
        .where(
            db.BlockSchema.checksum == insert_values["checksum"],
        )
        .order_by(db.BlockSchema.created.desc())
        .limit(1)
        .execution_options(populate_existing=True)
    )

    if block_schema.version is not None:
        query = query.where(db.BlockSchema.version == block_schema.version)

    result = await session.execute(query)
    created_block_schema = copy(result.scalar_one())

    await _register_nested_block_schemas(
        db,
        session=session,
        parent_block_schema_id=created_block_schema.id,
        block_schema_references=block_schema_references,
        base_fields=insert_values["fields"],
        definitions=definitions,
        override=override,
    )

    # Restore the pieces that were stripped before the insert so the caller
    # receives the full schema fields.
    created_block_schema.fields["block_schema_references"] = block_schema_references
    if definitions is not None:
        created_block_schema.fields["definitions"] = definitions

    return created_block_schema
async def _register_nested_block_schemas(
    db: PrefectDBInterface,
    session: AsyncSession,
    parent_block_schema_id: UUID,
    block_schema_references: dict[str, Union[dict[str, str], List[dict[str, str]]]],
    base_fields: Dict,
    definitions: Optional[Dict],
    override: bool = False,
) -> None:
    """
    Iterates through each of the block schema references declared on the block schema.
    Attempts to register each of the nested block schemas if they have not already been
    registered. An error is thrown if the corresponding block type for a block schema
    has not been registered.

    Args:
        session: A database session.
        parent_block_schema_id: The ID of the parent block schema.
        block_schema_references: A dictionary containing the block schema references for
            the child block schemas of the parent block schema.
        base_fields: The field name and type declarations for the parent block schema.
        definitions: A dictionary of the field name and type declarations of each
            child block schema.
        override: Flag controlling if a block schema should be updated in place.

    Raises:
        MissingBlockTypeException: If the block type named by a reference is not found.
        ValueError: If a nested block schema must be created but `definitions`
            is None or holds no definition for it.
    """
    for reference_name, reference_values in block_schema_references.items():
        # Operate on a list so that we can share the same code paths for union cases
        reference_values = (
            reference_values
            if isinstance(reference_values, list)
            else [reference_values]
        )
        for reference_values_entry in reference_values:
            # Check to make sure that associated block type exists
            reference_block_type = await read_block_type_by_slug(
                session=session,
                block_type_slug=reference_values_entry["block_type_slug"],
            )
            if reference_block_type is None:
                raise MissingBlockTypeException(
                    "Cannot create block schema because block type"
                    f" {reference_values_entry['block_type_slug']!r} was not found."
                    " Did you forget to register the block type?"
                )

            reference_block_schema: Union[BlockSchema, orm_models.BlockSchema, None]

            # Checks to see if the visited block schema has been previously created
            reference_block_schema = await read_block_schema_by_checksum(
                session=session,
                checksum=reference_values_entry["block_schema_checksum"],
            )
            # Attempts to create block schema since it has not already been registered
            if reference_block_schema is None:
                if definitions is None:
                    raise ValueError(
                        "Unable to create nested block schema due to missing"
                        " definitions in root block schema fields"
                    )
                sub_block_schema_fields = _get_fields_for_child_schema(
                    db, definitions, base_fields, reference_name, reference_block_type
                )

                if sub_block_schema_fields is None:
                    raise ValueError(
                        "Unable to create nested block schema for block type"
                        f" {reference_block_type.name!r} due to missing definition."
                    )

                reference_block_schema = await create_block_schema(
                    session=session,
                    block_schema=BlockSchemaCreate(
                        fields=sub_block_schema_fields,
                        block_type_id=reference_block_type.id,
                    ),
                    override=override,
                    definitions=definitions,
                )
            # Create a block schema reference linking the nested block schema to its parent.
            await create_block_schema_reference(
                session=session,
                block_schema_reference=BlockSchemaReference(
                    parent_block_schema_id=parent_block_schema_id,
                    reference_block_schema_id=reference_block_schema.id,
                    name=reference_name,
                ),
            )
def _get_fields_for_child_schema(
    db: PrefectDBInterface,
    definitions: Dict,
    base_fields: Dict,
    reference_name: str,
    reference_block_type: orm_models.BlockType,
) -> Optional[dict[str, Any]]:
    """
    Returns the field definitions for a child schema. The fields definitions are pulled from the provided `definitions`
    dictionary based on the information extracted from `base_fields` using the `reference_name`. `reference_block_type`
    is used to disambiguate fields that have a union type.

    Args:
        db: Unused here; kept so existing callers keep working.
        definitions: Mapping of definition name to definition dict.
        base_fields: The parent schema fields; must contain
            `base_fields["properties"][reference_name]`.
        reference_name: Name of the property that references the child schema.
        reference_block_type: Block type whose `slug` selects the matching
            definition when the property references several definitions.

    Returns:
        The matching definition, or None when no definition matches.
    """
    from prefect.blocks.core import _collect_nested_reference_strings

    spec_reference = base_fields["properties"][reference_name]
    reference_strings = _collect_nested_reference_strings(spec_reference)

    # A single reference needs no disambiguation.
    if len(reference_strings) == 1:
        return definitions.get(reference_strings[0].replace("#/definitions/", ""))

    for reference_string in reference_strings:
        definition_key = reference_string.replace("#/definitions/", "")
        # Use tolerant lookups: a union may also reference definitions that
        # are missing or carry no "block_type_slug"; those are skipped
        # instead of raising KeyError.
        candidate = definitions.get(definition_key)
        if candidate is None:
            continue
        # Determines the definition to use when registering a child block
        # schema by verifying that the block type slug stored in the
        # definition matches the block type we are registering a schema for.
        if candidate.get("block_type_slug") == reference_block_type.slug:
            # Once we've found the matching definition, we no longer need to iterate
            return candidate
    return None
@db_injector
async def delete_block_schema(
    db: PrefectDBInterface, session: AsyncSession, block_schema_id: UUID
) -> bool:
    """
    Remove the block schema row whose id equals `block_schema_id`.

    Args:
        session: A database session
        block_schema_id: id of the block schema to delete

    Returns:
        bool: True when the DELETE statement affected at least one row
    """
    schema_table = db.BlockSchema
    delete_stmt = delete(schema_table).where(schema_table.id == block_schema_id)
    outcome = await session.execute(delete_stmt)
    return outcome.rowcount > 0
@db_injector
async def read_block_schema(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_schema_id: UUID,
) -> Union[BlockSchema, None]:
    """
    Read one block schema by id and rebuild its fields attribute so that it
    includes block schema references.

    Args:
        session: A database session
        block_schema_id: a block_schema id

    Returns:
        BlockSchema: the reconstructed block schema, or None when the query
            returns no rows
    """
    schema_table = db.BlockSchema
    reference_table = db.BlockSchemaReference

    # Anchor of the recursive CTE: references whose parent is the requested schema.
    anchor_cte = (
        sa.select(reference_table)
        .select_from(reference_table)
        .filter_by(parent_block_schema_id=block_schema_id)
        .cte("block_schema_references", recursive=True)
    )
    # Recursive step: references whose parent is a schema already reached.
    recursive_step = (
        sa.select(reference_table)
        .select_from(reference_table)
        .join(
            anchor_cte,
            reference_table.parent_block_schema_id
            == anchor_cte.c.reference_block_schema_id,
        )
    )
    references_cte = anchor_cte.union_all(recursive_step)

    # Select the requested schema plus every nested schema, each paired with
    # the key it lives under and the id of its parent schema.
    parent_id_column = references_cte.c.parent_block_schema_id
    rows_query = (
        sa.select(schema_table, references_cte.c.name, parent_id_column)
        .select_from(schema_table)
        .join(
            references_cte,
            schema_table.id == references_cte.c.reference_block_schema_id,
            isouter=True,
        )
        .filter(
            sa.or_(
                schema_table.id == block_schema_id,
                parent_id_column.is_not(None),
            )
        )
    )
    rows = await session.execute(rows_query)

    return _construct_full_block_schema(rows.all())  # type: ignore[arg-type]
def _construct_full_block_schema(
    block_schemas_with_references: List[
        Tuple[BlockSchema, Optional[str], Optional[UUID]]
    ],
    root_block_schema: Optional[BlockSchema] = None,
) -> Optional[BlockSchema]:
    """
    Rebuild the root block schema's fields attribute from a flat list of block
    schemas and their reference information, so that it contains block schema
    references (and merged definitions) for client consumption.

    Args:
        block_schemas_with_references: A list of tuples with the structure:
            - A block schema object
            - The name the block schema lives under in the parent block schema
            - The ID of the block schema's parent block schema
        root_block_schema: Optional block schema to start traversal from. When
            omitted, the root is looked up in the list.

    Returns:
        BlockSchema: A copy of the root block schema with a reconstructed
            fields attribute, or None when the list is empty.

    Raises:
        ValueError: If no root block schema can be determined.
    """
    if not block_schemas_with_references:
        return None

    if root_block_schema is None:
        root = _find_root_block_schema(block_schemas_with_references)
    else:
        root = copy(root_block_schema)

    if root is None:
        raise ValueError(
            "Unable to determine root block schema during schema reconstruction."
        )

    root.fields = _construct_block_schema_fields_with_block_references(
        root, block_schemas_with_references
    )
    nested_definitions = _construct_block_schema_spec_definitions(
        root, block_schemas_with_references
    )

    # Definitions for non block object may already exist in the block schema OpenAPI
    # spec, so we need to combine block and non-block definitions.
    if nested_definitions or root.fields.get("definitions"):
        root.fields["definitions"] = {
            **root.fields.get("definitions", {}),
            **nested_definitions,
        }
    return root
def _find_root_block_schema(
    block_schemas_with_references: List[
        Tuple[BlockSchema, Optional[str], Optional[UUID]]
    ],
) -> Union[BlockSchema, None]:
    """
    Look for the root block schema, i.e. the first entry whose parent block
    schema id is None, and return a copy of it.

    Returns None when no entry qualifies. When several entries qualify, only
    the first one is returned.
    """
    for candidate, _name, parent_id in block_schemas_with_references:
        if parent_id is None:
            return copy(candidate)
    return None
def _construct_block_schema_spec_definitions(
    root_block_schema: BlockSchema,
    block_schemas_with_references: List[
        Tuple[BlockSchema, Optional[str], Optional[UUID]]
    ],
) -> dict[str, Any]:
    """
    Build the definitions dict for a block schema from the nested block
    schemas named in its `block_schema_references` and found in
    `block_schemas_with_references`.
    """
    collected: dict[str, Any] = {}
    reference_groups = root_block_schema.fields["block_schema_references"].values()
    for reference_group in reference_groups:
        # Normalize to a list so single and union references share one path.
        if not isinstance(reference_group, list):
            reference_group = [reference_group]

        for reference in reference_group:
            matched_schema = _find_block_schema_via_checksum(
                block_schemas_with_references,
                reference["block_schema_checksum"],
            )
            if matched_schema is None:
                continue

            # Fully reconstruct the child first so its own nested definitions
            # are carried along.
            full_child = _construct_full_block_schema(
                block_schemas_with_references=block_schemas_with_references,
                root_block_schema=matched_schema,
            )
            assert full_child
            collected = _add_block_schemas_fields_to_definitions(
                collected, full_child
            )
    return collected
def _find_block_schema_via_checksum(
    block_schemas_with_references: List[
        Tuple[BlockSchema, Optional[str], Optional[UUID]]
    ],
    checksum: str,
) -> Optional[BlockSchema]:
    """Return the first block schema whose checksum equals `checksum`, or None if there is none."""
    for candidate, _name, _parent_id in block_schemas_with_references:
        if candidate.checksum == checksum:
            return candidate
    return None
def _add_block_schemas_fields_to_definitions(
    definitions: Dict, child_block_schema: BlockSchema
) -> dict[str, Any]:
    """
    Returns a new definitions dict with the fields of a block schema and its child
    block schemas added to the existing definitions.

    When the child schema's fields have no "title", `definitions` is returned
    unchanged. Otherwise the "definitions" entry is removed from the child
    schema's fields (in place) and merged into the result.
    """
    title = child_block_schema.fields.get("title")
    if title is None:
        return definitions

    # Definitions are declared as a flat dict, so we pop off definitions
    # from child schemas and add them to the parent definitions dict
    nested = child_block_schema.fields.pop("definitions", {})
    merged = dict(definitions)
    merged[title] = child_block_schema.fields
    merged.update(nested)
    return merged
def _construct_block_schema_fields_with_block_references(
    parent_block_schema: BlockSchema,
    block_schemas_with_references: List[
        Tuple[BlockSchema, Optional[str], Optional[UUID]]
    ],
) -> dict[str, Any]:
    """
    Constructs the block_schema_references in a block schema's fields attributes. Returns
    a copy of the block schema's fields with block_schema_references added.

    Args:
        parent_block_schema: The block schema that needs block references populated.
        block_schemas_with_references: A list of tuples with the structure:
            - A block schema object
            - The name the block schema lives under in the parent block schema
            - The ID of the block schema's parent block schema

    Returns:
        Dict: Block schema fields with block schema references added. A name
            maps to a single reference dict, or to a list of distinct reference
            dicts when several schemas are registered under the same name.
    """
    block_schema_fields_copy = {
        **parent_block_schema.fields,
        "block_schema_references": {},
    }
    references = block_schema_fields_copy["block_schema_references"]

    for (
        nested_block_schema,
        name,
        parent_block_schema_id,
    ) in block_schemas_with_references:
        if parent_block_schema_id != parent_block_schema.id:
            continue

        assert nested_block_schema.block_type, (
            f"{nested_block_schema} has no block type"
        )

        new_block_schema_reference = {
            "block_schema_checksum": nested_block_schema.checksum,
            "block_type_slug": nested_block_schema.block_type.slug,
        }

        if name not in references:
            # A block reference for this key does not yet exist
            references[name] = new_block_schema_reference
            continue

        existing = references[name]
        if isinstance(existing, list):
            # A list of block references already exists for this key; append
            # only when the reference is not already present. A duplicate must
            # be ignored here rather than fall through to the single-reference
            # branch, which would wrap the existing list in another list.
            if new_block_schema_reference not in existing:
                existing.append(new_block_schema_reference)
        elif existing != new_block_schema_reference:
            # A single block reference for this key already exists and it does
            # not match the block reference that we are attempting to add
            references[name] = [existing, new_block_schema_reference]

    return block_schema_fields_copy
@db_injector
async def read_block_schemas(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_schema_filter: Optional[schemas.filters.BlockSchemaFilter] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> List[BlockSchema]:
    """
    Reads block schemas, optionally restricted by `block_schema_filter`, and
    reconstructs each schema's fields to include block schema references.

    Args:
        session: A database session
        block_schema_filter: a block schema filter object
        limit (int): query limit
        offset (int): query offset

    Returns:
        List[BlockSchema]: the reconstructed block schemas, most recently
            created first
    """
    # schemas are ordered by `created DESC` to get the most recently created
    # ones first (and to facilitate getting the newest one with `limit=1`).
    filtered_block_schemas_query = select(db.BlockSchema.id).order_by(
        db.BlockSchema.created.desc()
    )

    if block_schema_filter:
        filtered_block_schemas_query = filtered_block_schemas_query.where(
            block_schema_filter.as_sql_filter()
        )

    if offset is not None:
        filtered_block_schemas_query = filtered_block_schemas_query.offset(offset)
    if limit is not None:
        filtered_block_schemas_query = filtered_block_schemas_query.limit(limit)

    # Kept as a set: it is only used for membership tests in the loop below.
    filtered_block_schema_ids = set(
        (await session.execute(filtered_block_schemas_query)).scalars().unique().all()
    )

    # Recursive CTE collecting every reference reachable from the filtered schemas.
    block_schema_references_query = (
        sa.select(db.BlockSchemaReference)
        .select_from(db.BlockSchemaReference)
        .filter(
            db.BlockSchemaReference.parent_block_schema_id.in_(
                filtered_block_schemas_query
            )
        )
        .cte("block_schema_references", recursive=True)
    )
    block_schema_references_join = (
        sa.select(db.BlockSchemaReference)
        .select_from(db.BlockSchemaReference)
        .join(
            block_schema_references_query,
            db.BlockSchemaReference.parent_block_schema_id
            == block_schema_references_query.c.reference_block_schema_id,
        )
    )
    recursive_block_schema_references_cte = block_schema_references_query.union_all(
        block_schema_references_join
    )

    nested_block_schemas_query = (
        sa.select(
            db.BlockSchema,
            recursive_block_schema_references_cte.c.name,
            recursive_block_schema_references_cte.c.parent_block_schema_id,
        )
        .select_from(db.BlockSchema)
        # In order to reconstruct nested block schemas efficiently, we need to visit
        # them in the order they were created (so that we guarantee that
        # nested/referenced schemas have already been seen). Therefore this second
        # query sorts by created ASC.
        .order_by(db.BlockSchema.created.asc())
        .join(
            recursive_block_schema_references_cte,
            db.BlockSchema.id
            == recursive_block_schema_references_cte.c.reference_block_schema_id,
            isouter=True,
        )
        .filter(
            sa.or_(
                db.BlockSchema.id.in_(filtered_block_schemas_query),
                recursive_block_schema_references_cte.c.parent_block_schema_id.is_not(
                    None
                ),
            )
        )
    )

    block_schemas_with_references = (
        (await session.execute(nested_block_schemas_query)).unique().all()
    )
    fully_constructed_block_schemas = []
    # A set avoids rescanning a growing list for every row.
    visited_block_schema_ids = set()
    for root_block_schema, _, _ in block_schemas_with_references:
        if (
            root_block_schema.id in filtered_block_schema_ids
            and root_block_schema.id not in visited_block_schema_ids
        ):
            constructed = _construct_full_block_schema(
                block_schemas_with_references=block_schemas_with_references,  # type: ignore[arg-type]
                root_block_schema=root_block_schema,
            )
            assert constructed
            fully_constructed_block_schemas.append(constructed)
            visited_block_schema_ids.add(root_block_schema.id)

    # because we reconstructed schemas ordered by created ASC, we
    # reverse the final output to restore created DESC
    return list(reversed(fully_constructed_block_schemas))
@db_injector
async def read_block_schema_by_checksum(
    db: PrefectDBInterface,
    session: AsyncSession,
    checksum: str,
    version: Optional[str] = None,
) -> Optional[BlockSchema]:
    """
    Read a block schema by checksum and rebuild its fields attribute so that
    it includes block schema references.

    Args:
        session: A database session
        checksum: a block_schema checksum
        version: A block_schema version; when given, only schemas with this
            version are considered

    Returns:
        BlockSchema: the reconstructed block schema, or None when the query
            returns no rows
    """
    schema_table = db.BlockSchema
    reference_table = db.BlockSchemaReference

    # The same checksum with different versions can occur in the DB. Return only
    # the most recently created one.
    root_select = sa.select(schema_table).filter_by(checksum=checksum)
    if version is not None:
        root_select = root_select.filter_by(version=version)
    root_select = root_select.order_by(schema_table.created.desc()).limit(1)
    root_cte = root_select.cte("root_block_schema")

    # Recursive query returning the selected block schema along with any nested
    # block schemas, each coupled with the ID of its parent schema and the key
    # it resides under.
    anchor_cte = (
        sa.select(reference_table)
        .select_from(reference_table)
        .filter_by(parent_block_schema_id=root_cte.c.id)
        .cte("block_schema_references", recursive=True)
    )
    recursive_step = (
        sa.select(reference_table)
        .select_from(reference_table)
        .join(
            anchor_cte,
            reference_table.parent_block_schema_id
            == anchor_cte.c.reference_block_schema_id,
        )
    )
    references_cte = anchor_cte.union_all(recursive_step)

    parent_id_column = references_cte.c.parent_block_schema_id
    rows_query = (
        sa.select(schema_table, references_cte.c.name, parent_id_column)
        .select_from(schema_table)
        .join(
            references_cte,
            schema_table.id == references_cte.c.reference_block_schema_id,
            isouter=True,
        )
        .filter(
            sa.or_(
                schema_table.id == root_cte.c.id,
                parent_id_column.is_not(None),
            )
        )
    )
    rows = await session.execute(rows_query)
    return _construct_full_block_schema(rows.all())  # type: ignore[arg-type]
@db_injector
async def read_available_block_capabilities(
    db: PrefectDBInterface,
    session: AsyncSession,
) -> List[str]:
    """
    Collect every distinct capability found across block schemas.

    Args:
        session: A database session.

    Returns:
        List[str]: The de-duplicated capabilities, in no particular order.
    """
    distinct_capabilities = db.BlockSchema.capabilities.distinct()
    aggregate_stmt = sa.select(
        db.queries.json_arr_agg(db.queries.cast_to_json(distinct_capabilities))
    )
    aggregated = await session.execute(aggregate_stmt)
    capability_combinations = aggregated.scalars().first() or []

    # The aggregate may come back as a JSON string when the database
    # interface reports `uses_json_strings`; decode it in that case.
    if db.queries.uses_json_strings and isinstance(capability_combinations, str):
        capability_combinations = json.loads(capability_combinations)

    unique_capabilities = set()
    for capabilities in capability_combinations:
        unique_capabilities.update(capabilities)
    return list(unique_capabilities)
@db_injector
async def create_block_schema_reference(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_schema_reference: schemas.core.BlockSchemaReference,
) -> Union[orm_models.BlockSchemaReference, None]:
    """
    Create a block schema reference linking a nested block schema to its
    parent, unless an identical reference already exists.

    Args:
        session: A database session.
        block_schema_reference: A block schema reference object.

    Returns:
        orm_models.BlockSchemaReference: The existing reference when one with
            the same name, parent id and reference id is found; otherwise the
            row read back by `block_schema_reference.id` after the insert.
    """
    reference_table = db.BlockSchemaReference

    # Look for a reference with the same name, parent and referenced schema.
    lookup_stmt = sa.select(reference_table).where(
        reference_table.name == block_schema_reference.name,
        reference_table.parent_block_schema_id
        == block_schema_reference.parent_block_schema_id,
        reference_table.reference_block_schema_id
        == block_schema_reference.reference_block_schema_id,
    )
    lookup_result = await session.execute(lookup_stmt)
    already_present = lookup_result.scalar()
    if already_present:
        return already_present

    insert_stmt = db.queries.insert(reference_table).values(
        **block_schema_reference.model_dump_for_orm(
            exclude_unset=True, exclude={"created", "updated"}
        )
    )
    await session.execute(insert_stmt)

    # Read the row back by the id carried on the input model.
    readback_stmt = sa.select(reference_table).where(
        reference_table.id == block_schema_reference.id
    )
    readback = await session.execute(readback_stmt)
    return readback.scalar()