Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/block_types.py: 71%
73 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from typing import List, Optional 1c
2from uuid import UUID 1c
4import sqlalchemy as sa 1c
5from fastapi import Body, Depends, HTTPException, Path, Query, status 1c
7from prefect.blocks.core import _should_update_block_type 1c
8from prefect.server import models, schemas 1c
9from prefect.server.api import dependencies 1c
10from prefect.server.database import PrefectDBInterface, provide_database_interface 1c
11from prefect.server.utilities.server import PrefectRouter 1c
13router: PrefectRouter = PrefectRouter(prefix="/block_types", tags=["Block types"]) 1c
16@router.post("/", status_code=status.HTTP_201_CREATED) 1c
17async def create_block_type( 1c
18 block_type: schemas.actions.BlockTypeCreate,
19 db: PrefectDBInterface = Depends(provide_database_interface),
20) -> schemas.core.BlockType:
21 """
22 Create a new block type.
24 For more information, see https://docs.prefect.io/v3/concepts/blocks.
25 """
26 # API-created blocks cannot start with the word "Prefect"
27 # as it is reserved for system use
28 if block_type.name.lower().startswith("prefect"): 28 ↛ 29line 28 didn't jump to line 29 because the condition on line 28 was never true1adb
29 raise HTTPException(
30 status.HTTP_403_FORBIDDEN,
31 detail="Block type names beginning with 'Prefect' are reserved.",
32 )
33 try: 1adb
34 async with db.session_context(begin_transaction=True) as session: 1adb
35 created_block_type = await models.block_types.create_block_type( 1adb
36 session, block_type=block_type
37 )
38 except sa.exc.IntegrityError: 1adb
39 raise HTTPException( 1adb
40 status.HTTP_409_CONFLICT,
41 detail=f'Block type with name "{block_type.name}" already exists',
42 )
43 return created_block_type 1ab
46@router.get("/{id:uuid}") 1c
47async def read_block_type_by_id( 1c
48 block_type_id: UUID = Path(..., description="The block type ID", alias="id"),
49 db: PrefectDBInterface = Depends(provide_database_interface),
50) -> schemas.core.BlockType:
51 """
52 Get a block type by ID.
53 """
54 async with db.session_context() as session: 1ab
55 block_type = await models.block_types.read_block_type( 1ab
56 session=session, block_type_id=block_type_id
57 )
58 if not block_type: 1ab
59 raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Block type not found") 1ab
60 return block_type 1ab
63@router.get("/slug/{slug}") 1c
64async def read_block_type_by_slug( 1c
65 block_type_slug: str = Path(..., description="The block type name", alias="slug"),
66 db: PrefectDBInterface = Depends(provide_database_interface),
67) -> schemas.core.BlockType:
68 """
69 Get a block type by name.
70 """
71 async with db.session_context() as session: 1ab
72 block_type = await models.block_types.read_block_type_by_slug( 1ab
73 session=session, block_type_slug=block_type_slug
74 )
75 if not block_type: 75 ↛ 77line 75 didn't jump to line 77 because the condition on line 75 was always true1ab
76 raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Block type not found") 1ab
77 return block_type
80@router.post("/filter") 1c
81async def read_block_types( 1c
82 block_types: Optional[schemas.filters.BlockTypeFilter] = None,
83 block_schemas: Optional[schemas.filters.BlockSchemaFilter] = None,
84 limit: int = dependencies.LimitBody(),
85 offset: int = Body(0, ge=0),
86 db: PrefectDBInterface = Depends(provide_database_interface),
87) -> List[schemas.core.BlockType]:
88 """
89 Gets all block types. Optionally limit return with limit and offset.
90 """
91 async with db.session_context() as session: 1adb
92 return await models.block_types.read_block_types( 1adb
93 session=session,
94 limit=limit,
95 offset=offset,
96 block_type_filter=block_types,
97 block_schema_filter=block_schemas,
98 )
101@router.patch("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT) 1c
102async def update_block_type( 1c
103 block_type: schemas.actions.BlockTypeUpdate,
104 block_type_id: UUID = Path(..., description="The block type ID", alias="id"),
105 db: PrefectDBInterface = Depends(provide_database_interface),
106) -> None:
107 """
108 Update a block type.
109 """
110 async with db.session_context(begin_transaction=True) as session: 1ab
111 db_block_type = await models.block_types.read_block_type( 1ab
112 session=session, block_type_id=block_type_id
113 )
114 if db_block_type is None:
115 raise HTTPException(
116 status.HTTP_404_NOT_FOUND, detail="Block type not found"
117 )
119 # Only update the block type if there is any meaningful changes.
120 # This avoids deadlocks when creating multiple blocks of the same type.
121 # This check happens client side, but we do it server side as well
122 # to accommodate older clients.
123 if _should_update_block_type(
124 block_type,
125 schemas.core.BlockType.model_validate(db_block_type, from_attributes=True),
126 ):
127 await models.block_types.update_block_type( 1ab
128 session=session, block_type=block_type, block_type_id=block_type_id
129 )
132@router.delete("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT) 1c
133async def delete_block_type( 1c
134 block_type_id: UUID = Path(..., description="The block type ID", alias="id"),
135 db: PrefectDBInterface = Depends(provide_database_interface),
136) -> None:
137 async with db.session_context(begin_transaction=True) as session: 1ab
138 db_block_type = await models.block_types.read_block_type( 1ab
139 session=session, block_type_id=block_type_id
140 )
141 if db_block_type is None:
142 raise HTTPException(
143 status_code=status.HTTP_404_NOT_FOUND, detail="Block type not found"
144 )
145 elif db_block_type.is_protected:
146 raise HTTPException(
147 status.HTTP_403_FORBIDDEN,
148 detail="protected block types cannot be deleted.",
149 )
150 await models.block_types.delete_block_type( 1ab
151 session=session, block_type_id=block_type_id
152 )
155@router.get("/slug/{slug}/block_documents", tags=router.tags + ["Block documents"]) 1c
156async def read_block_documents_for_block_type( 1c
157 db: PrefectDBInterface = Depends(provide_database_interface),
158 block_type_slug: str = Path(..., description="The block type name", alias="slug"),
159 include_secrets: bool = Query(
160 False, description="Whether to include sensitive values in the block document."
161 ),
162) -> List[schemas.core.BlockDocument]:
163 async with db.session_context() as session: 1ab
164 block_type = await models.block_types.read_block_type_by_slug( 1ab
165 session=session, block_type_slug=block_type_slug
166 )
167 if not block_type:
168 raise HTTPException(
169 status.HTTP_404_NOT_FOUND, detail="Block type not found"
170 )
171 return await models.block_documents.read_block_documents(
172 session=session,
173 block_document_filter=schemas.filters.BlockDocumentFilter(
174 block_type_id=dict(any_=[block_type.id])
175 ),
176 include_secrets=include_secrets,
177 )
180@router.get( 1c
181 "/slug/{slug}/block_documents/name/{block_document_name}",
182 tags=router.tags + ["Block documents"],
183)
184async def read_block_document_by_name_for_block_type( 1c
185 db: PrefectDBInterface = Depends(provide_database_interface),
186 block_type_slug: str = Path(..., description="The block type name", alias="slug"),
187 block_document_name: str = Path(..., description="The block type name"),
188 include_secrets: bool = Query(
189 False, description="Whether to include sensitive values in the block document."
190 ),
191) -> schemas.core.BlockDocument:
192 async with db.session_context() as session: 1ab
193 block_document = await models.block_documents.read_block_document_by_name( 1ab
194 session=session,
195 block_type_slug=block_type_slug,
196 name=block_document_name,
197 include_secrets=include_secrets,
198 )
199 if not block_document: 199 ↛ 203line 199 didn't jump to line 203 because the condition on line 199 was always true1ab
200 raise HTTPException( 1ab
201 status.HTTP_404_NOT_FOUND, detail="Block document not found"
202 )
203 return block_document
206@router.post("/install_system_block_types") 1c
207async def install_system_block_types( 1c
208 db: PrefectDBInterface = Depends(provide_database_interface),
209) -> None:
210 # Don't begin a transaction. _install_protected_system_blocks will manage
211 # the transactions.
212 async with db.session_context(begin_transaction=False) as session: 1a
213 await models.block_registration._install_protected_system_blocks(session) 1a