Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/variables.py: 36%

61 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from typing import Optional, Sequence 1a

2from uuid import UUID 1a

3 

4import sqlalchemy as sa 1a

5from sqlalchemy.ext.asyncio import AsyncSession 1a

6 

7from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1a

8from prefect.server.schemas import filters, sorting 1a

9from prefect.server.schemas.actions import VariableCreate, VariableUpdate 1a

10 

11 

@db_injector
async def create_variable(
    db: PrefectDBInterface, session: AsyncSession, variable: VariableCreate
) -> orm_models.Variable:
    """
    Add a new variable to the session and flush it.

    Args:
        db: database interface exposing the `Variable` ORM model
        session: async database session
        variable: creation payload; its dumped fields become the ORM
            model's constructor arguments

    Returns:
        orm_models.Variable: the ORM instance that was added
    """
    fields = variable.model_dump()
    record = db.Variable(**fields)

    session.add(record)
    # Only a flush is issued here; this function does not commit.
    await session.flush()

    return record

31 

32 

@db_injector
async def read_variable(
    db: PrefectDBInterface, session: AsyncSession, variable_id: UUID
) -> Optional[orm_models.Variable]:
    """
    Fetch a single variable by its id.

    Args:
        db: database interface exposing the `Variable` ORM model
        session: async database session
        variable_id: id to match against `Variable.id`

    Returns:
        The first matching variable, or None when no row matches.
    """
    matches_id = db.Variable.id == variable_id
    stmt = sa.select(db.Variable).where(matches_id)

    rows = await session.execute(stmt)
    return rows.scalar()

45 

46 

@db_injector
async def read_variable_by_name(
    db: PrefectDBInterface, session: AsyncSession, name: str
) -> Optional[orm_models.Variable]:
    """
    Fetch a single variable by its name.

    Args:
        db: database interface exposing the `Variable` ORM model
        session: async database session
        name: value to match against `Variable.name`

    Returns:
        The first matching variable, or None when no row matches.
    """
    matches_name = db.Variable.name == name
    stmt = sa.select(db.Variable).where(matches_name)

    rows = await session.execute(stmt)
    return rows.scalar()

59 

60 

@db_injector
async def read_variables(
    db: PrefectDBInterface,
    session: AsyncSession,
    variable_filter: Optional[filters.VariableFilter] = None,
    sort: sorting.VariableSort = sorting.VariableSort.NAME_ASC,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
) -> Sequence[orm_models.Variable]:
    """
    Read variables, applying filters, ordering and pagination.

    Args:
        db: database interface exposing the `Variable` ORM model
        session: async database session
        variable_filter: optional filter; skipped when falsy
        sort: ordering to apply (defaults to `VariableSort.NAME_ASC`)
        offset: number of rows to skip; not applied when None
        limit: maximum number of rows to return; not applied when None

    Returns:
        The matching variables, de-duplicated via `unique()`.
    """
    base = sa.select(db.Variable)
    stmt = base.order_by(*sort.as_sql_sort())

    if variable_filter:
        stmt = stmt.where(variable_filter.as_sql_filter())

    # Compare against None explicitly so that 0 is still forwarded.
    if offset is not None:
        stmt = stmt.offset(offset)
    if limit is not None:
        stmt = stmt.limit(limit)

    fetched = await session.execute(stmt)
    return fetched.scalars().unique().all()

85 

86 

@db_injector
async def count_variables(
    db: PrefectDBInterface,
    session: AsyncSession,
    variable_filter: Optional[filters.VariableFilter] = None,
) -> int:
    """
    Count variables, optionally restricted by a filter.

    Args:
        db: database interface exposing the `Variable` ORM model
        session: async database session
        variable_filter: optional filter; skipped when falsy, in which
            case every variable is counted

    Returns:
        int: the count produced by the query
    """
    row_count = sa.func.count()
    stmt = sa.select(row_count).select_from(db.Variable)

    if variable_filter:
        stmt = stmt.where(variable_filter.as_sql_filter())

    tally = await session.execute(stmt)
    return tally.scalar_one()

104 

105 

@db_injector
async def update_variable(
    db: PrefectDBInterface,
    session: AsyncSession,
    variable_id: UUID,
    variable: VariableUpdate,
) -> bool:
    """
    Update the variable with the given id.

    Args:
        db: database interface exposing the `Variable` ORM model
        session: async database session
        variable_id: id to match against `Variable.id`
        variable: update payload; it is dumped with `exclude_unset=True`

    Returns:
        bool: True when the UPDATE reports a positive rowcount
    """
    target = sa.update(db.Variable).where(db.Variable.id == variable_id)
    stmt = target.values(**variable.model_dump_for_orm(exclude_unset=True))

    outcome = await session.execute(stmt)
    return outcome.rowcount > 0

124 

125 

@db_injector
async def update_variable_by_name(
    db: PrefectDBInterface, session: AsyncSession, name: str, variable: VariableUpdate
) -> bool:
    """
    Update the variable with the given name.

    Args:
        db: database interface exposing the `Variable` ORM model
        session: async database session
        name: value to match against `Variable.name`
        variable: update payload; it is dumped with `exclude_unset=True`

    Returns:
        bool: True when the UPDATE reports a positive rowcount
    """
    target = sa.update(db.Variable).where(db.Variable.name == name)
    stmt = target.values(**variable.model_dump_for_orm(exclude_unset=True))

    outcome = await session.execute(stmt)
    return outcome.rowcount > 0

141 

142 

@db_injector
async def delete_variable(
    db: PrefectDBInterface, session: AsyncSession, variable_id: UUID
) -> bool:
    """
    Delete the variable with the given id.

    Args:
        db: database interface exposing the `Variable` ORM model
        session: async database session
        variable_id: id to match against `Variable.id`

    Returns:
        bool: True when the DELETE reports a positive rowcount
    """
    matches_id = db.Variable.id == variable_id
    stmt = sa.delete(db.Variable).where(matches_id)

    outcome = await session.execute(stmt)
    return outcome.rowcount > 0

155 

156 

@db_injector
async def delete_variable_by_name(
    db: PrefectDBInterface, session: AsyncSession, name: str
) -> bool:
    """
    Delete the variable with the given name.

    Args:
        db: database interface exposing the `Variable` ORM model
        session: async database session
        name: value to match against `Variable.name`

    Returns:
        bool: True when the DELETE reports a positive rowcount
    """
    matches_name = db.Variable.name == name
    stmt = sa.delete(db.Variable).where(matches_name)

    outcome = await session.execute(stmt)
    return outcome.rowcount > 0

168 return result.rowcount > 0