Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/block_types.py: 58%
59 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Functions for interacting with block type ORM objects.
3Intended for internal use by the Prefect REST API.
4"""
6import html 1a
7from typing import TYPE_CHECKING, Optional, Sequence, Union 1a
8from uuid import UUID 1a
10import sqlalchemy as sa 1a
11from sqlalchemy.ext.asyncio import AsyncSession 1a
13from prefect.server import schemas 1a
14from prefect.server.database import PrefectDBInterface, db_injector 1a
15from prefect.server.database.orm_models import BlockType 1a
17if TYPE_CHECKING: 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true1a
18 from prefect.client.schemas import BlockType as ClientBlockType
19 from prefect.client.schemas.actions import BlockTypeUpdate as ClientBlockTypeUpdate
@db_injector
async def create_block_type(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_type: Union[schemas.core.BlockType, "ClientBlockType"],
    override: bool = False,
) -> Union[BlockType, None]:
    """
    Insert a block type and return the row as it is stored.

    Args:
        db: the database interface used to build the statements
        session: A database session
        block_type: the block type to persist; either the server-side schema
            or a client-side model, which is converted to the server schema
        override: when True, a conflict on the block type's unique upsert
            columns updates the existing row with the new values

    Returns:
        BlockType: the ORM block type model read back after the insert, or
            None if no row with the inserted name is found
    """
    # Client models are accepted as a convenience (many unit tests and block
    # registration pass them directly); round-trip them through their JSON
    # form into the server-side schema before doing anything else.
    if isinstance(block_type, schemas.core.BlockType):
        server_block_type = block_type
    else:
        server_block_type = schemas.core.BlockType.model_validate(
            block_type.model_dump(mode="json")
        )

    row_values = server_block_type.model_dump_for_orm(
        exclude_unset=False, exclude={"created", "updated", "id"}
    )

    # Free-text fields are HTML-escaped (quote characters left untouched)
    # before they are written to the database.
    for text_field in ("description", "code_example"):
        if row_values.get(text_field) is not None:
            row_values[text_field] = html.escape(row_values[text_field], quote=False)

    statement = db.queries.insert(db.BlockType).values(**row_values)
    if override:
        statement = statement.on_conflict_do_update(
            index_elements=db.orm.block_type_unique_upsert_columns,
            set_=row_values,
        )
    await session.execute(statement)

    # Read the row back; `populate_existing` asks the ORM to refresh any copy
    # of the object the session already holds with the freshly selected state.
    # NOTE(review): the lookup filters on `name`, while the upsert conflict
    # target is `block_type_unique_upsert_columns` -- confirm `name` is unique
    # enough for this to always return the row that was just written.
    lookup = (
        sa.select(db.BlockType)
        .where(sa.and_(db.BlockType.name == row_values["name"]))
        .execution_options(populate_existing=True)
    )
    fetched = await session.execute(lookup)
    return fetched.scalar()
@db_injector
async def read_block_type(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_type_id: UUID,
) -> Union[BlockType, None]:
    """
    Look up a single block type by its primary key.

    Args:
        db: the database interface providing the BlockType ORM model
        session: A database session
        block_type_id: the id of the block type to read

    Returns:
        BlockType: the ORM block type model, or None if the session finds
            no row with that id
    """
    found = await session.get(db.BlockType, block_type_id)
    return found
@db_injector
async def read_block_type_by_slug(
    db: PrefectDBInterface, session: AsyncSession, block_type_slug: str
) -> Union[BlockType, None]:
    """
    Look up a single block type by its slug.

    Args:
        db: the database interface providing the BlockType ORM model
        session: A database session
        block_type_slug: the slug of the block type to read

    Returns:
        BlockType: the ORM block type model, or None if no row matches

    """
    by_slug = sa.select(db.BlockType).where(db.BlockType.slug == block_type_slug)
    rows = await session.execute(by_slug)
    return rows.scalar()
@db_injector
async def read_block_types(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_type_filter: Optional[schemas.filters.BlockTypeFilter] = None,
    block_schema_filter: Optional[schemas.filters.BlockSchemaFilter] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> Sequence[BlockType]:
    """
    Read block types ordered by name, with optional filtering and paging.

    Args:
        db: the database interface providing the ORM models
        session: A database session
        block_type_filter: if given, only block types matching this filter
            are returned
        block_schema_filter: if given, only block types that have at least
            one block schema matching this filter are returned
        limit: if given, the maximum number of block types to return
        offset: if given, the number of block types to skip

    Returns:
        Sequence[BlockType]: the matching ORM block type models
    """
    statement = sa.select(db.BlockType).order_by(db.BlockType.name)

    if block_type_filter is not None:
        statement = statement.where(block_type_filter.as_sql_filter())

    if block_schema_filter is not None:
        # Correlated subquery: keep a block type only when some block schema
        # that points at it also satisfies the schema filter.
        matching_schemas = sa.select(db.BlockSchema).where(
            db.BlockSchema.block_type_id == db.BlockType.id,
            block_schema_filter.as_sql_filter(),
        )
        statement = statement.where(matching_schemas.exists())

    if offset is not None:
        statement = statement.offset(offset)

    if limit is not None:
        statement = statement.limit(limit)

    rows = await session.execute(statement)
    return rows.scalars().unique().all()
@db_injector
async def update_block_type(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_type_id: Union[str, UUID],
    block_type: Union[
        schemas.actions.BlockTypeUpdate,
        schemas.core.BlockType,
        "ClientBlockTypeUpdate",
        "ClientBlockType",
    ],
) -> bool:
    """
    Update a block type by id.

    Args:
        db: the database interface providing the BlockType ORM model
        session: A database session
        block_type_id: the id of the block type to update
        block_type: the data to update the block type with; anything other
            than a server-side `BlockTypeUpdate` is converted to one, dropping
            the fields that an update may not change

    Returns:
        bool: True if the block type was updated
    """

    # We take a shortcut in many unit tests and in block registration to pass client
    # models directly to this function. We will support this by converting them to
    # the appropriate server model.
    if not isinstance(block_type, schemas.actions.BlockTypeUpdate):
        block_type = schemas.actions.BlockTypeUpdate.model_validate(
            block_type.model_dump(
                mode="json",
                exclude={"id", "created", "updated", "name", "slug", "is_protected"},
            )
        )

    update_values = block_type.model_dump_for_orm(exclude_unset=True, exclude={"id"})

    # Apply the same HTML escaping that `create_block_type` performs, so an
    # update cannot replace an escaped description / code example with raw
    # markup. Only fields that were actually set (and are not None) are touched.
    for text_field in ("description", "code_example"):
        if update_values.get(text_field) is not None:
            update_values[text_field] = html.escape(
                update_values[text_field], quote=False
            )

    update_statement = (
        sa.update(db.BlockType)
        .where(db.BlockType.id == block_type_id)
        .values(**update_values)
    )
    result = await session.execute(update_statement)
    # rowcount is the number of rows matched by the UPDATE's WHERE clause.
    return result.rowcount > 0
@db_injector
async def delete_block_type(
    db: PrefectDBInterface, session: AsyncSession, block_type_id: Union[str, UUID]
) -> bool:
    """
    Delete a block type by id.

    Args:
        db: the database interface providing the BlockType ORM model
        session: A database session
        block_type_id: the id of the block type to delete

    Returns:
        bool: True if the block type was deleted
    """

    result = await session.execute(
        sa.delete(db.BlockType).where(db.BlockType.id == block_type_id)
    )
    # A positive rowcount means the DELETE matched (and removed) a row.
    return result.rowcount > 0