Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/block_types.py: 58%

59 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Functions for interacting with block type ORM objects. 

3Intended for internal use by the Prefect REST API. 

4""" 

5 

6import html 1a

7from typing import TYPE_CHECKING, Optional, Sequence, Union 1a

8from uuid import UUID 1a

9 

10import sqlalchemy as sa 1a

11from sqlalchemy.ext.asyncio import AsyncSession 1a

12 

13from prefect.server import schemas 1a

14from prefect.server.database import PrefectDBInterface, db_injector 1a

15from prefect.server.database.orm_models import BlockType 1a

16 

17if TYPE_CHECKING: 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true1a

18 from prefect.client.schemas import BlockType as ClientBlockType 

19 from prefect.client.schemas.actions import BlockTypeUpdate as ClientBlockTypeUpdate 

20 

21 

@db_injector
async def create_block_type(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_type: Union[schemas.core.BlockType, "ClientBlockType"],
    override: bool = False,
) -> Union[BlockType, None]:
    """
    Insert a block type row and return the stored ORM object.

    The ``description`` and ``code_example`` values are passed through
    ``html.escape(..., quote=False)`` before being written, so ``&``, ``<``
    and ``>`` are stored as HTML entities while quote characters are kept.

    Args:
        db: the database interface supplied by ``db_injector``
        session: A database session
        block_type: a server block type schema, or any other model exposing
            ``model_dump(mode="json")`` (e.g. a client block type), which is
            converted to the server schema first
        override: when True, the insert becomes an upsert that overwrites the
            conflicting row with the new values

    Returns:
        block_type: the ORM block type read back after the insert, or None if
            no row matched the lookup
    """
    # Unit tests and block registration pass client models straight in; turn
    # those into the server schema by round-tripping through JSON-able data.
    if not isinstance(block_type, schemas.core.BlockType):
        as_json = block_type.model_dump(mode="json")
        block_type = schemas.core.BlockType.model_validate(as_json)

    row = block_type.model_dump_for_orm(
        exclude_unset=False, exclude={"created", "updated", "id"}
    )

    # Escape the free-text fields before they are persisted.
    for text_field in ("description", "code_example"):
        raw_text = row.get(text_field)
        if raw_text is not None:
            row[text_field] = html.escape(raw_text, quote=False)

    stmt = db.queries.insert(db.BlockType).values(**row)
    if override:
        stmt = stmt.on_conflict_do_update(
            index_elements=db.orm.block_type_unique_upsert_columns,
            set_=row,
        )
    await session.execute(stmt)

    # Read the row back; populate_existing refreshes an instance that is
    # already loaded in this session instead of returning stale attributes.
    # NOTE(review): the lookup matches on name, whereas the upsert conflict
    # target is db.orm.block_type_unique_upsert_columns - confirm that name
    # identifies the same row.
    lookup = (
        sa.select(db.BlockType)
        .where(db.BlockType.name == row["name"])
        .execution_options(populate_existing=True)
    )
    fetched = await session.execute(lookup)
    return fetched.scalar()

78 

79 

@db_injector
async def read_block_type(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_type_id: UUID,
) -> Union[BlockType, None]:
    """
    Fetch a single block type by its primary key.

    Args:
        db: the database interface supplied by ``db_injector``
        session: A database session
        block_type_id: id of the block type to load

    Returns:
        BlockType: the ORM block type model, or None when the session finds
            no row with that id
    """
    found = await session.get(db.BlockType, block_type_id)
    return found

97 

98 

@db_injector
async def read_block_type_by_slug(
    db: PrefectDBInterface, session: AsyncSession, block_type_slug: str
) -> Union[BlockType, None]:
    """
    Fetch a single block type whose slug equals the given value.

    Args:
        db: the database interface supplied by ``db_injector``
        session: A database session
        block_type_slug: slug to match exactly

    Returns:
        BlockType: the first matching ORM block type model, or None when no
            row has that slug
    """
    by_slug = sa.select(db.BlockType).where(db.BlockType.slug == block_type_slug)
    matches = await session.execute(by_slug)
    return matches.scalar()

118 

119 

@db_injector
async def read_block_types(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_type_filter: Optional[schemas.filters.BlockTypeFilter] = None,
    block_schema_filter: Optional[schemas.filters.BlockSchemaFilter] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> Sequence[BlockType]:
    """
    List block types ordered by name, with optional filtering and paging.

    Args:
        db: the database interface supplied by ``db_injector``
        session: A database session
        block_type_filter: when given, its SQL filter is applied to the block
            type rows
        block_schema_filter: when given, only block types that have at least
            one block schema satisfying this filter are returned
        limit: maximum number of rows to return; no limit when None
        offset: number of rows to skip; no offset when None

    Returns:
        Sequence[BlockType]: the matching ORM block type models, de-duplicated
    """
    stmt = sa.select(db.BlockType).order_by(db.BlockType.name)

    if block_type_filter is not None:
        stmt = stmt.where(block_type_filter.as_sql_filter())

    if block_schema_filter is not None:
        # Correlated subquery: a block schema that belongs to the candidate
        # block type and also passes the schema filter.
        matching_schema = sa.select(db.BlockSchema).where(
            db.BlockSchema.block_type_id == db.BlockType.id,
            block_schema_filter.as_sql_filter(),
        )
        stmt = stmt.where(matching_schema.exists())

    if offset is not None:
        stmt = stmt.offset(offset)

    if limit is not None:
        stmt = stmt.limit(limit)

    rows = await session.execute(stmt)
    return rows.scalars().unique().all()

157 

158 

@db_injector
async def update_block_type(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_type_id: Union[str, UUID],
    block_type: Union[
        schemas.actions.BlockTypeUpdate,
        schemas.core.BlockType,
        "ClientBlockTypeUpdate",
        "ClientBlockType",
    ],
) -> bool:
    """
    Update the block type with the given id.

    Only fields that were explicitly set on the update model are written.

    Args:
        db: the database interface supplied by ``db_injector``
        session: A database session
        block_type_id: id of the block type to update
        block_type: the data to update the block type with; anything that is
            not already a server ``BlockTypeUpdate`` is converted to one

    Returns:
        bool: True if at least one row was updated
    """
    # Unit tests and block registration pass client models (or full block
    # types) straight in. Convert them to the server update schema, leaving
    # out the fields that are excluded from the conversion below.
    if not isinstance(block_type, schemas.actions.BlockTypeUpdate):
        payload = block_type.model_dump(
            mode="json",
            exclude={"id", "created", "updated", "name", "slug", "is_protected"},
        )
        block_type = schemas.actions.BlockTypeUpdate.model_validate(payload)

    # NOTE(review): unlike create_block_type, no html.escape is applied to
    # text fields on this path - confirm that is intentional.
    changes = block_type.model_dump_for_orm(exclude_unset=True, exclude={"id"})
    stmt = (
        sa.update(db.BlockType)
        .where(db.BlockType.id == block_type_id)
        .values(**changes)
    )
    outcome = await session.execute(stmt)
    return outcome.rowcount > 0

201 

202 

@db_injector
async def delete_block_type(
    db: PrefectDBInterface, session: AsyncSession, block_type_id: Union[str, UUID]
) -> bool:
    """
    Delete a block type by id.

    Args:
        db: the database interface supplied by ``db_injector``
        session: A database session
        block_type_id: id of the block type to delete; accepted as a string
            or a UUID, matching ``update_block_type``

    Returns:
        bool: True if at least one row was deleted
    """

    result = await session.execute(
        sa.delete(db.BlockType).where(db.BlockType.id == block_type_id)
    )
    # rowcount is the number of rows matched by the DELETE statement.
    return result.rowcount > 0