Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/variables.py: 81%

71 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Routes for interacting with variable objects 

3""" 

4 

5from typing import List, Optional 1a

6from uuid import UUID 1a

7 

8import sqlalchemy as sa 1a

9from fastapi import Body, Depends, HTTPException, Path, status 1a

10from sqlalchemy.ext.asyncio import AsyncSession 1a

11 

12from prefect.server import models 1a

13from prefect.server.api.dependencies import LimitBody 1a

14from prefect.server.database import ( 1a

15 PrefectDBInterface, 

16 orm_models, 

17 provide_database_interface, 

18) 

19from prefect.server.schemas import actions, core, filters, sorting 1a

20from prefect.server.utilities.server import PrefectRouter 1a

21 

22 

async def get_variable_or_404(
    session: AsyncSession, variable_id: UUID
) -> orm_models.Variable:
    """
    Look up a variable by ID using the given session.

    Returns whatever `models.variables.read_variable` produced when it is
    truthy; otherwise raises an `HTTPException` with status code 404.
    """
    found = await models.variables.read_variable(
        variable_id=variable_id, session=session
    )
    if found:
        return found

    raise HTTPException(status_code=404, detail="Variable not found.")

35 

36 

async def get_variable_by_name_or_404(
    session: AsyncSession, name: str
) -> orm_models.Variable:
    """
    Look up a variable by name using the given session.

    Returns whatever `models.variables.read_variable_by_name` produced when it
    is truthy; otherwise raises an `HTTPException` with status code 404.
    """
    found = await models.variables.read_variable_by_name(name=name, session=session)
    if found:
        return found

    raise HTTPException(status_code=404, detail="Variable not found.")

47 

48 

# Router that the route handlers in this module attach to through their
# `@router.<method>` decorators; paths are declared relative to "/variables".
router: PrefectRouter = PrefectRouter(prefix="/variables", tags=["Variables"])

53 

54 

@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_variable(
    variable: actions.VariableCreate,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> core.Variable:
    """
    Create a variable.

    For more information, see https://docs.prefect.io/v3/concepts/variables.
    """
    # The insert runs inside a transaction-scoped session. An integrity error
    # from the database is reported to the client as a 409 name conflict.
    async with db.session_context(begin_transaction=True) as session:
        try:
            model = await models.variables.create_variable(
                session=session, variable=variable
            )
        except sa.exc.IntegrityError as exc:
            # Chain explicitly so the underlying database error stays attached
            # as the cause of the HTTP error instead of only implicit context.
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"A variable with the name {variable.name!r} already exists.",
            ) from exc

    # Convert the ORM object into the API schema once the session block ends.
    return core.Variable.model_validate(model, from_attributes=True)

77 

78 

# Fetch one variable by the UUID given in the `id` path segment. A failed
# lookup surfaces as the 404 raised by `get_variable_or_404`.
@router.get("/{id:uuid}")
async def read_variable(
    variable_id: UUID = Path(..., alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> core.Variable:
    async with db.session_context() as session:
        orm_variable = await get_variable_or_404(
            variable_id=variable_id, session=session
        )

    # Convert the ORM object into the API schema once the session block ends.
    return core.Variable.model_validate(orm_variable, from_attributes=True)

88 

89 

# Fetch one variable by the name given in the path. A failed lookup surfaces
# as the 404 raised by `get_variable_by_name_or_404`.
@router.get("/name/{name:str}")
async def read_variable_by_name(
    name: str = Path(...),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> core.Variable:
    async with db.session_context() as session:
        orm_variable = await get_variable_by_name_or_404(name=name, session=session)

    # Convert the ORM object into the API schema once the session block ends.
    return core.Variable.model_validate(orm_variable, from_attributes=True)

99 

100 

# List variables: `variables` is an optional filter, `sort` sets the ordering
# (NAME_ASC unless the request says otherwise), and `offset` / `limit` are
# forwarded unchanged to the model layer.
@router.post("/filter")
async def read_variables(
    limit: int = LimitBody(),
    offset: int = Body(0, ge=0),
    variables: Optional[filters.VariableFilter] = None,
    sort: sorting.VariableSort = Body(sorting.VariableSort.NAME_ASC),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[core.Variable]:
    async with db.session_context() as session:
        matches = await models.variables.read_variables(
            session=session,
            variable_filter=variables,
            limit=limit,
            offset=offset,
            sort=sort,
        )
        return matches

117 

118 

# Count variables, optionally narrowed by the `variables` filter, which is
# read from an embedded body field and defaults to None.
@router.post("/count")
async def count_variables(
    variables: Optional[filters.VariableFilter] = Body(None, embed=True),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> int:
    async with db.session_context() as session:
        total = await models.variables.count_variables(
            variable_filter=variables,
            session=session,
        )
        return total

129 

130 

# Apply `variable` to the variable whose UUID is given in the `id` path
# segment. Responds 204 on success; a falsy result from the model layer is
# reported as a 404.
@router.patch("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT)
async def update_variable(
    variable: actions.VariableUpdate,
    variable_id: UUID = Path(..., alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    async with db.session_context(begin_transaction=True) as session:
        was_updated = await models.variables.update_variable(
            variable=variable,
            variable_id=variable_id,
            session=session,
        )

    if was_updated:
        return

    raise HTTPException(status_code=404, detail="Variable not found.")

145 

146 

# Apply `variable` to the variable whose name is given in the path. Responds
# 204 on success; a falsy result from the model layer is reported as a 404.
@router.patch("/name/{name:str}", status_code=status.HTTP_204_NO_CONTENT)
async def update_variable_by_name(
    variable: actions.VariableUpdate,
    name: str = Path(..., alias="name"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    async with db.session_context(begin_transaction=True) as session:
        was_updated = await models.variables.update_variable_by_name(
            variable=variable,
            name=name,
            session=session,
        )

    if was_updated:
        return

    raise HTTPException(status_code=404, detail="Variable not found.")

161 

162 

# Delete the variable whose UUID is given in the `id` path segment. Responds
# 204 on success; a falsy result from the model layer is reported as a 404.
@router.delete("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_variable(
    variable_id: UUID = Path(..., alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    async with db.session_context(begin_transaction=True) as session:
        was_deleted = await models.variables.delete_variable(
            variable_id=variable_id, session=session
        )

    if was_deleted:
        return

    raise HTTPException(status_code=404, detail="Variable not found.")

174 

175 

# Delete the variable whose name is given in the path. Responds 204 on
# success; a falsy result from the model layer is reported as a 404.
@router.delete("/name/{name:str}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_variable_by_name(
    name: str = Path(...),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    async with db.session_context(begin_transaction=True) as session:
        was_deleted = await models.variables.delete_variable_by_name(
            name=name, session=session
        )

    if was_deleted:
        return

    raise HTTPException(status_code=404, detail="Variable not found.")