Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/block_schemas.py: 61%

54 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Routes for interacting with block schema objects. 

3""" 

4 

5from typing import List, Optional 1a

6from uuid import UUID 1a

7 

8from fastapi import ( 1a

9 Body, 

10 Depends, 

11 HTTPException, 

12 Path, 

13 Query, 

14 Response, 

15 status, 

16) 

17 

18from prefect.server import models, schemas 1a

19from prefect.server.api import dependencies 1a

20from prefect.server.database import PrefectDBInterface, provide_database_interface 1a

21from prefect.server.models.block_schemas import MissingBlockTypeException 1a

22from prefect.server.utilities.server import PrefectRouter 1a

23 

24router: PrefectRouter = PrefectRouter(prefix="/block_schemas", tags=["Block schemas"]) 1a

25 

26 

27@router.post("/", status_code=status.HTTP_201_CREATED) 1a

28async def create_block_schema( 1a

29 block_schema: schemas.actions.BlockSchemaCreate, 

30 response: Response, 

31 db: PrefectDBInterface = Depends(provide_database_interface), 

32) -> schemas.core.BlockSchema: 

33 """ 

34 Create a block schema. 

35 

36 For more information, see https://docs.prefect.io/v3/concepts/blocks. 

37 """ 

38 from prefect.blocks.core import Block 1cdb

39 

40 async with db.session_context(begin_transaction=True) as session: 1cdb

41 block_type = await models.block_types.read_block_type( 1cdb

42 session=session, block_type_id=block_schema.block_type_id 

43 ) 

44 if block_type is None: 

45 raise HTTPException( 

46 status.HTTP_404_NOT_FOUND, 

47 detail=f"Block type {block_schema.block_type_id} not found.", 

48 ) 

49 

50 block_schema_checksum = Block._calculate_schema_checksum(block_schema.fields) 

51 existing_block_schema = ( 

52 await models.block_schemas.read_block_schema_by_checksum( 

53 session=session, 

54 checksum=block_schema_checksum, 

55 version=block_schema.version, 

56 ) 

57 ) 

58 if existing_block_schema: 

59 response.status_code = status.HTTP_200_OK 

60 return existing_block_schema 

61 try: 

62 model = await models.block_schemas.create_block_schema( 

63 session=session, 

64 block_schema=block_schema, 

65 ) 

66 except MissingBlockTypeException as ex: 

67 raise HTTPException(status.HTTP_409_CONFLICT, detail=str(ex)) 

68 

69 return model 

70 

71 

72@router.delete("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT) 1a

73async def delete_block_schema( 1a

74 block_schema_id: UUID = Path(..., description="The block schema id", alias="id"), 

75 db: PrefectDBInterface = Depends(provide_database_interface), 

76 api_version: str = Depends(dependencies.provide_request_api_version), 

77) -> None: 

78 """ 

79 Delete a block schema by id. 

80 """ 

81 async with db.session_context(begin_transaction=True) as session: 1cb

82 block_schema = await models.block_schemas.read_block_schema( 1cb

83 session=session, block_schema_id=block_schema_id 

84 ) 

85 if not block_schema: 

86 raise HTTPException( 

87 status_code=status.HTTP_404_NOT_FOUND, detail="Block schema not found" 

88 ) 

89 

90 if block_schema.block_type.is_protected: 

91 raise HTTPException( 

92 status.HTTP_403_FORBIDDEN, 

93 detail="Block schemas for protected block types cannot be deleted.", 

94 ) 

95 

96 await models.block_schemas.delete_block_schema( 

97 session=session, block_schema_id=block_schema_id 

98 ) 

99 

100 

101@router.post("/filter") 1a

102async def read_block_schemas( 1a

103 block_schemas: Optional[schemas.filters.BlockSchemaFilter] = None, 

104 limit: int = dependencies.LimitBody(), 

105 offset: int = Body(0, ge=0), 

106 db: PrefectDBInterface = Depends(provide_database_interface), 

107) -> List[schemas.core.BlockSchema]: 

108 """ 

109 Read all block schemas, optionally filtered by type 

110 """ 

111 async with db.session_context() as session: 1cb

112 result = await models.block_schemas.read_block_schemas( 1cb

113 session=session, 

114 block_schema_filter=block_schemas, 

115 limit=limit, 

116 offset=offset, 

117 ) 

118 return result 1cb

119 

120 

121@router.get("/{id:uuid}") 1a

122async def read_block_schema_by_id( 1a

123 block_schema_id: UUID = Path(..., description="The block schema id", alias="id"), 

124 db: PrefectDBInterface = Depends(provide_database_interface), 

125) -> schemas.core.BlockSchema: 

126 """ 

127 Get a block schema by id. 

128 """ 

129 async with db.session_context() as session: 1cb

130 block_schema = await models.block_schemas.read_block_schema( 1cb

131 session=session, block_schema_id=block_schema_id 

132 ) 

133 if not block_schema: 1cb

134 raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Block schema not found") 1c

135 return block_schema 

136 

137 

138@router.get("/checksum/{checksum}") 1a

139async def read_block_schema_by_checksum( 1a

140 block_schema_checksum: str = Path( 

141 ..., description="The block schema checksum", alias="checksum" 

142 ), 

143 db: PrefectDBInterface = Depends(provide_database_interface), 

144 version: Optional[str] = Query( 

145 None, 

146 description=( 

147 "Version of block schema. If not provided the most recently created block" 

148 " schema with the matching checksum will be returned." 

149 ), 

150 ), 

151) -> schemas.core.BlockSchema: 

152 async with db.session_context() as session: 1cb

153 block_schema = await models.block_schemas.read_block_schema_by_checksum( 1cb

154 session=session, checksum=block_schema_checksum, version=version 

155 ) 

156 if not block_schema: 156 ↛ 158line 156 didn't jump to line 158 because the condition on line 156 was always true1cb

157 raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Block schema not found") 1cb

158 return block_schema