Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/automations.py: 31%

93 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from typing import Optional, Sequence 1a

2from uuid import UUID 1a

3 

4from fastapi import Body, Depends, HTTPException, Path 1a

5from fastapi.exceptions import RequestValidationError 1a

6from pydantic import ValidationError 1a

7 

8from prefect._internal.compatibility.starlette import status 1a

9from prefect.server.api.dependencies import LimitBody 1a

10from prefect.server.api.validation import ( 1a

11 validate_job_variables_for_run_deployment_action, 

12) 

13from prefect.server.database import PrefectDBInterface, provide_database_interface 1a

14from prefect.server.events import actions 1a

15from prefect.server.events.filters import AutomationFilter, AutomationFilterCreated 1a

16from prefect.server.events.models import automations as automations_models 1a

17from prefect.server.events.schemas.automations import ( 1a

18 Automation, 

19 AutomationCreate, 

20 AutomationPartialUpdate, 

21 AutomationSort, 

22 AutomationUpdate, 

23) 

24from prefect.server.exceptions import ObjectNotFoundError 1a

25from prefect.server.utilities.server import PrefectRouter 1a

26from prefect.types._datetime import now 1a

27from prefect.utilities.schema_tools.validation import ( 1a

28 ValidationError as JSONSchemaValidationError, 

29) 

30 

31router: PrefectRouter = PrefectRouter( 1a

32 prefix="/automations", 

33 tags=["Automations"], 

34 dependencies=[], 

35) 

36 

37 

38@router.post("/", status_code=status.HTTP_201_CREATED) 1a

39async def create_automation( 1a

40 automation: AutomationCreate, 

41 db: PrefectDBInterface = Depends(provide_database_interface), 

42) -> Automation: 

43 """ 

44 Create an automation. 

45 

46 For more information, see https://docs.prefect.io/v3/concepts/automations. 

47 """ 

48 # reset any client-provided IDs on the provided triggers 

49 automation.trigger.reset_ids() 

50 

51 errors = [] 

52 for action in automation.actions: 

53 if ( 

54 isinstance(action, actions.RunDeployment) 

55 and action.deployment_id is not None 

56 and action.job_variables is not None 

57 and action.job_variables != {} 

58 ): 

59 async with db.session_context() as session: 

60 try: 

61 await validate_job_variables_for_run_deployment_action( 

62 session, action 

63 ) 

64 except JSONSchemaValidationError as exc: 

65 errors.append(str(exc)) 

66 

67 if errors: 

68 raise HTTPException( 

69 status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, 

70 detail=f"Error creating automation: {' '.join(errors)}", 

71 ) 

72 

73 automation_dict = automation.model_dump() 

74 owner_resource = automation_dict.pop("owner_resource", None) 

75 

76 async with db.session_context(begin_transaction=True) as session: 

77 created_automation = await automations_models.create_automation( 

78 session=session, 

79 automation=Automation( 

80 **automation_dict, 

81 ), 

82 ) 

83 

84 if owner_resource: 

85 await automations_models.relate_automation_to_resource( 

86 session, 

87 automation_id=created_automation.id, 

88 resource_id=owner_resource, 

89 owned_by_resource=True, 

90 ) 

91 

92 return created_automation 

93 

94 

95@router.put("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT) 1a

96async def update_automation( 1a

97 automation: AutomationUpdate, 

98 automation_id: UUID = Path(..., alias="id"), 

99 db: PrefectDBInterface = Depends(provide_database_interface), 

100) -> None: 

101 # reset any client-provided IDs on the provided triggers 

102 automation.trigger.reset_ids() 

103 

104 errors = [] 

105 for action in automation.actions: 

106 if ( 

107 isinstance(action, actions.RunDeployment) 

108 and action.deployment_id is not None 

109 and action.job_variables is not None 

110 ): 

111 async with db.session_context() as session: 

112 try: 

113 await validate_job_variables_for_run_deployment_action( 

114 session, action 

115 ) 

116 except JSONSchemaValidationError as exc: 

117 errors.append(str(exc)) 

118 if errors: 

119 raise HTTPException( 

120 status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, 

121 detail=f"Error creating automation: {' '.join(errors)}", 

122 ) 

123 

124 async with db.session_context(begin_transaction=True) as session: 

125 updated = await automations_models.update_automation( 

126 session=session, 

127 automation_update=automation, 

128 automation_id=automation_id, 

129 ) 

130 

131 if not updated: 

132 raise ObjectNotFoundError("Automation not found") 

133 

134 

135@router.patch( 1a

136 "/{id:uuid}", 

137 status_code=status.HTTP_204_NO_CONTENT, 

138) 

139async def patch_automation( 1a

140 automation: AutomationPartialUpdate, 

141 automation_id: UUID = Path(..., alias="id"), 

142 db: PrefectDBInterface = Depends(provide_database_interface), 

143) -> None: 

144 try: 

145 async with db.session_context(begin_transaction=True) as session: 

146 updated = await automations_models.update_automation( 

147 session=session, 

148 automation_update=automation, 

149 automation_id=automation_id, 

150 ) 

151 except ValidationError as e: 

152 raise RequestValidationError( 

153 errors=e.errors(), 

154 body=automation.model_dump(mode="json"), 

155 ) 

156 

157 if not updated: 

158 raise ObjectNotFoundError("Automation not found") 

159 

160 

161@router.delete( 1a

162 "/{id:uuid}", 

163 status_code=status.HTTP_204_NO_CONTENT, 

164) 

165async def delete_automation( 1a

166 automation_id: UUID = Path(..., alias="id"), 

167 db: PrefectDBInterface = Depends(provide_database_interface), 

168) -> None: 

169 async with db.session_context(begin_transaction=True) as session: 

170 deleted = await automations_models.delete_automation( 

171 session=session, 

172 automation_id=automation_id, 

173 ) 

174 

175 if not deleted: 

176 raise ObjectNotFoundError("Automation not found") 

177 

178 

179@router.post("/filter") 1a

180async def read_automations( 1a

181 sort: AutomationSort = Body(AutomationSort.NAME_ASC), 

182 limit: int = LimitBody(), 

183 offset: int = Body(0, ge=0), 

184 automations: Optional[AutomationFilter] = None, 

185 db: PrefectDBInterface = Depends(provide_database_interface), 

186) -> Sequence[Automation]: 

187 async with db.session_context() as session: 

188 return await automations_models.read_automations_for_workspace( 

189 session=session, 

190 sort=sort, 

191 limit=limit, 

192 offset=offset, 

193 automation_filter=automations, 

194 ) 

195 

196 

197@router.post("/count") 1a

198async def count_automations( 1a

199 db: PrefectDBInterface = Depends(provide_database_interface), 

200) -> int: 

201 async with db.session_context() as session: 

202 return await automations_models.count_automations_for_workspace(session=session) 

203 

204 

205@router.get("/{id:uuid}") 1a

206async def read_automation( 1a

207 automation_id: UUID = Path(..., alias="id"), 

208 db: PrefectDBInterface = Depends(provide_database_interface), 

209) -> Automation: 

210 async with db.session_context() as session: 

211 automation = await automations_models.read_automation( 

212 session=session, 

213 automation_id=automation_id, 

214 ) 

215 

216 if not automation: 

217 raise ObjectNotFoundError("Automation not found") 

218 return automation 

219 

220 

221@router.get("/related-to/{resource_id:str}") 1a

222async def read_automations_related_to_resource( 1a

223 resource_id: str = Path(..., alias="resource_id"), 

224 db: PrefectDBInterface = Depends(provide_database_interface), 

225) -> Sequence[Automation]: 

226 async with db.session_context() as session: 

227 return await automations_models.read_automations_related_to_resource( 

228 session=session, 

229 resource_id=resource_id, 

230 ) 

231 

232 

233@router.delete("/owned-by/{resource_id:str}", status_code=status.HTTP_202_ACCEPTED) 1a

234async def delete_automations_owned_by_resource( 1a

235 resource_id: str = Path(..., alias="resource_id"), 

236 db: PrefectDBInterface = Depends(provide_database_interface), 

237) -> None: 

238 async with db.session_context(begin_transaction=True) as session: 

239 await automations_models.delete_automations_owned_by_resource( 

240 session, 

241 resource_id=resource_id, 

242 automation_filter=AutomationFilter( 

243 created=AutomationFilterCreated(before_=now("UTC")) 

244 ), 

245 )