Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/block_documents.py: 34%

185 statements  

coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Functions for interacting with block document ORM objects. 

3Intended for internal use by the Prefect REST API. 

4""" 

5 

6from copy import copy 1a

7from typing import Dict, List, Optional, Sequence, Tuple, TypeVar, Union 1a

8from uuid import UUID, uuid4 1a

9 

10import sqlalchemy as sa 1a

11from sqlalchemy.ext.asyncio import AsyncSession 1a

12from sqlalchemy.sql import Select 1a

13 

14import prefect.server.models as models 1a

15from prefect.server import schemas 1a

16from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1a

17from prefect.server.schemas.actions import BlockDocumentReferenceCreate 1a

18from prefect.server.schemas.core import BlockDocument 1a

19from prefect.server.schemas.filters import BlockSchemaFilter 1a

20from prefect.server.utilities.database import UUID as UUIDTypeDecorator 1a

21from prefect.utilities.collections import dict_to_flatdict, flatdict_to_dict 1a

22from prefect.utilities.names import obfuscate 1a

23 

24T = TypeVar("T", bound=tuple) 1a

25 

26 

@db_injector
async def create_block_document(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_document: schemas.actions.BlockDocumentCreate,
) -> BlockDocument:
    # lookup block type name and copy to the block document table
    block_type = await models.block_types.read_block_type(
        session=session, block_type_id=block_document.block_type_id
    )
    assert block_type, f"Block type {block_document.block_type_id} not found"

    name: Union[str, None]
    # anonymous block documents can be given a random name if none is provided
    if block_document.is_anonymous and not block_document.name:
        name = f"anonymous-{uuid4()}"
    else:
        name = block_document.name

    orm_block = db.BlockDocument(
        name=name,
        block_schema_id=block_document.block_schema_id,
        block_type_id=block_document.block_type_id,
        block_type_name=block_type.name,
        is_anonymous=block_document.is_anonymous,
    )

    (
        block_document_data_without_refs,
        block_document_references,
    ) = _separate_block_references_from_data(block_document.data)

    # encrypt the data and store in block document
    await orm_block.encrypt_data(session=session, data=block_document_data_without_refs)

    session.add(orm_block)
    await session.flush()

    # Create a block document reference for each reference in the block document data
    for key, reference_block_document_id in block_document_references:
        await create_block_document_reference(
            session=session,
            block_document_reference=BlockDocumentReferenceCreate(
                parent_block_document_id=orm_block.id,
                reference_block_document_id=reference_block_document_id,
                name=key,
            ),
        )

    # reload the block document in order to load the associated block schema
    # relationship
    new_block_document = await read_block_document_by_id(
        session=session,
        block_document_id=orm_block.id,
        include_secrets=False,
    )
    assert new_block_document

    return new_block_document
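# A minimal usage sketch, assuming a block type and block schema have already been
# registered and their IDs are known to the caller; the name and data below are
# hypothetical.
async def _example_create_block_document(
    session: AsyncSession,
    block_type_id: UUID,
    block_schema_id: UUID,
) -> BlockDocument:
    # `db` is injected by the `db_injector` decorator, so callers pass only the session.
    return await create_block_document(
        session=session,
        block_document=schemas.actions.BlockDocumentCreate(
            name="my-storage-block",  # hypothetical name
            block_type_id=block_type_id,
            block_schema_id=block_schema_id,
            data={"bucket": "example-bucket"},  # hypothetical, non-secret data
        ),
    )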

@db_injector
async def block_document_with_unique_values_exists(
    db: PrefectDBInterface, session: AsyncSession, block_type_id: UUID, name: str
) -> bool:
    result = await session.execute(
        sa.select(sa.exists(db.BlockDocument)).where(
            db.BlockDocument.block_type_id == block_type_id,
            db.BlockDocument.name == name,
        )
    )
    return bool(result.scalar_one_or_none())
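# A sketch of using the uniqueness check above to guard a create call; assumes the
# caller has already built a `BlockDocumentCreate` with a non-null name.
async def _example_guarded_create(
    session: AsyncSession,
    block_document: schemas.actions.BlockDocumentCreate,
) -> Optional[BlockDocument]:
    # skip creation if a document with the same block type and name already exists
    if await block_document_with_unique_values_exists(
        session=session,
        block_type_id=block_document.block_type_id,
        name=block_document.name,
    ):
        return None
    return await create_block_document(session=session, block_document=block_document)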

def _separate_block_references_from_data(
    block_document_data: Dict,
) -> Tuple[Dict, List[Tuple[str, UUID]]]:
    """
    Separates block document references from block document data so that a block
    document can be saved without references and the corresponding block document
    references can be saved.

    Args:
        block_document_data: Dictionary of block document data passed with the request
            to create a new block document

    Returns:
        block_document_data_without_refs: A copy of the block_document_data supplied
            with the block document references removed.
        block_document_references: A list of tuples, each containing the name of the
            field referencing a block document and the ID of the referenced block
            document.
    """
    block_document_references = []
    block_document_data_without_refs = {}
    for key, value in block_document_data.items():
        # Current assumption is that any block references will be stored on a key of
        # the block document data and not nested in any other data structures.
        if isinstance(value, dict) and "$ref" in value:
            reference_block_document_id = value["$ref"].get("block_document_id")
            if reference_block_document_id is None:
                raise ValueError(
                    f"Received block reference without a block_document_id in key {key}"
                )
            block_document_references.append((key, reference_block_document_id))
        else:
            block_document_data_without_refs[key] = value
    return block_document_data_without_refs, block_document_references
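# A small illustration of the `$ref` convention handled above; the keys and values are
# hypothetical.
def _example_separate_refs() -> None:
    referenced_id = uuid4()
    data = {
        "bucket": "example-bucket",
        "credentials": {"$ref": {"block_document_id": referenced_id}},
    }
    plain_data, references = _separate_block_references_from_data(data)
    assert plain_data == {"bucket": "example-bucket"}
    assert references == [("credentials", referenced_id)]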

async def read_block_document_by_id(
    session: AsyncSession,
    block_document_id: UUID,
    include_secrets: bool = False,
) -> Union[BlockDocument, None]:
    block_documents = await read_block_documents(
        session=session,
        block_document_filter=schemas.filters.BlockDocumentFilter(
            id=dict(any_=[block_document_id]),
            # don't apply any anonymous filtering
            is_anonymous=None,
        ),
        include_secrets=include_secrets,
        limit=1,
    )
    return block_documents[0] if block_documents else None

async def _construct_full_block_document(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_documents_with_references: Sequence[
        Tuple[orm_models.ORMBlockDocument, Optional[str], Optional[UUID]]
    ],
    parent_block_document: Optional[BlockDocument] = None,
    include_secrets: bool = False,
) -> Optional[BlockDocument]:
    if len(block_documents_with_references) == 0:
        return None
    if parent_block_document is None:
        parent_block_document = copy(
            await _find_parent_block_document(
                session,
                block_documents_with_references,
                include_secrets=include_secrets,
            )
        )

    if parent_block_document is None:
        raise ValueError("Unable to determine parent block document")

    # Recursively walk block document tree and construct the full block
    # document data for each child and add it to the parent's block document
    # data
    for (
        orm_block_document,
        name,
        parent_block_document_id,
    ) in block_documents_with_references:
        if parent_block_document_id == parent_block_document.id and name is not None:
            block_document = await BlockDocument.from_orm_model(
                session, orm_block_document, include_secrets=include_secrets
            )
            full_child_block_document = await _construct_full_block_document(
                db,
                session,
                block_documents_with_references,
                parent_block_document=copy(block_document),
                include_secrets=include_secrets,
            )
            assert full_child_block_document
            parent_block_document.data[name] = full_child_block_document.data
            parent_block_document.block_document_references[name] = {
                "block_document": {
                    "id": block_document.id,
                    "name": block_document.name,
                    "block_type": block_document.block_type,
                    "is_anonymous": block_document.is_anonymous,
                    "block_document_references": (
                        full_child_block_document.block_document_references
                    ),
                }
            }

    return parent_block_document
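# An illustrative (hypothetical) shape of `block_documents_with_references` as consumed
# above: each row pairs an ORM block document with the name it is referenced by and the
# ID of the document that references it; the root row has neither.
#
#     [
#         (<ORMBlockDocument A>, None, None),            # the filtered "parent" document
#         (<ORMBlockDocument B>, "credentials", A.id),   # A.data["credentials"] -> B
#         (<ORMBlockDocument C>, "nested_block", B.id),  # B.data["nested_block"] -> C
#     ]
#
# The recursion above merges each child's data into its parent's `data` and records an
# entry in the parent's `block_document_references`.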

async def _find_parent_block_document(
    session: AsyncSession,
    block_documents_with_references: Sequence[
        Tuple[orm_models.ORMBlockDocument, Optional[str], Optional[UUID]]
    ],
    include_secrets: bool = False,
) -> Union[BlockDocument, None]:
    parent_orm_block_document = next(
        (
            block_document
            for (
                block_document,
                _,
                parent_block_document_id,
            ) in block_documents_with_references
            if parent_block_document_id is None
        ),
        None,
    )
    return (
        await BlockDocument.from_orm_model(
            session,
            parent_orm_block_document,
            include_secrets=include_secrets,
        )
        if parent_orm_block_document is not None
        else None
    )

async def read_block_document_by_name(
    session: AsyncSession,
    name: str,
    block_type_slug: str,
    include_secrets: bool = False,
) -> Union[BlockDocument, None]:
    """
    Read a block document with the given name and block type slug.
    """
    block_documents = await read_block_documents(
        session=session,
        block_document_filter=schemas.filters.BlockDocumentFilter(
            name=dict(any_=[name]),
            # don't apply any anonymous filtering
            is_anonymous=None,
        ),
        block_type_filter=schemas.filters.BlockTypeFilter(
            slug=dict(any_=[block_type_slug])
        ),
        include_secrets=include_secrets,
        limit=1,
    )
    return block_documents[0] if block_documents else None
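# A minimal usage sketch for a lookup by name and block type slug; the name and slug
# below are hypothetical.
async def _example_read_by_name(session: AsyncSession) -> Optional[BlockDocument]:
    return await read_block_document_by_name(
        session=session,
        name="my-storage-block",
        block_type_slug="example-bucket-type",
        include_secrets=False,  # when False, secret values are not returned in plain text
    )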

def _apply_block_document_filters(
    db: PrefectDBInterface,
    query: Select[T],
    block_document_filter: Optional[schemas.filters.BlockDocumentFilter] = None,
    block_schema_filter: Optional[schemas.filters.BlockSchemaFilter] = None,
    block_type_filter: Optional[schemas.filters.BlockTypeFilter] = None,
) -> Select[T]:
    # if no filter is provided, one is created that excludes anonymous blocks
    if block_document_filter is None:
        block_document_filter = schemas.filters.BlockDocumentFilter(
            is_anonymous=schemas.filters.BlockDocumentFilterIsAnonymous(eq_=False)
        )

    # --- Build an initial query that filters for the requested block documents
    query = query.where(block_document_filter.as_sql_filter())

    if block_type_filter is not None:
        block_type_exists_clause = sa.select(db.BlockType).where(
            db.BlockType.id == db.BlockDocument.block_type_id,
            block_type_filter.as_sql_filter(),
        )
        query = query.where(block_type_exists_clause.exists())

    if block_schema_filter is not None:
        block_schema_exists_clause = sa.select(db.BlockSchema).where(
            db.BlockSchema.id == db.BlockDocument.block_schema_id,
            block_schema_filter.as_sql_filter(),
        )
        query = query.where(block_schema_exists_clause.exists())

    return query
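# A sketch of the default described above: with no block document filter, anonymous
# documents are excluded, so callers that want them must opt in explicitly. The wrapper
# name is hypothetical.
async def _example_count_including_anonymous(session: AsyncSession) -> int:
    return await count_block_documents(
        session=session,
        # `is_anonymous=None` disables the anonymous filtering entirely
        block_document_filter=schemas.filters.BlockDocumentFilter(is_anonymous=None),
    )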

@db_injector
async def read_block_documents(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_document_filter: Optional[schemas.filters.BlockDocumentFilter] = None,
    block_type_filter: Optional[schemas.filters.BlockTypeFilter] = None,
    block_schema_filter: Optional[schemas.filters.BlockSchemaFilter] = None,
    include_secrets: bool = False,
    sort: schemas.sorting.BlockDocumentSort = schemas.sorting.BlockDocumentSort.NAME_ASC,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
) -> List[BlockDocument]:
    """
    Read block documents with an optional limit and offset
    """
    # --- Build an initial query that filters for the requested block documents
    filtered_block_documents_query = sa.select(db.BlockDocument.id)
    filtered_block_documents_query = _apply_block_document_filters(
        db,
        query=filtered_block_documents_query,
        block_document_filter=block_document_filter,
        block_type_filter=block_type_filter,
        block_schema_filter=block_schema_filter,
    )
    filtered_block_documents_query = filtered_block_documents_query.order_by(
        *sort.as_sql_sort()
    )

    if offset is not None:
        filtered_block_documents_query = filtered_block_documents_query.offset(offset)

    # coverage note: in the recorded runs this condition was always true, so the
    # no-limit path was never exercised
    if limit is not None:
        filtered_block_documents_query = filtered_block_documents_query.limit(limit)

    filtered_block_documents_cte = filtered_block_documents_query.cte(
        "filtered_block_documents"
    )

    # --- Build a recursive query that starts with the filtered block documents
    # and iteratively loads all referenced block documents. The query includes
    # the ID of each block document as well as the ID of the document that
    # references it and the name it's referenced by, if applicable.
    parent_documents = (
        sa.select(
            filtered_block_documents_cte.c.id,
            sa.cast(sa.null(), sa.String).label("reference_name"),
            sa.cast(sa.null(), UUIDTypeDecorator).label(
                "reference_parent_block_document_id"
            ),
        )
        .select_from(filtered_block_documents_cte)
        .cte("all_block_documents", recursive=True)
    )
    # recursive part of the query
    referenced_documents = (
        sa.select(
            db.BlockDocumentReference.reference_block_document_id,
            db.BlockDocumentReference.name,
            db.BlockDocumentReference.parent_block_document_id,
        )
        .select_from(parent_documents)
        .join(
            db.BlockDocumentReference,
            db.BlockDocumentReference.parent_block_document_id == parent_documents.c.id,
        )
    )
    # union the recursive CTE
    all_block_documents_query = parent_documents.union_all(referenced_documents)

    # --- Join the recursive query that contains all required document IDs
    # back to the BlockDocument table to load info for every document
    # and order by name
    final_query = (
        sa.select(
            db.BlockDocument,
            all_block_documents_query.c.reference_name,
            all_block_documents_query.c.reference_parent_block_document_id,
        )
        .select_from(all_block_documents_query)
        .join(db.BlockDocument, db.BlockDocument.id == all_block_documents_query.c.id)
        .order_by(*sort.as_sql_sort())
    )

    result = await session.execute(
        final_query.execution_options(populate_existing=True)
    )

    block_documents_with_references = result.unique().all()

    # identify true "parent" documents as those with no reference parent ids
    parent_block_document_ids = [
        d[0].id
        for d in block_documents_with_references
        if d.reference_parent_block_document_id is None
    ]

    # walk the resulting dataset and hydrate all block documents
    fully_constructed_block_documents: List[BlockDocument] = []
    visited_block_document_ids = []
    for root_orm_block_document, _, _ in block_documents_with_references:
        if (
            root_orm_block_document.id in parent_block_document_ids
            and root_orm_block_document.id not in visited_block_document_ids
        ):
            root_block_document = await BlockDocument.from_orm_model(
                session, root_orm_block_document, include_secrets=include_secrets
            )
            constructed = await _construct_full_block_document(
                db,
                session,
                block_documents_with_references,  # type: ignore
                root_block_document,
                include_secrets=include_secrets,
            )
            assert constructed

            fully_constructed_block_documents.append(constructed)
            visited_block_document_ids.append(root_orm_block_document.id)

    block_schema_ids = [
        block_document.block_schema_id
        for block_document in fully_constructed_block_documents
    ]
    block_schemas = await models.block_schemas.read_block_schemas(
        session=session,
        block_schema_filter=BlockSchemaFilter(id=dict(any_=block_schema_ids)),
    )
    for block_document in fully_constructed_block_documents:
        corresponding_block_schema = next(
            block_schema
            for block_schema in block_schemas
            if block_schema.id == block_document.block_schema_id
        )
        block_document.block_schema = corresponding_block_schema

    return fully_constructed_block_documents
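# A usage sketch for paging through the documents of one block type; the slug and page
# size below are hypothetical.
async def _example_list_block_documents(session: AsyncSession) -> List[BlockDocument]:
    return await read_block_documents(
        session=session,
        block_type_filter=schemas.filters.BlockTypeFilter(
            slug=dict(any_=["example-bucket-type"])
        ),
        sort=schemas.sorting.BlockDocumentSort.NAME_ASC,
        offset=0,
        limit=50,
    )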

@db_injector
async def count_block_documents(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_document_filter: Optional[schemas.filters.BlockDocumentFilter] = None,
    block_type_filter: Optional[schemas.filters.BlockTypeFilter] = None,
    block_schema_filter: Optional[schemas.filters.BlockSchemaFilter] = None,
) -> int:
    """
    Count block documents that match the filters.
    """
    query = sa.select(sa.func.count()).select_from(db.BlockDocument)

    query = _apply_block_document_filters(
        db,
        query=query,
        block_document_filter=block_document_filter,
        block_schema_filter=block_schema_filter,
        block_type_filter=block_type_filter,
    )

    result = await session.execute(query)
    return result.scalar()  # type: ignore

@db_injector
async def delete_block_document(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_document_id: UUID,
) -> bool:
    query = sa.delete(db.BlockDocument).where(db.BlockDocument.id == block_document_id)
    result = await session.execute(query)
    return result.rowcount > 0

@db_injector
async def update_block_document(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_document_id: UUID,
    block_document: schemas.actions.BlockDocumentUpdate,
) -> bool:
    merge_existing_data = block_document.merge_existing_data
    current_block_document = await session.get(db.BlockDocument, block_document_id)
    if not current_block_document:
        return False

    update_values = block_document.model_dump_for_orm(
        exclude_unset=merge_existing_data,
        exclude={"merge_existing_data"},
    )

    if "data" in update_values and update_values["data"] is not None:
        current_data = await current_block_document.decrypt_data(session=session)

        # if a value for a secret field was provided that is identical to the
        # obfuscated value of the current secret value, it means someone is
        # probably trying to update all of the document's fields without
        # realizing they are posting back obfuscated data, so we disregard the update
        flat_update_data = dict_to_flatdict(update_values["data"])
        flat_current_data = dict_to_flatdict(current_data)
        for secret_field in current_block_document.block_schema.fields.get(
            "secret_fields", []
        ):
            secret_key = tuple(secret_field.split("."))
            current_secret = flat_current_data.get(secret_key)
            # coverage note: in the recorded runs this line always raised an exception
            if current_secret is not None:
                if flat_update_data.get(secret_key) == obfuscate(current_secret):
                    flat_update_data[secret_key] = current_secret
            # Look for obfuscated values nested under a secret field with a wildcard.
            # If any obfuscated values are found, we assume they shouldn't be updated,
            # and they are replaced with the current value for that key to avoid losing
            # data during the update.
            elif "*" in secret_key:
                wildcard_index = secret_key.index("*")
                for data_key in flat_update_data.keys():
                    if (
                        secret_key[0:wildcard_index] == data_key[0:wildcard_index]
                    ) and (
                        flat_update_data[data_key]
                        == obfuscate(flat_update_data[data_key])
                    ):
                        flat_update_data[data_key] = flat_current_data[data_key]

        update_values["data"] = flatdict_to_dict(flat_update_data)

        if merge_existing_data:
            # merge the existing data and the new data for partial updates
            current_data.update(update_values["data"])
            update_values["data"] = current_data

        current_block_document_references = (
            (
                await session.execute(
                    sa.select(db.BlockDocumentReference).filter_by(
                        parent_block_document_id=block_document_id
                    )
                )
            )
            .scalars()
            .all()
        )
        (
            block_document_data_without_refs,
            new_block_document_references,
        ) = _separate_block_references_from_data(update_values["data"])

        # encrypt the data and write updated data to the block document
        await current_block_document.encrypt_data(
            session=session, data=block_document_data_without_refs
        )

        # `proposed_block_schema` is always the same as the schema on the client-side
        # Block class that is calling `save`, which may or may not be the same schema
        # as the one on the saved block document
        proposed_block_schema_id = block_document.block_schema_id

        # if a new schema is proposed, update the block schema id for the block document
        if (
            proposed_block_schema_id is not None
            and proposed_block_schema_id != current_block_document.block_schema_id
        ):
            proposed_block_schema = await session.get(
                db.BlockSchema, proposed_block_schema_id
            )
            assert proposed_block_schema, (
                f"Block schema {proposed_block_schema_id} not found"
            )

            # make sure the proposed schema is of the same block type as the current document
            if (
                proposed_block_schema.block_type_id
                != current_block_document.block_type_id
            ):
                raise ValueError(
                    "Must migrate block document to a block schema of the same block"
                    " type."
                )
            await session.execute(
                sa.update(db.BlockDocument)
                .where(db.BlockDocument.id == block_document_id)
                .values(block_schema_id=proposed_block_schema_id)
            )

        unchanged_block_document_references = []
        for name, reference_block_document_id in new_block_document_references:
            matching_current_block_document_reference = _find_block_document_reference(
                current_block_document_references,
                name,
                reference_block_document_id,
            )
            if matching_current_block_document_reference is None:
                await create_block_document_reference(
                    session=session,
                    block_document_reference=BlockDocumentReferenceCreate(
                        parent_block_document_id=block_document_id,
                        reference_block_document_id=reference_block_document_id,
                        name=name,
                    ),
                )
            else:
                unchanged_block_document_references.append(
                    matching_current_block_document_reference
                )

        for block_document_reference in current_block_document_references:
            if block_document_reference not in unchanged_block_document_references:
                await delete_block_document_reference(
                    session, block_document_reference_id=block_document_reference.id
                )

    return True
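# A sketch of a partial update under the `merge_existing_data=True` semantics implemented
# above: only the supplied keys change, and values that merely echo back obfuscated
# secrets keep their stored values. The field name below is hypothetical.
async def _example_partial_update(session: AsyncSession, block_document_id: UUID) -> bool:
    return await update_block_document(
        session=session,
        block_document_id=block_document_id,
        block_document=schemas.actions.BlockDocumentUpdate(
            data={"bucket": "renamed-bucket"},  # hypothetical field to change
            merge_existing_data=True,
        ),
    )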

def _find_block_document_reference(
    block_document_references: Sequence[orm_models.BlockDocumentReference],
    name: str,
    reference_block_document_id: UUID,
) -> Optional[orm_models.BlockDocumentReference]:
    return next(
        (
            block_document_reference
            for block_document_reference in block_document_references
            if block_document_reference.name == name
            and block_document_reference.reference_block_document_id
            == reference_block_document_id
        ),
        None,
    )

@db_injector
async def create_block_document_reference(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_document_reference: schemas.actions.BlockDocumentReferenceCreate,
) -> Union[orm_models.BlockDocumentReference, None]:
    insert_stmt = db.queries.insert(db.BlockDocumentReference).values(
        **block_document_reference.model_dump_for_orm(
            exclude_unset=True, exclude={"created", "updated"}
        )
    )
    await session.execute(insert_stmt)

    result = await session.execute(
        sa.select(db.BlockDocumentReference).where(
            db.BlockDocumentReference.id == block_document_reference.id
        )
    )

    return result.scalar()

@db_injector
async def delete_block_document_reference(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_document_reference_id: UUID,
) -> bool:
    query = sa.delete(db.BlockDocumentReference).where(
        db.BlockDocumentReference.id == block_document_reference_id
    )
    result = await session.execute(query)
    return result.rowcount > 0