Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/block_types.py: 31%
73 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from typing import List, Optional 1a
2from uuid import UUID 1a
4import sqlalchemy as sa 1a
5from fastapi import Body, Depends, HTTPException, Path, Query, status 1a
7from prefect.blocks.core import _should_update_block_type 1a
8from prefect.server import models, schemas 1a
9from prefect.server.api import dependencies 1a
10from prefect.server.database import PrefectDBInterface, provide_database_interface 1a
11from prefect.server.utilities.server import PrefectRouter 1a
# Router that groups every block-type endpoint under the "/block_types" prefix.
router: PrefectRouter = PrefectRouter(
    prefix="/block_types",
    tags=["Block types"],
)
@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_block_type(
    block_type: schemas.actions.BlockTypeCreate,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.BlockType:
    """
    Register a new block type.

    For more information, see https://docs.prefect.io/v3/concepts/blocks.
    """
    # Names starting with "Prefect" (compared case-insensitively) are
    # reserved for system use, so API callers may not claim them.
    name_is_reserved = block_type.name.lower().startswith("prefect")
    if name_is_reserved:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Block type names beginning with 'Prefect' are reserved.",
        )

    try:
        async with db.session_context(begin_transaction=True) as session:
            new_block_type = await models.block_types.create_block_type(
                session, block_type=block_type
            )
    except sa.exc.IntegrityError:
        # An integrity error during creation is reported to the caller as a
        # name conflict.
        conflict_detail = f'Block type with name "{block_type.name}" already exists'
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=conflict_detail,
        )

    return new_block_type
@router.get("/{id:uuid}")
async def read_block_type_by_id(
    block_type_id: UUID = Path(..., description="The block type ID", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.BlockType:
    """
    Retrieve a single block type by its ID.
    """
    async with db.session_context() as session:
        found = await models.block_types.read_block_type(
            session=session, block_type_id=block_type_id
        )

    # A falsy lookup result is answered with a 404.
    if found:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Block type not found"
    )
@router.get("/slug/{slug}")
async def read_block_type_by_slug(
    block_type_slug: str = Path(..., description="The block type name", alias="slug"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.BlockType:
    """
    Retrieve a single block type by its slug.
    """
    async with db.session_context() as session:
        found = await models.block_types.read_block_type_by_slug(
            session=session, block_type_slug=block_type_slug
        )

    # A falsy lookup result is answered with a 404.
    if found:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Block type not found"
    )
@router.post("/filter")
async def read_block_types(
    block_types: Optional[schemas.filters.BlockTypeFilter] = None,
    block_schemas: Optional[schemas.filters.BlockSchemaFilter] = None,
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.core.BlockType]:
    """
    List block types. Results can be narrowed with the optional filters and
    paged with limit and offset.
    """
    # Collect the paging and filter arguments once; the request-body names
    # (block_types / block_schemas) map onto the model layer's *_filter names.
    query_options = {
        "limit": limit,
        "offset": offset,
        "block_type_filter": block_types,
        "block_schema_filter": block_schemas,
    }
    async with db.session_context() as session:
        matching_block_types = await models.block_types.read_block_types(
            session=session, **query_options
        )
    return matching_block_types
@router.patch("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT)
async def update_block_type(
    block_type: schemas.actions.BlockTypeUpdate,
    block_type_id: UUID = Path(..., description="The block type ID", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Apply changes to an existing block type.
    """
    async with db.session_context(begin_transaction=True) as session:
        existing = await models.block_types.read_block_type(
            session=session, block_type_id=block_type_id
        )
        if existing is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Block type not found"
            )

        # Write only when something meaningful changed. Skipping no-op
        # updates avoids deadlocks when multiple blocks of the same type are
        # created. Clients perform this check too, but it is repeated here to
        # accommodate older clients.
        current_state = schemas.core.BlockType.model_validate(
            existing, from_attributes=True
        )
        if not _should_update_block_type(block_type, current_state):
            return

        await models.block_types.update_block_type(
            session=session, block_type=block_type, block_type_id=block_type_id
        )
@router.delete("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_block_type(
    block_type_id: UUID = Path(..., description="The block type ID", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Delete a block type by ID.
    """
    async with db.session_context(begin_transaction=True) as session:
        target = await models.block_types.read_block_type(
            session=session, block_type_id=block_type_id
        )

        # Guard clauses: the block type must exist and must not be protected.
        if target is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Block type not found"
            )
        if target.is_protected:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="protected block types cannot be deleted.",
            )

        await models.block_types.delete_block_type(
            session=session, block_type_id=block_type_id
        )
@router.get("/slug/{slug}/block_documents", tags=router.tags + ["Block documents"])
async def read_block_documents_for_block_type(
    db: PrefectDBInterface = Depends(provide_database_interface),
    block_type_slug: str = Path(..., description="The block type name", alias="slug"),
    include_secrets: bool = Query(
        False, description="Whether to include sensitive values in the block document."
    ),
) -> List[schemas.core.BlockDocument]:
    """
    List the block documents of the block type identified by the given slug.
    """
    async with db.session_context() as session:
        owning_type = await models.block_types.read_block_type_by_slug(
            session=session, block_type_slug=block_type_slug
        )
        if not owning_type:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Block type not found"
            )

        # Restrict the document query to the block type resolved above.
        only_this_type = schemas.filters.BlockDocumentFilter(
            block_type_id={"any_": [owning_type.id]}
        )
        return await models.block_documents.read_block_documents(
            session=session,
            block_document_filter=only_this_type,
            include_secrets=include_secrets,
        )
@router.get(
    "/slug/{slug}/block_documents/name/{block_document_name}",
    tags=router.tags + ["Block documents"],
)
async def read_block_document_by_name_for_block_type(
    db: PrefectDBInterface = Depends(provide_database_interface),
    block_type_slug: str = Path(..., description="The block type name", alias="slug"),
    # This description previously repeated "The block type name", which
    # mislabelled the parameter in the generated API documentation.
    block_document_name: str = Path(..., description="The block document name"),
    include_secrets: bool = Query(
        False, description="Whether to include sensitive values in the block document."
    ),
) -> schemas.core.BlockDocument:
    """
    Get a block document by name for the block type with the given slug.
    """
    async with db.session_context() as session:
        block_document = await models.block_documents.read_block_document_by_name(
            session=session,
            block_type_slug=block_type_slug,
            name=block_document_name,
            include_secrets=include_secrets,
        )

    # A falsy lookup result is answered with a 404.
    if not block_document:
        raise HTTPException(
            status.HTTP_404_NOT_FOUND, detail="Block document not found"
        )
    return block_document
@router.post("/install_system_block_types")
async def install_system_block_types(
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Run the installation of protected system blocks.
    """
    # No transaction is opened here on purpose: the installer manages its own
    # transactions.
    async with db.session_context(begin_transaction=False) as db_session:
        await models.block_registration._install_protected_system_blocks(db_session)