Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/block_registration.py: 68%

81 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1import json 1b

2from pathlib import Path 1b

3from typing import TYPE_CHECKING, Any, Dict, List, Union, cast 1b

4from uuid import UUID 1b

5 

6from sqlalchemy.ext.asyncio import AsyncSession 1b

7 

8from prefect.blocks.core import Block 1b

9from prefect.blocks.system import Secret 1b

10from prefect.blocks.webhook import Webhook 1b

11from prefect.filesystems import LocalFileSystem 1b

12from prefect.logging import get_logger 1b

13from prefect.server import models, schemas 1b

14 

15if TYPE_CHECKING: 15 ↛ 16line 15 didn't jump to line 16 because the condition on line 15 was never true1b

16 import logging 

17 

18 from prefect.client.schemas import BlockSchema as ClientBlockSchema 

19 from prefect.client.schemas import BlockType as ClientBlockType 

20 

# Module-wide logger obtained from Prefect's logging helper under the name "server".
logger: "logging.Logger" = get_logger("server")

# Path of the JSON file read by `_load_collection_blocks_data`; it is resolved
# relative to this module: two directory levels above this file.
COLLECTIONS_BLOCKS_DATA_PATH = Path(__file__).parents[1] / "collection_blocks_data.json"

26 

27 

async def _install_protected_system_blocks(session: AsyncSession) -> None:
    """
    Install the block types that the system expects to be present.

    For each of `Webhook`, `Secret` and `LocalFileSystem`, a block type flagged
    as protected and its block schema are written with `override=True`. Every
    block is handled inside its own `session.begin()` transaction.

    Args:
        session: A database session.
    """
    system_block_classes = cast(List[Block], [Webhook, Secret, LocalFileSystem])

    for block_cls in system_block_classes:
        async with session.begin():
            client_block_type = block_cls._to_block_type()

            # Re-validate the client-side block type as the server-side schema.
            type_payload = client_block_type.model_dump()
            server_block_type = schemas.core.BlockType.model_validate(type_payload)

            # Flag both representations; after this point the client-side
            # object is only used in the assertion message below.
            client_block_type.is_protected = True
            server_block_type.is_protected = True

            stored_block_type = await models.block_types.create_block_type(
                session=session, block_type=server_block_type, override=True
            )
            assert stored_block_type is not None, (
                f"Failed to create block type {client_block_type}"
            )

            # The schema is tied to the ID of the block type that was just stored.
            schema_to_store = block_cls._to_block_schema(
                block_type_id=stored_block_type.id
            )
            await models.block_schemas.create_block_schema(
                session=session,
                block_schema=schema_to_store,
                override=True,
            )

54 

55 

async def register_block_schema(
    session: AsyncSession,
    block_schema: Union[schemas.core.BlockSchema, "ClientBlockSchema"],
) -> UUID:
    """
    Save a block schema to the Prefect REST API database unless it already exists.

    The schema is looked up by its checksum and version. When a match is found
    nothing is written and the existing schema's ID is returned; otherwise the
    schema is created and the new ID is returned.

    Args:
        session: A database session.
        block_schema: A block schema object.

    Returns:
        The ID of the registered block schema.
    """
    from prefect.server.models.block_schemas import (
        create_block_schema,
        read_block_schema_by_checksum,
    )

    already_stored = await read_block_schema_by_checksum(
        session=session,
        checksum=block_schema.checksum,
        version=block_schema.version,
    )
    if already_stored is not None:
        return already_stored.id

    created = await create_block_schema(session=session, block_schema=block_schema)
    return created.id

90 

91 

async def register_block_type(
    session: AsyncSession,
    block_type: Union[schemas.core.BlockType, "ClientBlockType"],
) -> UUID:
    """
    Save a block type to the Prefect REST API database, updating it if present.

    The block type is looked up by slug. When a match is found it is updated
    with the passed-in block type and its ID is returned; otherwise a new block
    type is created.

    Args:
        session: A database session.
        block_type: A block type object.

    Returns:
        The ID of the registered block type.
    """
    from prefect.server.models.block_types import (
        create_block_type,
        read_block_type_by_slug,
        update_block_type,
    )

    already_stored = await read_block_type_by_slug(
        session=session,
        block_type_slug=block_type.slug,
    )
    if already_stored is not None:
        # Known slug: bring the stored block type in line with the given one.
        await update_block_type(
            session=session,
            block_type_id=already_stored.id,
            block_type=block_type,
        )
        return already_stored.id

    created = await create_block_type(session=session, block_type=block_type)
    assert created is not None, f"Failed to create block type {block_type}"
    return created.id

133 

134 

async def _load_collection_blocks_data() -> Dict[str, Any]:
    """
    Load blocks data for whitelisted collections.

    Reads the file at `COLLECTIONS_BLOCKS_DATA_PATH` asynchronously via anyio
    and parses its contents as JSON.

    Returns:
        The parsed JSON document.
    """
    import anyio

    async with await anyio.open_file(COLLECTIONS_BLOCKS_DATA_PATH, "r") as data_file:
        raw_text = await data_file.read()
        return json.loads(raw_text)

141 

142 

async def _register_registry_blocks(session: AsyncSession) -> None:
    """
    Register every block class found in the client block registry.

    Each class gets its own `session.begin()` transaction in which its block
    type and then its block schema are registered. Any exception raised while
    registering a class is logged and the loop moves on to the next class.

    Args:
        session: A database session.
    """
    from prefect.blocks.core import Block
    from prefect.utilities.dispatch import get_registry_for_type

    # Fall back to an empty mapping when the lookup returns a falsy value.
    registered_classes = get_registry_for_type(Block) or {}

    for block_cls in registered_classes.values():
        # each block schema gets its own transaction
        async with session.begin():
            try:
                type_to_register = block_cls._to_block_type()
                registered_type_id = await register_block_type(
                    session=session,
                    block_type=type_to_register,
                )
                schema_to_register = block_cls._to_block_schema(
                    block_type_id=registered_type_id
                )
                await register_block_schema(
                    session=session,
                    block_schema=schema_to_register,
                )
            except Exception:
                logger.exception(
                    f"Failed to register block schema for block class {block_cls}"
                )

168 

169 

async def _register_collection_blocks(session: AsyncSession) -> None:
    """
    Register blocks from whitelisted collections.

    The collections data is loaded with `_load_collection_blocks_data`. All
    block types are registered in one transaction, then all block schemas in a
    second one. A failure to register an individual block schema is logged and
    does not stop the remaining schemas from being attempted.

    Args:
        session: A database session.

    Raises:
        RuntimeError: If a block type entry has a missing or empty
            "block_schema" value.
    """
    collections_data = await _load_collection_blocks_data()

    # Flatten the block type entries of every collection into a single list.
    type_entries = []
    for collection in collections_data["collections"].values():
        type_entries.extend(collection["block_types"].values())

    # due to schema reference dependencies, we need to register all block types first
    # and then register all block schemas
    schema_entries: dict[str, dict[str, Any]] = {}

    async with session.begin():
        for type_entry in type_entries:
            # The schema is detached from the entry before the entry itself is
            # validated as a block type.
            schema_entry = type_entry.pop("block_schema", None)
            if not schema_entry:
                raise RuntimeError(
                    f"Block schema not found for block type {type_entry.get('slug')!r}"
                )
            validated_type = schemas.core.BlockType.model_validate(type_entry)
            schema_entry["block_type_id"] = await register_block_type(
                session=session,
                block_type=validated_type,
            )
            schema_entries[type_entry["slug"]] = schema_entry

    async with session.begin():
        for slug, schema_entry in schema_entries.items():
            try:
                validated_schema = schemas.core.BlockSchema.model_validate(
                    schema_entry
                )
                await register_block_schema(
                    session=session,
                    block_schema=validated_schema,
                )
            except Exception:
                logger.exception(
                    f"Failed to register block schema for block type {slug}"
                )

209 

210 

async def run_block_auto_registration(session: AsyncSession) -> None:
    """
    Run the full block auto-registration sequence against the given session.

    Three steps are awaited one after another: the protected system blocks are
    installed, then the blocks in the client block registry are registered,
    and finally the blocks from Prefect Collections that are configured for
    auto-registration.

    Args:
        session: A database session.
    """
    # Step 1: protected system block types.
    await _install_protected_system_blocks(session)
    # Step 2: everything in the client block registry.
    await _register_registry_blocks(session)
    # Step 3: blocks described in the collections data file.
    await _register_collection_blocks(session=session)

221 await _register_collection_blocks(session=session) 1a