Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/block_documents.py: 34%
185 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Functions for interacting with block document ORM objects.
3Intended for internal use by the Prefect REST API.
4"""
6from copy import copy 1a
7from typing import Dict, List, Optional, Sequence, Tuple, TypeVar, Union 1a
8from uuid import UUID, uuid4 1a
10import sqlalchemy as sa 1a
11from sqlalchemy.ext.asyncio import AsyncSession 1a
12from sqlalchemy.sql import Select 1a
14import prefect.server.models as models 1a
15from prefect.server import schemas 1a
16from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1a
17from prefect.server.schemas.actions import BlockDocumentReferenceCreate 1a
18from prefect.server.schemas.core import BlockDocument 1a
19from prefect.server.schemas.filters import BlockSchemaFilter 1a
20from prefect.server.utilities.database import UUID as UUIDTypeDecorator 1a
21from prefect.utilities.collections import dict_to_flatdict, flatdict_to_dict 1a
22from prefect.utilities.names import obfuscate 1a
# Row-tuple type parameter of a ``Select``; lets ``_apply_block_document_filters``
# return a query of the same ``Select[T]`` type it was given.
T = TypeVar("T", bound=tuple)
@db_injector
async def create_block_document(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_document: schemas.actions.BlockDocumentCreate,
) -> BlockDocument:
    """
    Create a block document and the references it makes to other block documents.

    The owning block type's name is copied onto the new row. Anonymous documents
    submitted without a name receive a generated ``anonymous-<uuid4>`` name.
    Top-level data fields of the form ``{"$ref": {"block_document_id": ...}}``
    become block document reference rows; the remaining data is stored through
    ``encrypt_data``.

    Args:
        db: the injected database interface.
        session: the database session to use.
        block_document: the block document to create.

    Returns:
        The new block document, re-read (without secrets) so that its block
        schema relationship is loaded.

    Raises:
        AssertionError: if the block type referenced by ``block_document`` is
            not found.
        ValueError: if a ``$ref`` entry in the data lacks a block_document_id.
    """
    block_type = await models.block_types.read_block_type(
        session=session, block_type_id=block_document.block_type_id
    )
    assert block_type, f"Block type {block_document.block_type_id} not found"

    name: Union[str, None] = block_document.name
    if block_document.is_anonymous and not name:
        # anonymous documents without a name are given a random one
        name = f"anonymous-{uuid4()}"

    orm_block = db.BlockDocument(
        name=name,
        block_schema_id=block_document.block_schema_id,
        block_type_id=block_document.block_type_id,
        block_type_name=block_type.name,
        is_anonymous=block_document.is_anonymous,
    )

    plain_data, references = _separate_block_references_from_data(block_document.data)
    await orm_block.encrypt_data(session=session, data=plain_data)

    # persist the document before writing reference rows that point at its id
    session.add(orm_block)
    await session.flush()

    for field_name, referenced_id in references:
        reference = BlockDocumentReferenceCreate(
            parent_block_document_id=orm_block.id,
            reference_block_document_id=referenced_id,
            name=field_name,
        )
        await create_block_document_reference(
            session=session, block_document_reference=reference
        )

    created = await read_block_document_by_id(
        session=session,
        block_document_id=orm_block.id,
        include_secrets=False,
    )
    assert created
    return created
@db_injector
async def block_document_with_unique_values_exists(
    db: PrefectDBInterface, session: AsyncSession, block_type_id: UUID, name: str
) -> bool:
    """
    Return whether a block document of ``block_type_id`` already uses ``name``.

    Args:
        db: the injected database interface.
        session: the database session to use.
        block_type_id: the block type to look within.
        name: the block document name to look for.

    Returns:
        True if at least one block document matches both values, else False.
    """
    # The criteria live inside the EXISTS subquery, so the statement always
    # returns exactly one boolean row. Previously they filtered the outer SELECT,
    # and the answer depended on whether any rows came back at all.
    matching_documents = sa.select(db.BlockDocument.id).where(
        db.BlockDocument.block_type_id == block_type_id,
        db.BlockDocument.name == name,
    )
    result = await session.execute(sa.select(matching_documents.exists()))
    return bool(result.scalar())
101def _separate_block_references_from_data( 1a
102 block_document_data: Dict,
103) -> Tuple[Dict, List[Tuple[str, UUID]]]:
104 """
105 Separates block document references from block document data so that a block
106 document can be saved without references and the corresponding block document
107 references can be saved.
109 Args:
110 block_document_data: Dictionary of block document data passed with request
111 to create new block document
113 Returns:
114 block_document_data_with_out_refs: A copy of the block_document_data supplied
115 with the block document references removed.
116 block_document_references: A list of tuples each containing the name of the
117 field referencing a block document and the ID of the referenced block
118 document,
119 """
120 block_document_references = []
121 block_document_data_without_refs = {}
122 for key, value in block_document_data.items():
123 # Current assumption is that any block references will be stored on a key of
124 # the block document data and not nested in any other data structures.
125 if isinstance(value, dict) and "$ref" in value:
126 reference_block_document_id = value["$ref"].get("block_document_id")
127 if reference_block_document_id is None:
128 raise ValueError(
129 f"Received block reference without a block_document_id in key {key}"
130 )
131 block_document_references.append((key, reference_block_document_id))
132 else:
133 block_document_data_without_refs[key] = value
134 return block_document_data_without_refs, block_document_references
async def read_block_document_by_id(
    session: AsyncSession,
    block_document_id: UUID,
    include_secrets: bool = False,
) -> Union[BlockDocument, None]:
    """
    Read a single block document by id, whether or not it is anonymous.

    Args:
        session: the database session to use.
        block_document_id: id of the block document to read.
        include_secrets: forwarded to ``read_block_documents``.

    Returns:
        The matching block document, or None if there is none.
    """
    id_filter = schemas.filters.BlockDocumentFilter(
        id=dict(any_=[block_document_id]),
        # is_anonymous=None: apply no anonymous filtering at all
        is_anonymous=None,
    )
    matches = await read_block_documents(
        session=session,
        block_document_filter=id_filter,
        include_secrets=include_secrets,
        limit=1,
    )
    return next(iter(matches), None)
async def _construct_full_block_document(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_documents_with_references: Sequence[
        Tuple[orm_models.ORMBlockDocument, Optional[str], Optional[UUID]]
    ],
    parent_block_document: Optional[BlockDocument] = None,
    include_secrets: bool = False,
) -> Optional[BlockDocument]:
    """
    Nest every block document referenced by a parent into that parent's data.

    ``block_documents_with_references`` holds
    ``(orm_document, reference_name, referencing_parent_id)`` rows. Each row whose
    referencing parent id equals the current parent's id, and that has a
    reference name, is converted to a ``BlockDocument``. It is hydrated
    recursively in the same way and then stored under its reference name. The
    storage targets are the parent's ``data`` (the child's data) and
    ``block_document_references`` (a summary of the child).

    Args:
        db: the database interface, forwarded to recursive calls.
        session: the database session to use.
        block_documents_with_references: all rows of the document tree.
        parent_block_document: the document to hydrate. When None, a copy of the
            first row with no referencing parent is used.
        include_secrets: forwarded to ``BlockDocument.from_orm_model``.

    Returns:
        The hydrated parent, or None when ``block_documents_with_references`` is
        empty.

    Raises:
        ValueError: if no parent is given and none can be found in the rows.
    """
    if not block_documents_with_references:
        return None

    parent = parent_block_document
    if parent is None:
        root = await _find_parent_block_document(
            session,
            block_documents_with_references,
            include_secrets=include_secrets,
        )
        parent = copy(root)
    if parent is None:
        raise ValueError("Unable to determine parent block document")

    for orm_child, reference_name, referencing_parent_id in (
        block_documents_with_references
    ):
        # Only direct, named children of this parent are attached here; deeper
        # descendants are attached by the recursive call below.
        if referencing_parent_id != parent.id or reference_name is None:
            continue

        child = await BlockDocument.from_orm_model(
            session, orm_child, include_secrets=include_secrets
        )
        full_child = await _construct_full_block_document(
            db,
            session,
            block_documents_with_references,
            parent_block_document=copy(child),
            include_secrets=include_secrets,
        )
        assert full_child

        parent.data[reference_name] = full_child.data
        child_summary = {
            "id": child.id,
            "name": child.name,
            "block_type": child.block_type,
            "is_anonymous": child.is_anonymous,
            "block_document_references": full_child.block_document_references,
        }
        parent.block_document_references[reference_name] = {
            "block_document": child_summary
        }

    return parent
async def _find_parent_block_document(
    session: AsyncSession,
    block_documents_with_references: Sequence[
        Tuple[orm_models.ORMBlockDocument, Optional[str], Optional[UUID]]
    ],
    include_secrets: bool = False,
) -> Union[BlockDocument, None]:
    """
    Return the first row's document that is not referenced by any parent.

    Args:
        session: the database session to use.
        block_documents_with_references: ``(orm_document, reference_name,
            referencing_parent_id)`` rows.
        include_secrets: forwarded to ``BlockDocument.from_orm_model``.

    Returns:
        The first document whose referencing parent id is None, converted to a
        ``BlockDocument``, or None if every row has a parent.
    """
    for orm_block_document, _, referencing_parent_id in (
        block_documents_with_references
    ):
        if referencing_parent_id is None:
            return await BlockDocument.from_orm_model(
                session,
                orm_block_document,
                include_secrets=include_secrets,
            )
    return None
async def read_block_document_by_name(
    session: AsyncSession,
    name: str,
    block_type_slug: str,
    include_secrets: bool = False,
) -> Union[BlockDocument, None]:
    """
    Read a block document with the given name and block type slug.

    Anonymous documents are not excluded from the lookup.

    Returns:
        The matching block document, or None if there is none.
    """
    name_filter = schemas.filters.BlockDocumentFilter(
        name=dict(any_=[name]),
        # is_anonymous=None: apply no anonymous filtering at all
        is_anonymous=None,
    )
    slug_filter = schemas.filters.BlockTypeFilter(slug=dict(any_=[block_type_slug]))
    matches = await read_block_documents(
        session=session,
        block_document_filter=name_filter,
        block_type_filter=slug_filter,
        include_secrets=include_secrets,
        limit=1,
    )
    return next(iter(matches), None)
def _apply_block_document_filters(
    db: PrefectDBInterface,
    query: Select[T],
    block_document_filter: Optional[schemas.filters.BlockDocumentFilter] = None,
    block_schema_filter: Optional[schemas.filters.BlockSchemaFilter] = None,
    block_type_filter: Optional[schemas.filters.BlockTypeFilter] = None,
) -> Select[T]:
    """
    Restrict ``query`` to the block documents matching the given filters.

    Block type and block schema filters are expressed as ``EXISTS`` subqueries
    joined on ``db.BlockDocument.block_type_id`` / ``block_schema_id``.

    Args:
        db: the database interface.
        query: the select to narrow.
        block_document_filter: document-level filter. When None, anonymous block
            documents are excluded.
        block_schema_filter: optional filter on the document's block schema.
        block_type_filter: optional filter on the document's block type.

    Returns:
        ``query`` with the filter conditions added to its WHERE clause.
    """
    if block_document_filter is not None:
        document_criteria = block_document_filter.as_sql_filter()
    else:
        # without an explicit filter, anonymous block documents are hidden
        document_criteria = schemas.filters.BlockDocumentFilter(
            is_anonymous=schemas.filters.BlockDocumentFilterIsAnonymous(eq_=False)
        ).as_sql_filter()
    filtered = query.where(document_criteria)

    if block_type_filter is not None:
        matching_block_type = sa.select(db.BlockType).where(
            db.BlockType.id == db.BlockDocument.block_type_id,
            block_type_filter.as_sql_filter(),
        )
        filtered = filtered.where(matching_block_type.exists())

    if block_schema_filter is not None:
        matching_block_schema = sa.select(db.BlockSchema).where(
            db.BlockSchema.id == db.BlockDocument.block_schema_id,
            block_schema_filter.as_sql_filter(),
        )
        filtered = filtered.where(matching_block_schema.exists())

    return filtered
@db_injector
async def read_block_documents(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_document_filter: Optional[schemas.filters.BlockDocumentFilter] = None,
    block_type_filter: Optional[schemas.filters.BlockTypeFilter] = None,
    block_schema_filter: Optional[schemas.filters.BlockSchemaFilter] = None,
    include_secrets: bool = False,
    sort: schemas.sorting.BlockDocumentSort = schemas.sorting.BlockDocumentSort.NAME_ASC,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
) -> List[BlockDocument]:
    """
    Read block documents with an optional limit and offset.

    How the result is built:
    - The filters, ``sort``, ``offset`` and ``limit`` select the top-level
      ("root") documents. When ``block_document_filter`` is None, anonymous
      documents are excluded (see ``_apply_block_document_filters``).
    - Every block document those roots reference, directly or transitively
      through ``BlockDocumentReference`` rows, is loaded with a recursive CTE.
    - Referenced documents are nested into each root's ``data`` and
      ``block_document_references``.
    - Each returned root also gets its ``block_schema`` populated.

    Args:
        db: the injected database interface.
        session: the database session to use.
        block_document_filter: filter on the root block documents.
        block_type_filter: filter on the roots' block types.
        block_schema_filter: filter on the roots' block schemas.
        include_secrets: forwarded when converting ORM rows to ``BlockDocument``.
        sort: ordering of the root documents.
        offset: number of root documents to skip.
        limit: maximum number of root documents to return.

    Returns:
        The fully hydrated root block documents, in ``sort`` order.
    """
    # --- Build an initial query that filters for the requested block documents
    filtered_block_documents_query = sa.select(db.BlockDocument.id)
    filtered_block_documents_query = _apply_block_document_filters(
        db,
        query=filtered_block_documents_query,
        block_document_filter=block_document_filter,
        block_type_filter=block_type_filter,
        block_schema_filter=block_schema_filter,
    )
    filtered_block_documents_query = filtered_block_documents_query.order_by(
        *sort.as_sql_sort()
    )

    if offset is not None:
        filtered_block_documents_query = filtered_block_documents_query.offset(offset)

    if limit is not None:
        filtered_block_documents_query = filtered_block_documents_query.limit(limit)

    filtered_block_documents_cte = filtered_block_documents_query.cte(
        "filtered_block_documents"
    )

    # --- Build a recursive query that starts with the filtered block documents
    # and iteratively loads all referenced block documents. The query includes
    # the ID of each block document as well as the ID of the document that
    # references it and name it's referenced by, if applicable.
    parent_documents = (
        sa.select(
            filtered_block_documents_cte.c.id,
            sa.cast(sa.null(), sa.String).label("reference_name"),
            sa.cast(sa.null(), UUIDTypeDecorator).label(
                "reference_parent_block_document_id"
            ),
        )
        .select_from(filtered_block_documents_cte)
        .cte("all_block_documents", recursive=True)
    )
    # recursive part of query
    referenced_documents = (
        sa.select(
            db.BlockDocumentReference.reference_block_document_id,
            db.BlockDocumentReference.name,
            db.BlockDocumentReference.parent_block_document_id,
        )
        .select_from(parent_documents)
        .join(
            db.BlockDocumentReference,
            db.BlockDocumentReference.parent_block_document_id == parent_documents.c.id,
        )
    )
    # union the recursive CTE
    all_block_documents_query = parent_documents.union_all(referenced_documents)

    # --- Join the recursive query that contains all required document IDs
    # back to the BlockDocument table to load info for every document
    # and order by name
    final_query = (
        sa.select(
            db.BlockDocument,
            all_block_documents_query.c.reference_name,
            all_block_documents_query.c.reference_parent_block_document_id,
        )
        .select_from(all_block_documents_query)
        .join(db.BlockDocument, db.BlockDocument.id == all_block_documents_query.c.id)
        .order_by(*sort.as_sql_sort())
    )

    result = await session.execute(
        final_query.execution_options(populate_existing=True)
    )

    block_documents_with_references = result.unique().all()

    # identify true "parent" documents as those with no reference parent ids
    # (a set, so the membership tests below are O(1) instead of a list scan)
    parent_block_document_ids = {
        row[0].id
        for row in block_documents_with_references
        if row.reference_parent_block_document_id is None
    }

    # walk the resulting dataset and hydrate all block documents; a root can
    # appear on several rows, so each one is only constructed once
    fully_constructed_block_documents: List[BlockDocument] = []
    visited_block_document_ids = set()
    for root_orm_block_document, _, _ in block_documents_with_references:
        root_id = root_orm_block_document.id
        if root_id not in parent_block_document_ids:
            continue
        if root_id in visited_block_document_ids:
            continue

        root_block_document = await BlockDocument.from_orm_model(
            session, root_orm_block_document, include_secrets=include_secrets
        )
        constructed = await _construct_full_block_document(
            db,
            session,
            block_documents_with_references,  # type: ignore
            root_block_document,
            include_secrets=include_secrets,
        )
        assert constructed

        fully_constructed_block_documents.append(constructed)
        visited_block_document_ids.add(root_id)

    block_schema_ids = [
        block_document.block_schema_id
        for block_document in fully_constructed_block_documents
    ]
    block_schemas = await models.block_schemas.read_block_schemas(
        session=session,
        block_schema_filter=BlockSchemaFilter(id=dict(any_=block_schema_ids)),
    )
    # Index schemas by id once instead of scanning the list for every document.
    # Every id comes from an existing document, so a miss (KeyError) signals
    # inconsistent data.
    block_schemas_by_id = {block_schema.id: block_schema for block_schema in block_schemas}
    for block_document in fully_constructed_block_documents:
        block_document.block_schema = block_schemas_by_id[block_document.block_schema_id]

    return fully_constructed_block_documents
@db_injector
async def count_block_documents(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_document_filter: Optional[schemas.filters.BlockDocumentFilter] = None,
    block_type_filter: Optional[schemas.filters.BlockTypeFilter] = None,
    block_schema_filter: Optional[schemas.filters.BlockSchemaFilter] = None,
) -> int:
    """
    Count block documents that match the filters.

    The filters behave as in ``_apply_block_document_filters``; in particular,
    anonymous documents are excluded when ``block_document_filter`` is None.
    """
    count_query = _apply_block_document_filters(
        db,
        query=sa.select(sa.func.count()).select_from(db.BlockDocument),
        block_document_filter=block_document_filter,
        block_schema_filter=block_schema_filter,
        block_type_filter=block_type_filter,
    )
    count_result = await session.execute(count_query)
    return count_result.scalar()  # type: ignore
@db_injector
async def delete_block_document(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_document_id: UUID,
) -> bool:
    """
    Delete the block document with the given id.

    Returns:
        True if at least one row was deleted, otherwise False.
    """
    documents = db.BlockDocument
    result = await session.execute(
        sa.delete(documents).where(documents.id == block_document_id)
    )
    deleted_rows = result.rowcount
    return deleted_rows > 0
def _restore_obfuscated_secrets(
    flat_update_data: Dict,
    flat_current_data: Dict,
    secret_fields: Sequence[str],
) -> Dict:
    """
    Swap obfuscated secret placeholders in ``flat_update_data`` for stored values.

    If a value for a secret field equals the obfuscated form of the stored value,
    someone is probably re-posting every field of a document they read with
    secrets hidden. In that case the stored secret is kept rather than the
    placeholder.

    Args:
        flat_update_data: flattened (tuple-keyed) update data; modified in place.
        flat_current_data: flattened (tuple-keyed) currently stored data.
        secret_fields: dotted secret field paths; a ``*`` segment acts as a
            wildcard for the keys below its prefix.

    Returns:
        ``flat_update_data`` with placeholders replaced where a stored value
        exists.
    """
    for secret_field in secret_fields:
        secret_key = tuple(secret_field.split("."))
        current_secret = flat_current_data.get(secret_key)
        if current_secret is not None:
            if flat_update_data.get(secret_key) == obfuscate(current_secret):
                flat_update_data[secret_key] = current_secret
        # Looks for obfuscated values nested under a secret field with a wildcard.
        # If any obfuscated values are found, we assume that they shouldn't be
        # updated, and they are replaced with the current value for that key to
        # avoid losing data during update.
        elif "*" in secret_key:
            wildcard_index = secret_key.index("*")
            prefix = secret_key[0:wildcard_index]
            for data_key, value in list(flat_update_data.items()):
                if (
                    data_key[0:wildcard_index] == prefix
                    and value == obfuscate(value)
                    # Without a stored value there is nothing to restore. The
                    # submitted value is kept, matching the non-wildcard branch
                    # above. Previously this raised KeyError.
                    and data_key in flat_current_data
                ):
                    flat_update_data[data_key] = flat_current_data[data_key]
    return flat_update_data


@db_injector
async def update_block_document(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_document_id: UUID,
    block_document: schemas.actions.BlockDocumentUpdate,
) -> bool:
    """
    Update a block document's data, its block references and its block schema.

    When ``merge_existing_data`` is set, only explicitly set fields are used and
    the new data is shallow-merged into the stored data; otherwise the stored data
    is replaced. Obfuscated secret placeholders are swapped back for the stored
    secrets. Reference rows are then reconciled with the ``$ref`` entries in the
    new data: new references are created and references no longer present are
    deleted.

    NOTE: everything after loading the document, including a ``block_schema_id``
    change, is only applied when ``data`` is provided and not None.

    Args:
        db: the injected database interface.
        session: the database session to use.
        block_document_id: id of the block document to update.
        block_document: the update to apply.

    Returns:
        False if no block document with ``block_document_id`` exists, else True.

    Raises:
        AssertionError: if the proposed block schema is not found.
        ValueError: if the proposed block schema belongs to a different block type,
            or the new data contains a ``$ref`` without a block_document_id.
    """
    merge_existing_data = block_document.merge_existing_data
    current_block_document = await session.get(db.BlockDocument, block_document_id)
    if not current_block_document:
        return False

    update_values = block_document.model_dump_for_orm(
        exclude_unset=merge_existing_data,
        exclude={"merge_existing_data"},
    )

    if "data" not in update_values or update_values["data"] is None:
        return True

    current_data = await current_block_document.decrypt_data(session=session)

    flat_update_data = _restore_obfuscated_secrets(
        flat_update_data=dict_to_flatdict(update_values["data"]),
        flat_current_data=dict_to_flatdict(current_data),
        secret_fields=current_block_document.block_schema.fields.get(
            "secret_fields", []
        ),
    )
    update_values["data"] = flatdict_to_dict(flat_update_data)

    if merge_existing_data:
        # merge the existing data and the new data for partial updates
        current_data.update(update_values["data"])
        update_values["data"] = current_data

    current_block_document_references = (
        (
            await session.execute(
                sa.select(db.BlockDocumentReference).filter_by(
                    parent_block_document_id=block_document_id
                )
            )
        )
        .scalars()
        .all()
    )
    (
        block_document_data_without_refs,
        new_block_document_references,
    ) = _separate_block_references_from_data(update_values["data"])

    # encrypt the data and write updated data to the block document
    await current_block_document.encrypt_data(
        session=session, data=block_document_data_without_refs
    )

    # `proposed_block_schema` is always the same as the schema on the client-side
    # Block class that is calling `save`, which may or may not be the same schema
    # as the one on the saved block document
    proposed_block_schema_id = block_document.block_schema_id

    # if a new schema is proposed, update the block schema id for the block document
    if (
        proposed_block_schema_id is not None
        and proposed_block_schema_id != current_block_document.block_schema_id
    ):
        proposed_block_schema = await session.get(
            db.BlockSchema, proposed_block_schema_id
        )
        assert proposed_block_schema, (
            f"Block schema {proposed_block_schema_id} not found"
        )

        # make sure the proposed schema is of the same block type as the current document
        if (
            proposed_block_schema.block_type_id
            != current_block_document.block_type_id
        ):
            raise ValueError(
                "Must migrate block document to a block schema of the same block"
                " type."
            )
        await session.execute(
            sa.update(db.BlockDocument)
            .where(db.BlockDocument.id == block_document_id)
            .values(block_schema_id=proposed_block_schema_id)
        )

    # reconcile reference rows: create missing ones, keep matching ones, and
    # delete the rest
    unchanged_block_document_references = []
    for name, reference_block_document_id in new_block_document_references:
        matching_current_block_document_reference = _find_block_document_reference(
            current_block_document_references,
            name,
            reference_block_document_id,
        )
        if matching_current_block_document_reference is None:
            await create_block_document_reference(
                session=session,
                block_document_reference=BlockDocumentReferenceCreate(
                    parent_block_document_id=block_document_id,
                    reference_block_document_id=reference_block_document_id,
                    name=name,
                ),
            )
        else:
            unchanged_block_document_references.append(
                matching_current_block_document_reference
            )

    for block_document_reference in current_block_document_references:
        if block_document_reference not in unchanged_block_document_references:
            await delete_block_document_reference(
                session, block_document_reference_id=block_document_reference.id
            )

    return True
def _find_block_document_reference(
    block_document_references: Sequence[orm_models.BlockDocumentReference],
    name: str,
    reference_block_document_id: UUID,
) -> Optional[orm_models.BlockDocumentReference]:
    """
    Return the first reference with the given name and referenced document id.

    NOTE(review): ``reference_block_document_id`` can come straight from request
    data (see ``_separate_block_references_from_data``). If it arrives as a string
    while the ORM column holds a UUID, ``==`` will not match; confirm how callers
    type it.

    Returns:
        The matching reference, or None if there is none.
    """
    for candidate in block_document_references:
        same_name = candidate.name == name
        if same_name and candidate.reference_block_document_id == reference_block_document_id:
            return candidate
    return None
@db_injector
async def create_block_document_reference(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_document_reference: schemas.actions.BlockDocumentReferenceCreate,
) -> Union[orm_models.BlockDocumentReference, None]:
    """
    Insert a block document reference row and read it back by id.

    Only explicitly set fields, minus ``created`` and ``updated``, are passed to
    the ``db.queries.insert`` statement.

    Returns:
        The row whose id equals ``block_document_reference.id``, or None if no
        such row is found.
    """
    reference_values = block_document_reference.model_dump_for_orm(
        exclude_unset=True, exclude={"created", "updated"}
    )
    await session.execute(
        db.queries.insert(db.BlockDocumentReference).values(**reference_values)
    )

    lookup = sa.select(db.BlockDocumentReference).where(
        db.BlockDocumentReference.id == block_document_reference.id
    )
    lookup_result = await session.execute(lookup)
    return lookup_result.scalar()
@db_injector
async def delete_block_document_reference(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_document_reference_id: UUID,
) -> bool:
    """
    Delete the block document reference with the given id.

    Returns:
        True if at least one row was deleted, otherwise False.
    """
    references = db.BlockDocumentReference
    result = await session.execute(
        sa.delete(references).where(references.id == block_document_reference_id)
    )
    deleted_rows = result.rowcount
    return deleted_rows > 0