Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/block_types.py: 86%
59 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Functions for interacting with block type ORM objects.
3Intended for internal use by the Prefect REST API.
4"""
6import html 1c
7from typing import TYPE_CHECKING, Optional, Sequence, Union 1c
8from uuid import UUID 1c
10import sqlalchemy as sa 1c
11from sqlalchemy.ext.asyncio import AsyncSession 1c
13from prefect.server import schemas 1c
14from prefect.server.database import PrefectDBInterface, db_injector 1c
15from prefect.server.database.orm_models import BlockType 1c
17if TYPE_CHECKING: 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true1c
18 from prefect.client.schemas import BlockType as ClientBlockType
19 from prefect.client.schemas.actions import BlockTypeUpdate as ClientBlockTypeUpdate
@db_injector
async def create_block_type(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_type: Union[schemas.core.BlockType, "ClientBlockType"],
    override: bool = False,
) -> Union[BlockType, None]:
    """
    Insert a block type and return the stored ORM row.

    Args:
        session: A database session
        block_type: the block type to persist; anything that is not already a
            server-side `schemas.core.BlockType` is converted to one first
        override: when True, the insert is turned into an upsert that, on a
            conflict over `db.orm.block_type_unique_upsert_columns`, overwrites
            the existing row with the same values

    Returns:
        block_type: the ORM block type read back by name after the insert, or
            None if that lookup finds no row
    """
    # Unit tests and block registration hand client-side models straight to this
    # function, so round-trip anything that is not a server model through JSON
    # to obtain the server schema.
    server_model = block_type
    if not isinstance(server_model, schemas.core.BlockType):
        payload = server_model.model_dump(mode="json")
        server_model = schemas.core.BlockType.model_validate(payload)

    values = server_model.model_dump_for_orm(
        exclude_unset=False, exclude={"created", "updated", "id"}
    )

    # Free-text fields are stored HTML-escaped (&, <, >); quotes are left alone.
    for text_field in ("description", "code_example"):
        raw_text = values.get(text_field)
        if raw_text is not None:
            values[text_field] = html.escape(raw_text, quote=False)

    statement = db.queries.insert(db.BlockType).values(**values)
    if override:
        statement = statement.on_conflict_do_update(
            index_elements=db.orm.block_type_unique_upsert_columns,
            set_=values,
        )
    await session.execute(statement)

    # Read the row back so the caller gets an ORM instance. `populate_existing`
    # asks SQLAlchemy to overwrite any copy of the row the session already holds.
    # NOTE(review): the lookup is keyed on `name` while the upsert conflict target
    # is `block_type_unique_upsert_columns` -- confirm these identify the same row.
    name_matches = sa.and_(db.BlockType.name == values["name"])
    lookup = (
        sa.select(db.BlockType)
        .where(name_matches)
        .execution_options(populate_existing=True)
    )
    rows = await session.execute(lookup)
    return rows.scalar()
@db_injector
async def read_block_type(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_type_id: UUID,
) -> Union[BlockType, None]:
    """
    Look up a single block type by its primary key.

    Args:
        session: A database session
        block_type_id: the id of the block type to fetch

    Returns:
        BlockType: whatever `session.get` yields for that id -- the ORM block
            type, or None when there is no such row
    """
    found = await session.get(db.BlockType, block_type_id)
    return found
@db_injector
async def read_block_type_by_slug(
    db: PrefectDBInterface, session: AsyncSession, block_type_slug: str
) -> Union[BlockType, None]:
    """
    Look up a single block type by its slug.

    Args:
        session: A database session
        block_type_slug: the slug to match against `BlockType.slug`

    Returns:
        BlockType: the first matching ORM block type, or None when no row
            has that slug
    """
    slug_matches = db.BlockType.slug == block_type_slug
    statement = sa.select(db.BlockType).where(slug_matches)
    rows = await session.execute(statement)
    return rows.scalar()
@db_injector
async def read_block_types(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_type_filter: Optional[schemas.filters.BlockTypeFilter] = None,
    block_schema_filter: Optional[schemas.filters.BlockSchemaFilter] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> Sequence[BlockType]:
    """
    List block types ordered by name, optionally filtered and paginated.

    Args:
        session: A database session
        block_type_filter: if given, its SQL filter is applied to the block
            types themselves
        block_schema_filter: if given, only block types that have at least one
            block schema satisfying this filter are returned
        limit: maximum number of rows to return; no limit when None
        offset: number of rows to skip; no offset when None

    Returns:
        List[BlockType]: the matching ORM block types, de-duplicated
    """
    statement = sa.select(db.BlockType).order_by(db.BlockType.name)

    if block_type_filter is not None:
        statement = statement.where(block_type_filter.as_sql_filter())

    if block_schema_filter is not None:
        # Correlated EXISTS: keep a block type only if some schema that points
        # at it also passes the schema filter.
        schema_for_type = sa.select(db.BlockSchema).where(
            db.BlockSchema.block_type_id == db.BlockType.id,
            block_schema_filter.as_sql_filter(),
        )
        statement = statement.where(schema_for_type.exists())

    if offset is not None:
        statement = statement.offset(offset)

    if limit is not None:
        statement = statement.limit(limit)

    rows = await session.execute(statement)
    block_types = rows.scalars().unique().all()
    return block_types
@db_injector
async def update_block_type(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_type_id: Union[str, UUID],
    block_type: Union[
        schemas.actions.BlockTypeUpdate,
        schemas.core.BlockType,
        "ClientBlockTypeUpdate",
        "ClientBlockType",
    ],
) -> bool:
    """
    Apply an update to the block type with the given id.

    Args:
        session: A database session
        block_type_id: the id of the block type to update
        block_type: the data to update the block type with; anything that is
            not already a server-side `BlockTypeUpdate` is converted to one

    Returns:
        bool: True if at least one row was updated
    """
    # Unit tests and block registration hand client-side models (and full block
    # types) straight to this function, so convert anything that is not a server
    # `BlockTypeUpdate`. These fields are dropped from the dump before validating.
    update_model = block_type
    if not isinstance(update_model, schemas.actions.BlockTypeUpdate):
        dropped_fields = {"id", "created", "updated", "name", "slug", "is_protected"}
        payload = update_model.model_dump(mode="json", exclude=dropped_fields)
        update_model = schemas.actions.BlockTypeUpdate.model_validate(payload)

    # Only fields that were explicitly set are written.
    changes = update_model.model_dump_for_orm(exclude_unset=True, exclude={"id"})
    statement = (
        sa.update(db.BlockType)
        .where(db.BlockType.id == block_type_id)
        .values(**changes)
    )
    outcome = await session.execute(statement)
    return outcome.rowcount > 0
@db_injector
async def delete_block_type(
    db: PrefectDBInterface, session: AsyncSession, block_type_id: str
) -> bool:
    """
    Delete the block type with the given id.

    Args:
        session: A database session
        block_type_id: the id of the block type to delete

    Returns:
        bool: True if at least one row was deleted
    """
    statement = sa.delete(db.BlockType).where(db.BlockType.id == block_type_id)
    outcome = await session.execute(statement)
    return outcome.rowcount > 0