Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/automations.py: 82%

93 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from typing import Optional, Sequence 1b

2from uuid import UUID 1b

3 

4from fastapi import Body, Depends, HTTPException, Path 1b

5from fastapi.exceptions import RequestValidationError 1b

6from pydantic import ValidationError 1b

7 

8from prefect._internal.compatibility.starlette import status 1b

9from prefect.server.api.dependencies import LimitBody 1b

10from prefect.server.api.validation import ( 1b

11 validate_job_variables_for_run_deployment_action, 

12) 

13from prefect.server.database import PrefectDBInterface, provide_database_interface 1b

14from prefect.server.events import actions 1b

15from prefect.server.events.filters import AutomationFilter, AutomationFilterCreated 1b

16from prefect.server.events.models import automations as automations_models 1b

17from prefect.server.events.schemas.automations import ( 1b

18 Automation, 

19 AutomationCreate, 

20 AutomationPartialUpdate, 

21 AutomationSort, 

22 AutomationUpdate, 

23) 

24from prefect.server.exceptions import ObjectNotFoundError 1b

25from prefect.server.utilities.server import PrefectRouter 1b

26from prefect.types._datetime import now 1b

27from prefect.utilities.schema_tools.validation import ( 1b

28 ValidationError as JSONSchemaValidationError, 

29) 

30 

31router: PrefectRouter = PrefectRouter( 1b

32 prefix="/automations", 

33 tags=["Automations"], 

34 dependencies=[], 

35) 

36 

37 

38@router.post("/", status_code=status.HTTP_201_CREATED) 1b

39async def create_automation( 1b

40 automation: AutomationCreate, 

41 db: PrefectDBInterface = Depends(provide_database_interface), 

42) -> Automation: 

43 """ 

44 Create an automation. 

45 

46 For more information, see https://docs.prefect.io/v3/concepts/automations. 

47 """ 

48 # reset any client-provided IDs on the provided triggers 

49 automation.trigger.reset_ids() 1adc

50 

51 errors = [] 1adc

52 for action in automation.actions: 1adc

53 if ( 53 ↛ 59line 53 didn't jump to line 59 because the condition on line 53 was never true

54 isinstance(action, actions.RunDeployment) 

55 and action.deployment_id is not None 

56 and action.job_variables is not None 

57 and action.job_variables != {} 

58 ): 

59 async with db.session_context() as session: 

60 try: 

61 await validate_job_variables_for_run_deployment_action( 

62 session, action 

63 ) 

64 except JSONSchemaValidationError as exc: 

65 errors.append(str(exc)) 

66 

67 if errors: 67 ↛ 68line 67 didn't jump to line 68 because the condition on line 67 was never true1adc

68 raise HTTPException( 

69 status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, 

70 detail=f"Error creating automation: {' '.join(errors)}", 

71 ) 

72 

73 automation_dict = automation.model_dump() 1adc

74 owner_resource = automation_dict.pop("owner_resource", None) 1adc

75 

76 async with db.session_context(begin_transaction=True) as session: 1adc

77 created_automation = await automations_models.create_automation( 1adc

78 session=session, 

79 automation=Automation( 

80 **automation_dict, 

81 ), 

82 ) 

83 

84 if owner_resource: 

85 await automations_models.relate_automation_to_resource( 

86 session, 

87 automation_id=created_automation.id, 

88 resource_id=owner_resource, 

89 owned_by_resource=True, 

90 ) 

91 

92 return created_automation 1adc

93 

94 

95@router.put("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT) 1b

96async def update_automation( 1b

97 automation: AutomationUpdate, 

98 automation_id: UUID = Path(..., alias="id"), 

99 db: PrefectDBInterface = Depends(provide_database_interface), 

100) -> None: 

101 # reset any client-provided IDs on the provided triggers 

102 automation.trigger.reset_ids() 1ac

103 

104 errors = [] 1ac

105 for action in automation.actions: 1ac

106 if ( 106 ↛ 111line 106 didn't jump to line 111 because the condition on line 106 was never true

107 isinstance(action, actions.RunDeployment) 

108 and action.deployment_id is not None 

109 and action.job_variables is not None 

110 ): 

111 async with db.session_context() as session: 

112 try: 

113 await validate_job_variables_for_run_deployment_action( 

114 session, action 

115 ) 

116 except JSONSchemaValidationError as exc: 

117 errors.append(str(exc)) 

118 if errors: 118 ↛ 119line 118 didn't jump to line 119 because the condition on line 118 was never true1ac

119 raise HTTPException( 

120 status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, 

121 detail=f"Error creating automation: {' '.join(errors)}", 

122 ) 

123 

124 async with db.session_context(begin_transaction=True) as session: 1ac

125 updated = await automations_models.update_automation( 1ac

126 session=session, 

127 automation_update=automation, 

128 automation_id=automation_id, 

129 ) 

130 

131 if not updated: 1ac

132 raise ObjectNotFoundError("Automation not found") 1a

133 

134 

135@router.patch( 1b

136 "/{id:uuid}", 

137 status_code=status.HTTP_204_NO_CONTENT, 

138) 

139async def patch_automation( 1b

140 automation: AutomationPartialUpdate, 

141 automation_id: UUID = Path(..., alias="id"), 

142 db: PrefectDBInterface = Depends(provide_database_interface), 

143) -> None: 

144 try: 1ac

145 async with db.session_context(begin_transaction=True) as session: 1ac

146 updated = await automations_models.update_automation( 1ac

147 session=session, 

148 automation_update=automation, 

149 automation_id=automation_id, 

150 ) 

151 except ValidationError as e: 

152 raise RequestValidationError( 

153 errors=e.errors(), 

154 body=automation.model_dump(mode="json"), 

155 ) 

156 

157 if not updated: 1ac

158 raise ObjectNotFoundError("Automation not found") 1a

159 

160 

161@router.delete( 1b

162 "/{id:uuid}", 

163 status_code=status.HTTP_204_NO_CONTENT, 

164) 

165async def delete_automation( 1b

166 automation_id: UUID = Path(..., alias="id"), 

167 db: PrefectDBInterface = Depends(provide_database_interface), 

168) -> None: 

169 async with db.session_context(begin_transaction=True) as session: 1ac

170 deleted = await automations_models.delete_automation( 1ac

171 session=session, 

172 automation_id=automation_id, 

173 ) 

174 

175 if not deleted: 1ac

176 raise ObjectNotFoundError("Automation not found") 1a

177 

178 

179@router.post("/filter") 1b

180async def read_automations( 1b

181 sort: AutomationSort = Body(AutomationSort.NAME_ASC), 

182 limit: int = LimitBody(), 

183 offset: int = Body(0, ge=0), 

184 automations: Optional[AutomationFilter] = None, 

185 db: PrefectDBInterface = Depends(provide_database_interface), 

186) -> Sequence[Automation]: 

187 async with db.session_context() as session: 1adc

188 return await automations_models.read_automations_for_workspace( 1adc

189 session=session, 

190 sort=sort, 

191 limit=limit, 

192 offset=offset, 

193 automation_filter=automations, 

194 ) 

195 

196 

197@router.post("/count") 1b

198async def count_automations( 1b

199 db: PrefectDBInterface = Depends(provide_database_interface), 

200) -> int: 

201 async with db.session_context() as session: 1a

202 return await automations_models.count_automations_for_workspace(session=session) 1a

203 

204 

205@router.get("/{id:uuid}") 1b

206async def read_automation( 1b

207 automation_id: UUID = Path(..., alias="id"), 

208 db: PrefectDBInterface = Depends(provide_database_interface), 

209) -> Automation: 

210 async with db.session_context() as session: 1ac

211 automation = await automations_models.read_automation( 1ac

212 session=session, 

213 automation_id=automation_id, 

214 ) 

215 

216 if not automation: 1ac

217 raise ObjectNotFoundError("Automation not found") 1a

218 return automation 1ac

219 

220 

221@router.get("/related-to/{resource_id:str}") 1b

222async def read_automations_related_to_resource( 1b

223 resource_id: str = Path(..., alias="resource_id"), 

224 db: PrefectDBInterface = Depends(provide_database_interface), 

225) -> Sequence[Automation]: 

226 async with db.session_context() as session: 1ac

227 return await automations_models.read_automations_related_to_resource( 1ac

228 session=session, 

229 resource_id=resource_id, 

230 ) 

231 

232 

233@router.delete("/owned-by/{resource_id:str}", status_code=status.HTTP_202_ACCEPTED) 1b

234async def delete_automations_owned_by_resource( 1b

235 resource_id: str = Path(..., alias="resource_id"), 

236 db: PrefectDBInterface = Depends(provide_database_interface), 

237) -> None: 

238 async with db.session_context(begin_transaction=True) as session: 1ac

239 await automations_models.delete_automations_owned_by_resource( 1ac

240 session, 

241 resource_id=resource_id, 

242 automation_filter=AutomationFilter( 

243 created=AutomationFilterCreated(before_=now("UTC")) 

244 ), 

245 )