Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/block_schemas.py: 61%
54 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Routes for interacting with block schema objects.
3"""
5from typing import List, Optional 1a
6from uuid import UUID 1a
8from fastapi import ( 1a
9 Body,
10 Depends,
11 HTTPException,
12 Path,
13 Query,
14 Response,
15 status,
16)
18from prefect.server import models, schemas 1a
19from prefect.server.api import dependencies 1a
20from prefect.server.database import PrefectDBInterface, provide_database_interface 1a
21from prefect.server.models.block_schemas import MissingBlockTypeException 1a
22from prefect.server.utilities.server import PrefectRouter 1a
# Router that collects the block schema endpoints defined in this module;
# it is configured with the "/block_schemas" prefix and the "Block schemas" tag.
router: PrefectRouter = PrefectRouter(
    prefix="/block_schemas",
    tags=["Block schemas"],
)
@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_block_schema(
    block_schema: schemas.actions.BlockSchemaCreate,
    response: Response,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.BlockSchema:
    """
    Create a block schema.

    If a block schema with the same checksum and version already exists, the
    existing schema is returned with a 200 status instead of creating a new one.
    Responds with 404 when the referenced block type cannot be found, and with
    409 when creation raises `MissingBlockTypeException`.

    For more information, see https://docs.prefect.io/v3/concepts/blocks.
    """
    # Kept as a function-local import, as in the original code
    # (presumably to avoid an import cycle -- confirm before hoisting).
    from prefect.blocks.core import Block

    async with db.session_context(begin_transaction=True) as session:
        block_type = await models.block_types.read_block_type(
            session=session, block_type_id=block_schema.block_type_id
        )
        if block_type is None:
            raise HTTPException(
                status.HTTP_404_NOT_FOUND,
                detail=f"Block type {block_schema.block_type_id} not found.",
            )

        # Look for an already-registered schema with the same checksum and
        # version so that repeated registrations return the stored schema.
        block_schema_checksum = Block._calculate_schema_checksum(block_schema.fields)
        existing_block_schema = (
            await models.block_schemas.read_block_schema_by_checksum(
                session=session,
                checksum=block_schema_checksum,
                version=block_schema.version,
            )
        )
        if existing_block_schema:
            # Nothing was created, so override the route's default 201.
            response.status_code = status.HTTP_200_OK
            return existing_block_schema

        try:
            model = await models.block_schemas.create_block_schema(
                session=session,
                block_schema=block_schema,
            )
        except MissingBlockTypeException as ex:
            # Chain explicitly so the original failure is recorded as the
            # direct cause of the HTTP error rather than as implicit context.
            raise HTTPException(status.HTTP_409_CONFLICT, detail=str(ex)) from ex

    # Returned after the session context has exited.
    return model
@router.delete("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_block_schema(
    block_schema_id: UUID = Path(..., description="The block schema id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
    api_version: str = Depends(dependencies.provide_request_api_version),
) -> None:
    """
    Delete a block schema by id.

    Responds with 404 when no block schema has the given id, and with 403 when
    the schema belongs to a protected block type.
    """
    # `api_version` is not read in the body; it stays in the signature so the
    # dependency is still resolved for this route.
    async with db.session_context(begin_transaction=True) as session:
        target = await models.block_schemas.read_block_schema(
            session=session,
            block_schema_id=block_schema_id,
        )

        # Guard: the schema has to exist before it can be deleted.
        if not target:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Block schema not found",
            )

        # Guard: schemas of protected block types must stay in place.
        if target.block_type.is_protected:
            raise HTTPException(
                status.HTTP_403_FORBIDDEN,
                detail="Block schemas for protected block types cannot be deleted.",
            )

        await models.block_schemas.delete_block_schema(
            session=session,
            block_schema_id=block_schema_id,
        )
@router.post("/filter")
async def read_block_schemas(
    block_schemas: Optional[schemas.filters.BlockSchemaFilter] = None,
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.core.BlockSchema]:
    """
    Read block schemas, optionally narrowed by a block schema filter.

    `limit` and `offset` are passed through to the model layer query.
    """
    async with db.session_context() as session:
        matching_schemas = await models.block_schemas.read_block_schemas(
            session=session,
            block_schema_filter=block_schemas,
            limit=limit,
            offset=offset,
        )
    # Hand the result back once the session context has been exited.
    return matching_schemas
@router.get("/{id:uuid}")
async def read_block_schema_by_id(
    block_schema_id: UUID = Path(..., description="The block schema id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.BlockSchema:
    """
    Get a block schema by id.

    Responds with 404 when no block schema has the given id.
    """
    async with db.session_context() as session:
        found = await models.block_schemas.read_block_schema(
            session=session,
            block_schema_id=block_schema_id,
        )

    # Happy path first; a falsy lookup result falls through to the 404.
    if found:
        return found
    raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Block schema not found")
@router.get("/checksum/{checksum}")
async def read_block_schema_by_checksum(
    block_schema_checksum: str = Path(
        ..., description="The block schema checksum", alias="checksum"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
    version: Optional[str] = Query(
        None,
        description=(
            "Version of block schema. If not provided the most recently created block"
            " schema with the matching checksum will be returned."
        ),
    ),
) -> schemas.core.BlockSchema:
    """
    Get a block schema by checksum, optionally restricted to a version.

    Responds with 404 when no block schema matches.
    """
    async with db.session_context() as session:
        block_schema = await models.block_schemas.read_block_schema_by_checksum(
            session=session, checksum=block_schema_checksum, version=version
        )
    # A falsy lookup result means nothing matched the checksum/version pair.
    if not block_schema:
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Block schema not found")
    return block_schema