Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/block_registration.py: 68%
81 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1import json 1b
2from pathlib import Path 1b
3from typing import TYPE_CHECKING, Any, Dict, List, Union, cast 1b
4from uuid import UUID 1b
6from sqlalchemy.ext.asyncio import AsyncSession 1b
8from prefect.blocks.core import Block 1b
9from prefect.blocks.system import Secret 1b
10from prefect.blocks.webhook import Webhook 1b
11from prefect.filesystems import LocalFileSystem 1b
12from prefect.logging import get_logger 1b
13from prefect.server import models, schemas 1b
15if TYPE_CHECKING: 15 ↛ 16line 15 didn't jump to line 16 because the condition on line 15 was never true1b
16 import logging
18 from prefect.client.schemas import BlockSchema as ClientBlockSchema
19 from prefect.client.schemas import BlockType as ClientBlockType
21logger: "logging.Logger" = get_logger("server") 1b
23COLLECTIONS_BLOCKS_DATA_PATH = ( 1b
24 Path(__file__).parent.parent / "collection_blocks_data.json"
25)
28async def _install_protected_system_blocks(session: AsyncSession) -> None: 1b
29 """Install block types that the system expects to be present"""
30 protected_system_blocks = cast(List[Block], [Webhook, Secret, LocalFileSystem]) 1ac
32 for block in protected_system_blocks: 32 ↛ exitline 32 didn't return from function '_install_protected_system_blocks' because the loop on line 32 didn't complete1ac
33 async with session.begin(): 1ac
34 block_type = block._to_block_type() 1ac
36 server_block_type = schemas.core.BlockType.model_validate( 1ac
37 block_type.model_dump()
38 )
39 block_type.is_protected = True 1ac
40 server_block_type.is_protected = True 1ac
42 orm_block_type = await models.block_types.create_block_type( 1ac
43 session=session, block_type=server_block_type, override=True
44 )
45 assert orm_block_type is not None, (
46 f"Failed to create block type {block_type}"
47 )
49 await models.block_schemas.create_block_schema( 1ac
50 session=session,
51 block_schema=block._to_block_schema(block_type_id=orm_block_type.id),
52 override=True,
53 )
56async def register_block_schema( 1b
57 session: AsyncSession,
58 block_schema: Union[schemas.core.BlockSchema, "ClientBlockSchema"],
59) -> UUID:
60 """
61 Stores the provided block schema in the Prefect REST API database.
63 If a block schema with a matching checksum and version is already saved,
64 then the ID of the existing block schema will be returned.
66 Args:
67 session: A database session.
68 block_schema: A block schema object.
70 Returns:
71 The ID of the registered block schema.
72 """
74 from prefect.server.models.block_schemas import ( 1a
75 create_block_schema,
76 read_block_schema_by_checksum,
77 )
79 existing_block_schema = await read_block_schema_by_checksum( 1a
80 session=session, checksum=block_schema.checksum, version=block_schema.version
81 )
82 if existing_block_schema is None:
83 new_block_schema = await create_block_schema( 1a
84 session=session,
85 block_schema=block_schema,
86 )
87 return new_block_schema.id
88 else:
89 return existing_block_schema.id
92async def register_block_type( 1b
93 session: AsyncSession,
94 block_type: Union[schemas.core.BlockType, "ClientBlockType"],
95) -> UUID:
96 """
97 Stores the provided block type in the Prefect REST API database.
99 If a block type with a matching slug is already saved, then the block type
100 will be updated to match the passed in block type.
102 Args:
103 session: A database session.
104 block_type: A block type object.
106 Returns:
107 The ID of the registered block type.
108 """
109 from prefect.server.models.block_types import ( 1a
110 create_block_type,
111 read_block_type_by_slug,
112 update_block_type,
113 )
115 existing_block_type = await read_block_type_by_slug( 1a
116 session=session,
117 block_type_slug=block_type.slug,
118 )
119 if existing_block_type is None:
120 new_block_type = await create_block_type( 1a
121 session=session,
122 block_type=block_type,
123 )
124 assert new_block_type is not None, f"Failed to create block type {block_type}"
125 return new_block_type.id
126 else:
127 await update_block_type( 1a
128 session=session,
129 block_type_id=existing_block_type.id,
130 block_type=block_type,
131 )
132 return existing_block_type.id
135async def _load_collection_blocks_data() -> Dict[str, Any]: 1b
136 """Loads blocks data for whitelisted collections."""
137 import anyio 1a
139 async with await anyio.open_file(COLLECTIONS_BLOCKS_DATA_PATH, "r") as f: 1a
140 return json.loads(await f.read()) 1a
143async def _register_registry_blocks(session: AsyncSession) -> None: 1b
144 """Registers block from the client block registry."""
145 from prefect.blocks.core import Block 1a
146 from prefect.utilities.dispatch import get_registry_for_type 1a
148 block_registry = get_registry_for_type(Block) or {} 1a
150 for block_class in block_registry.values(): 150 ↛ exitline 150 didn't return from function '_register_registry_blocks' because the loop on line 150 didn't complete1a
151 # each block schema gets its own transaction
152 async with session.begin(): 1a
153 try: 1a
154 block_type_id = await register_block_type( 1a
155 session=session,
156 block_type=block_class._to_block_type(),
157 )
158 await register_block_schema( 1a
159 session=session,
160 block_schema=block_class._to_block_schema(
161 block_type_id=block_type_id
162 ),
163 )
164 except Exception:
165 logger.exception(
166 f"Failed to register block schema for block class {block_class}"
167 )
170async def _register_collection_blocks(session: AsyncSession) -> None: 1b
171 """Registers blocks from whitelisted collections."""
172 collections_blocks_data = await _load_collection_blocks_data() 1a
174 block_types = [ 1a
175 block_type
176 for collection in collections_blocks_data["collections"].values()
177 for block_type in collection["block_types"].values()
178 ]
180 # due to schema reference dependencies, we need to register all block types first
181 # and then register all block schemas
182 block_schemas: dict[str, dict[str, Any]] = {} 1a
184 async with session.begin(): 1a
185 for block_type in block_types: 185 ↛ 198line 185 didn't jump to line 1981a
186 block_schema = block_type.pop("block_schema", None) 1a
187 if not block_schema: 187 ↛ 188line 187 didn't jump to line 188 because the condition on line 187 was never true1a
188 raise RuntimeError(
189 f"Block schema not found for block type {block_type.get('slug')!r}"
190 )
191 block_type_id = await register_block_type( 1a
192 session=session,
193 block_type=schemas.core.BlockType.model_validate(block_type),
194 )
195 block_schema["block_type_id"] = block_type_id
196 block_schemas[block_type["slug"]] = block_schema
198 async with session.begin(): 1a
199 for block_type_slug, block_schema in block_schemas.items():
200 try:
201 await register_block_schema( 1a
202 session=session,
203 block_schema=schemas.core.BlockSchema.model_validate(block_schema),
204 )
205 except Exception:
206 logger.exception(
207 f"Failed to register block schema for block type {block_type_slug}"
208 )
211async def run_block_auto_registration(session: AsyncSession) -> None: 1b
212 """
213 Registers all blocks in the client block registry and any blocks from Prefect
214 Collections that are configured for auto-registration.
216 Args:
217 session: A database session.
218 """
219 await _install_protected_system_blocks(session) 1a
220 await _register_registry_blocks(session) 1a
221 await _register_collection_blocks(session=session) 1a