Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/variables.py: 36%
61 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from typing import Optional, Sequence 1a
2from uuid import UUID 1a
4import sqlalchemy as sa 1a
5from sqlalchemy.ext.asyncio import AsyncSession 1a
7from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1a
8from prefect.server.schemas import filters, sorting 1a
9from prefect.server.schemas.actions import VariableCreate, VariableUpdate 1a
@db_injector
async def create_variable(
    db: PrefectDBInterface, session: AsyncSession, variable: VariableCreate
) -> orm_models.Variable:
    """
    Add a new variable to the session and flush it.

    Args:
        db: database interface providing the ``Variable`` ORM class
        session: async database session the new row is added to
        variable: schema object whose dumped fields populate the new row

    Returns:
        orm_models.Variable: the ORM instance that was added and flushed
    """
    fields = variable.model_dump()
    new_variable = db.Variable(**fields)
    session.add(new_variable)
    # Flush only; this function never commits the session itself.
    await session.flush()
    return new_variable
@db_injector
async def read_variable(
    db: PrefectDBInterface, session: AsyncSession, variable_id: UUID
) -> Optional[orm_models.Variable]:
    """
    Look up a single variable by its id.

    Args:
        db: database interface providing the ``Variable`` ORM class
        session: async database session used to run the query
        variable_id: id of the variable to fetch

    Returns:
        The matching variable, or ``None`` when the query yields no row.
    """
    statement = sa.select(db.Variable).where(db.Variable.id == variable_id)
    rows = await session.execute(statement)
    return rows.scalar()
@db_injector
async def read_variable_by_name(
    db: PrefectDBInterface, session: AsyncSession, name: str
) -> Optional[orm_models.Variable]:
    """
    Look up a single variable by its name.

    Args:
        db: database interface providing the ``Variable`` ORM class
        session: async database session used to run the query
        name: name of the variable to fetch

    Returns:
        The matching variable, or ``None`` when the query yields no row.
    """
    statement = sa.select(db.Variable).where(db.Variable.name == name)
    rows = await session.execute(statement)
    return rows.scalar()
@db_injector
async def read_variables(
    db: PrefectDBInterface,
    session: AsyncSession,
    variable_filter: Optional[filters.VariableFilter] = None,
    sort: sorting.VariableSort = sorting.VariableSort.NAME_ASC,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
) -> Sequence[orm_models.Variable]:
    """
    Read variables, applying filters, ordering and pagination.

    Args:
        db: database interface providing the ``Variable`` ORM class
        session: async database session used to run the query
        variable_filter: optional filter; applied only when truthy
        sort: ordering to apply (defaults to ``VariableSort.NAME_ASC``)
        offset: number of rows to skip; not applied when ``None``
        limit: maximum number of rows to return; not applied when ``None``

    Returns:
        The de-duplicated sequence of matching variables.
    """
    statement = sa.select(db.Variable)
    statement = statement.order_by(*sort.as_sql_sort())

    if variable_filter:
        statement = statement.where(variable_filter.as_sql_filter())

    # Compare against None explicitly so that 0 is still honoured.
    if offset is not None:
        statement = statement.offset(offset)
    if limit is not None:
        statement = statement.limit(limit)

    rows = await session.execute(statement)
    return rows.scalars().unique().all()
@db_injector
async def count_variables(
    db: PrefectDBInterface,
    session: AsyncSession,
    variable_filter: Optional[filters.VariableFilter] = None,
) -> int:
    """
    Count the variables that match an optional filter.

    Args:
        db: database interface providing the ``Variable`` ORM class
        session: async database session used to run the query
        variable_filter: optional filter; applied only when truthy

    Returns:
        The number of matching variables.
    """
    statement = sa.select(sa.func.count()).select_from(db.Variable)

    if variable_filter:
        statement = statement.where(variable_filter.as_sql_filter())

    rows = await session.execute(statement)
    return rows.scalar_one()
@db_injector
async def update_variable(
    db: PrefectDBInterface,
    session: AsyncSession,
    variable_id: UUID,
    variable: VariableUpdate,
) -> bool:
    """
    Update the variable with the given id.

    Only fields explicitly set on ``variable`` are written
    (``exclude_unset=True``).

    Args:
        db: database interface providing the ``Variable`` ORM class
        session: async database session used to run the statement
        variable_id: id of the variable to update
        variable: schema object carrying the new field values

    Returns:
        ``True`` if at least one row was affected, otherwise ``False``.
    """
    targeted = sa.update(db.Variable).where(db.Variable.id == variable_id)
    statement = targeted.values(**variable.model_dump_for_orm(exclude_unset=True))

    outcome = await session.execute(statement)
    return outcome.rowcount > 0
@db_injector
async def update_variable_by_name(
    db: PrefectDBInterface, session: AsyncSession, name: str, variable: VariableUpdate
) -> bool:
    """
    Update the variable with the given name.

    Only fields explicitly set on ``variable`` are written
    (``exclude_unset=True``).

    Args:
        db: database interface providing the ``Variable`` ORM class
        session: async database session used to run the statement
        name: name of the variable to update
        variable: schema object carrying the new field values

    Returns:
        ``True`` if at least one row was affected, otherwise ``False``.
    """
    targeted = sa.update(db.Variable).where(db.Variable.name == name)
    statement = targeted.values(**variable.model_dump_for_orm(exclude_unset=True))

    outcome = await session.execute(statement)
    return outcome.rowcount > 0
@db_injector
async def delete_variable(
    db: PrefectDBInterface, session: AsyncSession, variable_id: UUID
) -> bool:
    """
    Delete the variable with the given id.

    Args:
        db: database interface providing the ``Variable`` ORM class
        session: async database session used to run the statement
        variable_id: id of the variable to delete

    Returns:
        ``True`` if at least one row was deleted, otherwise ``False``.
    """
    statement = sa.delete(db.Variable).where(db.Variable.id == variable_id)
    outcome = await session.execute(statement)
    return outcome.rowcount > 0
@db_injector
async def delete_variable_by_name(
    db: PrefectDBInterface, session: AsyncSession, name: str
) -> bool:
    """
    Delete the variable with the given name.

    Args:
        db: database interface providing the ``Variable`` ORM class
        session: async database session used to run the statement
        name: name of the variable to delete

    Returns:
        ``True`` if at least one row was deleted, otherwise ``False``.
    """
    statement = sa.delete(db.Variable).where(db.Variable.name == name)
    outcome = await session.execute(statement)
    return outcome.rowcount > 0