Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/block_types.py: 86%

59 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Functions for interacting with block type ORM objects. 

3Intended for internal use by the Prefect REST API. 

4""" 

5 

6import html 1c

7from typing import TYPE_CHECKING, Optional, Sequence, Union 1c

8from uuid import UUID 1c

9 

10import sqlalchemy as sa 1c

11from sqlalchemy.ext.asyncio import AsyncSession 1c

12 

13from prefect.server import schemas 1c

14from prefect.server.database import PrefectDBInterface, db_injector 1c

15from prefect.server.database.orm_models import BlockType 1c

16 

17if TYPE_CHECKING: 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true1c

18 from prefect.client.schemas import BlockType as ClientBlockType 

19 from prefect.client.schemas.actions import BlockTypeUpdate as ClientBlockTypeUpdate 

20 

21 

22@db_injector 1c

23async def create_block_type( 1c

24 db: PrefectDBInterface, 

25 session: AsyncSession, 

26 block_type: Union[schemas.core.BlockType, "ClientBlockType"], 

27 override: bool = False, 

28) -> Union[BlockType, None]: 

29 """ 

30 Create a new block type. 

31 

32 Args: 

33 session: A database session 

34 block_type: a block type object 

35 

36 Returns: 

37 block_type: an ORM block type model 

38 """ 

39 # We take a shortcut in many unit tests and in block registration to pass client 

40 # models directly to this function. We will support this by converting them to 

41 # the appropriate server model. 

42 if not isinstance(block_type, schemas.core.BlockType): 1eadb

43 block_type = schemas.core.BlockType.model_validate( 1eadb

44 block_type.model_dump(mode="json") 

45 ) 

46 

47 insert_values = block_type.model_dump_for_orm( 1eadb

48 exclude_unset=False, exclude={"created", "updated", "id"} 

49 ) 

50 if insert_values.get("description") is not None: 1eadb

51 insert_values["description"] = html.escape( 1eab

52 insert_values["description"], quote=False 

53 ) 

54 if insert_values.get("code_example") is not None: 1eadb

55 insert_values["code_example"] = html.escape( 1eab

56 insert_values["code_example"], quote=False 

57 ) 

58 insert_stmt = db.queries.insert(db.BlockType).values(**insert_values) 1eadb

59 if override: 1eadb

60 insert_stmt = insert_stmt.on_conflict_do_update( 1ea

61 index_elements=db.orm.block_type_unique_upsert_columns, 

62 set_=insert_values, 

63 ) 

64 await session.execute(insert_stmt) 1eadb

65 

66 query = ( 

67 sa.select(db.BlockType) 

68 .where( 

69 sa.and_( 

70 db.BlockType.name == insert_values["name"], 

71 ) 

72 ) 

73 .execution_options(populate_existing=True) 

74 ) 

75 

76 result = await session.execute(query) 1eab

77 return result.scalar() 

78 

79 

80@db_injector 1c

81async def read_block_type( 1c

82 db: PrefectDBInterface, 

83 session: AsyncSession, 

84 block_type_id: UUID, 

85) -> Union[BlockType, None]: 

86 """ 

87 Reads a block type by id. 

88 

89 Args: 

90 session: A database session 

91 block_type_id: a block_type id 

92 

93 Returns: 

94 BlockType: an ORM block type model 

95 """ 

96 return await session.get(db.BlockType, block_type_id) 1adb

97 

98 

99@db_injector 1c

100async def read_block_type_by_slug( 1c

101 db: PrefectDBInterface, session: AsyncSession, block_type_slug: str 

102) -> Union[BlockType, None]: 

103 """ 

104 Reads a block type by slug. 

105 

106 Args: 

107 session: A database session 

108 block_type_slug: a block type slug 

109 

110 Returns: 

111 BlockType: an ORM block type model 

112 

113 """ 

114 result = await session.execute( 1eab

115 sa.select(db.BlockType).where(db.BlockType.slug == block_type_slug) 

116 ) 

117 return result.scalar() 

118 

119 

120@db_injector 1c

121async def read_block_types( 1c

122 db: PrefectDBInterface, 

123 session: AsyncSession, 

124 block_type_filter: Optional[schemas.filters.BlockTypeFilter] = None, 

125 block_schema_filter: Optional[schemas.filters.BlockSchemaFilter] = None, 

126 limit: Optional[int] = None, 

127 offset: Optional[int] = None, 

128) -> Sequence[BlockType]: 

129 """ 

130 Reads block types with an optional limit and offset 

131 

132 Args: 

133 

134 Returns: 

135 List[BlockType]: List of 

136 """ 

137 query = sa.select(db.BlockType).order_by(db.BlockType.name) 1adb

138 

139 if block_type_filter is not None: 1adb

140 query = query.where(block_type_filter.as_sql_filter()) 1adb

141 

142 if block_schema_filter is not None: 1adb

143 exists_clause = sa.select(db.BlockSchema).where( 1adb

144 db.BlockSchema.block_type_id == db.BlockType.id, 

145 block_schema_filter.as_sql_filter(), 

146 ) 

147 query = query.where(exists_clause.exists()) 1adb

148 

149 if offset is not None: 149 ↛ 152line 149 didn't jump to line 152 because the condition on line 149 was always true1adb

150 query = query.offset(offset) 1adb

151 

152 if limit is not None: 152 ↛ 155line 152 didn't jump to line 155 because the condition on line 152 was always true1adb

153 query = query.limit(limit) 1adb

154 

155 result = await session.execute(query) 1adb

156 return result.scalars().unique().all() 

157 

158 

159@db_injector 1c

160async def update_block_type( 1c

161 db: PrefectDBInterface, 

162 session: AsyncSession, 

163 block_type_id: Union[str, UUID], 

164 block_type: Union[ 

165 schemas.actions.BlockTypeUpdate, 

166 schemas.core.BlockType, 

167 "ClientBlockTypeUpdate", 

168 "ClientBlockType", 

169 ], 

170) -> bool: 

171 """ 

172 Update a block type by id. 

173 

174 Args: 

175 session: A database session 

176 block_type_id: Data to update block type with 

177 block_type: A block type id 

178 

179 Returns: 

180 bool: True if the block type was updated 

181 """ 

182 

183 # We take a shortcut in many unit tests and in block registration to pass client 

184 # models directly to this function. We will support this by converting them to 

185 # the appropriate server model. 

186 if not isinstance(block_type, schemas.actions.BlockTypeUpdate): 1eab

187 block_type = schemas.actions.BlockTypeUpdate.model_validate( 1e

188 block_type.model_dump( 

189 mode="json", 

190 exclude={"id", "created", "updated", "name", "slug", "is_protected"}, 

191 ) 

192 ) 

193 

194 update_statement = ( 1eab

195 sa.update(db.BlockType) 

196 .where(db.BlockType.id == block_type_id) 

197 .values(**block_type.model_dump_for_orm(exclude_unset=True, exclude={"id"})) 

198 ) 

199 result = await session.execute(update_statement) 1eab

200 return result.rowcount > 0 

201 

202 

203@db_injector 1c

204async def delete_block_type( 1c

205 db: PrefectDBInterface, session: AsyncSession, block_type_id: str 

206) -> bool: 

207 """ 

208 Delete a block type by id. 

209 

210 Args: 

211 session: A database session 

212 block_type_id: A block type id 

213 

214 Returns: 

215 bool: True if the block type was updated 

216 """ 

217 

218 result = await session.execute( 1ab

219 sa.delete(db.BlockType).where(db.BlockType.id == block_type_id) 

220 ) 

221 return result.rowcount > 0