Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/variables.py: 87%

61 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from typing import Optional, Sequence 1b

2from uuid import UUID 1b

3 

4import sqlalchemy as sa 1b

5from sqlalchemy.ext.asyncio import AsyncSession 1b

6 

7from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1b

8from prefect.server.schemas import filters, sorting 1b

9from prefect.server.schemas.actions import VariableCreate, VariableUpdate 1b

10 

11 

12@db_injector 1b

13async def create_variable( 1b

14 db: PrefectDBInterface, session: AsyncSession, variable: VariableCreate 

15) -> orm_models.Variable: 

16 """ 

17 Create a variable 

18 

19 Args: 

20 session: async database session 

21 variable: variable to create 

22 

23 Returns: 

24 orm_models.Variable 

25 """ 

26 model = db.Variable(**variable.model_dump()) 1adc

27 session.add(model) 1adc

28 await session.flush() 1adc

29 

30 return model 

31 

32 

33@db_injector 1b

34async def read_variable( 1b

35 db: PrefectDBInterface, session: AsyncSession, variable_id: UUID 

36) -> Optional[orm_models.Variable]: 

37 """ 

38 Reads a variable by id. 

39 """ 

40 

41 query = sa.select(db.Variable).where(db.Variable.id == variable_id) 1ac

42 

43 result = await session.execute(query) 1ac

44 return result.scalar() 

45 

46 

47@db_injector 1b

48async def read_variable_by_name( 1b

49 db: PrefectDBInterface, session: AsyncSession, name: str 

50) -> Optional[orm_models.Variable]: 

51 """ 

52 Reads a variable by name. 

53 """ 

54 

55 query = sa.select(db.Variable).where(db.Variable.name == name) 1ac

56 

57 result = await session.execute(query) 1ac

58 return result.scalar() 

59 

60 

61@db_injector 1b

62async def read_variables( 1b

63 db: PrefectDBInterface, 

64 session: AsyncSession, 

65 variable_filter: Optional[filters.VariableFilter] = None, 

66 sort: sorting.VariableSort = sorting.VariableSort.NAME_ASC, 

67 offset: Optional[int] = None, 

68 limit: Optional[int] = None, 

69) -> Sequence[orm_models.Variable]: 

70 """ 

71 Read variables, applying filers. 

72 """ 

73 query = sa.select(db.Variable).order_by(*sort.as_sql_sort()) 1adc

74 

75 if variable_filter: 1adc

76 query = query.where(variable_filter.as_sql_filter()) 1adc

77 

78 if offset is not None: 1adc

79 query = query.offset(offset) 1adc

80 if limit is not None: 1adc

81 query = query.limit(limit) 1adc

82 

83 result = await session.execute(query) 1adc

84 return result.scalars().unique().all() 

85 

86 

87@db_injector 1b

88async def count_variables( 1b

89 db: PrefectDBInterface, 

90 session: AsyncSession, 

91 variable_filter: Optional[filters.VariableFilter] = None, 

92) -> int: 

93 """ 

94 Count variables, applying filters. 

95 """ 

96 

97 query = sa.select(sa.func.count()).select_from(db.Variable) 1ac

98 

99 if variable_filter: 1ac

100 query = query.where(variable_filter.as_sql_filter()) 

101 

102 result = await session.execute(query) 1ac

103 return result.scalar_one() 

104 

105 

106@db_injector 1b

107async def update_variable( 1b

108 db: PrefectDBInterface, 

109 session: AsyncSession, 

110 variable_id: UUID, 

111 variable: VariableUpdate, 

112) -> bool: 

113 """ 

114 Updates a variable by id. 

115 """ 

116 query = ( 1ac

117 sa.update(db.Variable) 

118 .where(db.Variable.id == variable_id) 

119 .values(**variable.model_dump_for_orm(exclude_unset=True)) 

120 ) 

121 

122 result = await session.execute(query) 1ac

123 return result.rowcount > 0 

124 

125 

126@db_injector 1b

127async def update_variable_by_name( 1b

128 db: PrefectDBInterface, session: AsyncSession, name: str, variable: VariableUpdate 

129) -> bool: 

130 """ 

131 Updates a variable by name. 

132 """ 

133 query = ( 1ac

134 sa.update(db.Variable) 

135 .where(db.Variable.name == name) 

136 .values(**variable.model_dump_for_orm(exclude_unset=True)) 

137 ) 

138 

139 result = await session.execute(query) 1ac

140 return result.rowcount > 0 

141 

142 

143@db_injector 1b

144async def delete_variable( 1b

145 db: PrefectDBInterface, session: AsyncSession, variable_id: UUID 

146) -> bool: 

147 """ 

148 Delete a variable by id. 

149 """ 

150 

151 query = sa.delete(db.Variable).where(db.Variable.id == variable_id) 1a

152 

153 result = await session.execute(query) 1a

154 return result.rowcount > 0 

155 

156 

157@db_injector 1b

158async def delete_variable_by_name( 1b

159 db: PrefectDBInterface, session: AsyncSession, name: str 

160) -> bool: 

161 """ 

162 Delete a variable by name. 

163 """ 

164 

165 query = sa.delete(db.Variable).where(db.Variable.name == name) 1ac

166 

167 result = await session.execute(query) 1ac

168 return result.rowcount > 0