Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/block_types.py: 31%

73 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from typing import List, Optional 1a

2from uuid import UUID 1a

3 

4import sqlalchemy as sa 1a

5from fastapi import Body, Depends, HTTPException, Path, Query, status 1a

6 

7from prefect.blocks.core import _should_update_block_type 1a

8from prefect.server import models, schemas 1a

9from prefect.server.api import dependencies 1a

10from prefect.server.database import PrefectDBInterface, provide_database_interface 1a

11from prefect.server.utilities.server import PrefectRouter 1a

12 

# Router for the block type API: routes registered on it are served under the
# "/block_types" prefix and tagged "Block types".
router: PrefectRouter = PrefectRouter(
    prefix="/block_types",
    tags=["Block types"],
)

14 

15 

@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_block_type(
    block_type: schemas.actions.BlockTypeCreate,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.BlockType:
    """
    Create a new block type.

    Names beginning with "Prefect" (in any letter case) are rejected with a
    403 response, and a 409 response is returned when the database reports an
    integrity error while creating the block type.

    For more information, see https://docs.prefect.io/v3/concepts/blocks.
    """
    # The "Prefect" prefix is reserved for system use, so block types created
    # through the API may not claim it. The comparison ignores letter case.
    uses_reserved_prefix = block_type.name.lower().startswith("prefect")
    if uses_reserved_prefix:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Block type names beginning with 'Prefect' are reserved.",
        )

    try:
        async with db.session_context(begin_transaction=True) as session:
            new_block_type = await models.block_types.create_block_type(
                session, block_type=block_type
            )
    except sa.exc.IntegrityError:
        # An integrity error here is surfaced to the client as a name conflict.
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f'Block type with name "{block_type.name}" already exists',
        )
    else:
        return new_block_type

44 

45 

@router.get("/{id:uuid}")
async def read_block_type_by_id(
    block_type_id: UUID = Path(..., description="The block type ID", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.BlockType:
    """
    Get a block type by ID.

    Responds with 404 when no block type is found for the given ID.
    """
    async with db.session_context() as session:
        found_block_type = await models.block_types.read_block_type(
            session=session, block_type_id=block_type_id
        )

    if found_block_type:
        return found_block_type

    # Nothing came back from the lookup, so report the block type as missing.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Block type not found"
    )

61 

62 

@router.get("/slug/{slug}")
async def read_block_type_by_slug(
    block_type_slug: str = Path(..., description="The block type name", alias="slug"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.BlockType:
    """
    Get a block type by its slug.

    Responds with 404 when no block type is found for the given slug.
    """
    async with db.session_context() as session:
        found_block_type = await models.block_types.read_block_type_by_slug(
            session=session, block_type_slug=block_type_slug
        )

    if found_block_type:
        return found_block_type

    # Nothing came back from the lookup, so report the block type as missing.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Block type not found"
    )

78 

79 

@router.post("/filter")
async def read_block_types(
    block_types: Optional[schemas.filters.BlockTypeFilter] = None,
    block_schemas: Optional[schemas.filters.BlockSchemaFilter] = None,
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.core.BlockType]:
    """
    Get block types, optionally narrowed by block type and block schema
    filters. Use limit and offset to page through the results.
    """
    async with db.session_context() as session:
        # Both filters are optional and are handed to the model layer as-is.
        matching_block_types = await models.block_types.read_block_types(
            session=session,
            block_type_filter=block_types,
            block_schema_filter=block_schemas,
            offset=offset,
            limit=limit,
        )
    return matching_block_types

99 

100 

@router.patch("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT)
async def update_block_type(
    block_type: schemas.actions.BlockTypeUpdate,
    block_type_id: UUID = Path(..., description="The block type ID", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Update a block type.

    Responds with 404 when no block type is found for the given ID.
    """
    async with db.session_context(begin_transaction=True) as session:
        existing_row = await models.block_types.read_block_type(
            session=session, block_type_id=block_type_id
        )
        if existing_row is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Block type not found"
            )

        current_block_type = schemas.core.BlockType.model_validate(
            existing_row, from_attributes=True
        )

        # Skip the write unless something meaningful actually changes; this
        # avoids deadlocks when many blocks of the same type are created at
        # once. Clients perform the same check, but it is repeated here to
        # accommodate older clients.
        if not _should_update_block_type(block_type, current_block_type):
            return

        await models.block_types.update_block_type(
            session=session, block_type=block_type, block_type_id=block_type_id
        )

130 

131 

@router.delete("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_block_type(
    block_type_id: UUID = Path(..., description="The block type ID", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Delete a block type by ID.

    Responds with 404 when the block type does not exist and with 403 when
    the block type is protected.
    """
    async with db.session_context(begin_transaction=True) as session:
        target = await models.block_types.read_block_type(
            session=session, block_type_id=block_type_id
        )

        # Guard clauses: the block type must exist and must not be protected.
        if target is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Block type not found"
            )
        if target.is_protected:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="protected block types cannot be deleted.",
            )

        await models.block_types.delete_block_type(
            session=session, block_type_id=block_type_id
        )

153 

154 

@router.get("/slug/{slug}/block_documents", tags=router.tags + ["Block documents"])
async def read_block_documents_for_block_type(
    db: PrefectDBInterface = Depends(provide_database_interface),
    block_type_slug: str = Path(..., description="The block type name", alias="slug"),
    include_secrets: bool = Query(
        False, description="Whether to include sensitive values in the block document."
    ),
) -> List[schemas.core.BlockDocument]:
    """
    Get the block documents that belong to the block type with the given slug.

    Responds with 404 when no block type is found for the slug.
    """
    async with db.session_context() as session:
        parent_block_type = await models.block_types.read_block_type_by_slug(
            session=session, block_type_slug=block_type_slug
        )
        if not parent_block_type:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Block type not found"
            )

        # Restrict the document query to the block type resolved above.
        only_this_block_type = schemas.filters.BlockDocumentFilter(
            block_type_id={"any_": [parent_block_type.id]}
        )
        return await models.block_documents.read_block_documents(
            session=session,
            block_document_filter=only_this_block_type,
            include_secrets=include_secrets,
        )

178 

179 

@router.get(
    "/slug/{slug}/block_documents/name/{block_document_name}",
    tags=router.tags + ["Block documents"],
)
async def read_block_document_by_name_for_block_type(
    db: PrefectDBInterface = Depends(provide_database_interface),
    block_type_slug: str = Path(..., description="The block type name", alias="slug"),
    # This description previously repeated "The block type name", which
    # mislabelled the parameter in the generated API documentation.
    block_document_name: str = Path(..., description="The block document name"),
    include_secrets: bool = Query(
        False, description="Whether to include sensitive values in the block document."
    ),
) -> schemas.core.BlockDocument:
    """
    Get a block document by name for the block type with the given slug.

    Responds with 404 when no matching block document is found.
    """
    async with db.session_context() as session:
        block_document = await models.block_documents.read_block_document_by_name(
            session=session,
            block_type_slug=block_type_slug,
            name=block_document_name,
            include_secrets=include_secrets,
        )
    if not block_document:
        raise HTTPException(
            status.HTTP_404_NOT_FOUND, detail="Block document not found"
        )
    return block_document

204 

205 

@router.post("/install_system_block_types")
async def install_system_block_types(
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Install the protected system block types.
    """
    # The session is opened without a transaction on purpose:
    # _install_protected_system_blocks manages the transactions itself.
    async with db.session_context(begin_transaction=False) as session:
        install_blocks = models.block_registration._install_protected_system_blocks
        await install_blocks(session)