Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/artifacts.py: 96%

61 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Routes for interacting with artifact objects. 

3""" 

4 

5from typing import List 1c

6from uuid import UUID 1c

7 

8from fastapi import Body, Depends, HTTPException, Path, Response, status 1c

9 

10import prefect.server.api.dependencies as dependencies 1c

11from prefect.server import models 1c

12from prefect.server.database import PrefectDBInterface, provide_database_interface 1c

13from prefect.server.schemas import actions, core, filters, sorting 1c

14from prefect.server.utilities.server import PrefectRouter 1c

15from prefect.types._datetime import now 1c

16 

# Router that the artifact endpoints below attach to; all of their paths are
# served under the "/artifacts" prefix and grouped under the "Artifacts" tag.
router: PrefectRouter = PrefectRouter(prefix="/artifacts", tags=["Artifacts"])

21 

22 

@router.post("/")
async def create_artifact(
    artifact: actions.ArtifactCreate,
    response: Response,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> core.Artifact:
    """
    Create an artifact.

    For more information, see https://docs.prefect.io/v3/concepts/artifacts.
    """
    # Convert the incoming "create" payload into the core schema object that
    # the models layer is called with.
    new_artifact = core.Artifact(**artifact.model_dump())

    # Reference point taken just before the insert; it is compared against
    # the stored record's `created` timestamp further down.
    reference_time = now("UTC")

    async with db.session_context(begin_transaction=True) as session:
        stored = await models.artifacts.create_artifact(
            session=session, artifact=new_artifact
        )

    # A record stamped at or after the reference time is reported as 201;
    # otherwise the response keeps the route's default status code.
    stamped_during_request = stored.created >= reference_time
    if stamped_during_request:
        response.status_code = status.HTTP_201_CREATED
    return stored

47 

48 

@router.get("/{id:uuid}")
async def read_artifact(
    artifact_id: UUID = Path(
        ..., alias="id", description="The ID of the artifact to retrieve."
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> core.Artifact:
    """
    Retrieve an artifact from the database.
    """
    async with db.session_context() as session:
        found = await models.artifacts.read_artifact(
            artifact_id=artifact_id, session=session
        )

    # Happy path first: hand back whatever the lookup produced.
    if found is not None:
        return found

    # The lookup yielded None, which this route reports as a 404.
    raise HTTPException(status_code=404, detail="Artifact not found.")

67 

68 

@router.get("/{key}/latest")
async def read_latest_artifact(
    key: str = Path(..., description="The key of the artifact to retrieve."),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> core.Artifact:
    """
    Retrieve the latest artifact from the artifact table.
    """
    async with db.session_context() as session:
        latest = await models.artifacts.read_latest_artifact(
            key=key, session=session
        )

    # Happy path first: return the record the lookup produced for this key.
    if latest is not None:
        return latest

    # The lookup yielded None, which this route reports as a 404.
    raise HTTPException(status_code=404, detail="Artifact not found.")

86 

87 

@router.post("/filter")
async def read_artifacts(
    sort: sorting.ArtifactSort = Body(sorting.ArtifactSort.ID_DESC),
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    artifacts: filters.ArtifactFilter = None,
    flow_runs: filters.FlowRunFilter = None,
    task_runs: filters.TaskRunFilter = None,
    flows: filters.FlowFilter = None,
    deployments: filters.DeploymentFilter = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[core.Artifact]:
    """
    Retrieve artifacts from the database.
    """
    async with db.session_context() as session:
        # Request parameters are forwarded unchanged; the filters are only
        # renamed to the keyword names the models-layer call uses.
        matching = await models.artifacts.read_artifacts(
            session=session,
            sort=sort,
            limit=limit,
            offset=offset,
            artifact_filter=artifacts,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            flow_filter=flows,
            deployment_filter=deployments,
        )
        return matching

115 

116 

@router.post("/latest/filter")
async def read_latest_artifacts(
    sort: sorting.ArtifactCollectionSort = Body(sorting.ArtifactCollectionSort.ID_DESC),
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    artifacts: filters.ArtifactCollectionFilter = None,
    flow_runs: filters.FlowRunFilter = None,
    task_runs: filters.TaskRunFilter = None,
    flows: filters.FlowFilter = None,
    deployments: filters.DeploymentFilter = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[core.ArtifactCollection]:
    """
    Retrieve artifacts from the database.
    """
    async with db.session_context() as session:
        # Same call shape as the "/filter" route in this module, but it goes
        # through `read_latest_artifacts` with the collection filter/sort types.
        matching = await models.artifacts.read_latest_artifacts(
            session=session,
            sort=sort,
            limit=limit,
            offset=offset,
            artifact_filter=artifacts,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            flow_filter=flows,
            deployment_filter=deployments,
        )
        return matching

144 

145 

@router.post("/count")
async def count_artifacts(
    artifacts: filters.ArtifactFilter = None,
    flow_runs: filters.FlowRunFilter = None,
    task_runs: filters.TaskRunFilter = None,
    flows: filters.FlowFilter = None,
    deployments: filters.DeploymentFilter = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> int:
    """
    Count artifacts from the database.
    """
    async with db.session_context() as session:
        # Filters are forwarded unchanged under the models-layer keyword names.
        total = await models.artifacts.count_artifacts(
            session=session,
            artifact_filter=artifacts,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            flow_filter=flows,
            deployment_filter=deployments,
        )
        return total

167 

168 

@router.post("/latest/count")
async def count_latest_artifacts(
    artifacts: filters.ArtifactCollectionFilter = None,
    flow_runs: filters.FlowRunFilter = None,
    task_runs: filters.TaskRunFilter = None,
    flows: filters.FlowFilter = None,
    deployments: filters.DeploymentFilter = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> int:
    """
    Count artifacts from the database.
    """
    async with db.session_context() as session:
        # Same call shape as the "/count" route in this module, but it goes
        # through `count_latest_artifacts` with the collection filter type.
        total = await models.artifacts.count_latest_artifacts(
            session=session,
            artifact_filter=artifacts,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            flow_filter=flows,
            deployment_filter=deployments,
        )
        return total

190 

191 

@router.patch("/{id:uuid}", status_code=204)
async def update_artifact(
    artifact: actions.ArtifactUpdate,
    artifact_id: UUID = Path(
        ..., alias="id", description="The ID of the artifact to update."
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Update an artifact in the database.
    """
    async with db.session_context(begin_transaction=True) as session:
        was_updated = await models.artifacts.update_artifact(
            artifact=artifact,
            artifact_id=artifact_id,
            session=session,
        )

    # A truthy result means there is nothing more to report: the route's
    # declared 204 status is used as-is.
    if was_updated:
        return

    # A falsy result from the models layer is reported as a 404.
    raise HTTPException(status_code=404, detail="Artifact not found.")

211 

212 

@router.delete("/{id:uuid}", status_code=204)
async def delete_artifact(
    artifact_id: UUID = Path(
        ..., alias="id", description="The ID of the artifact to delete."
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Delete an artifact from the database.
    """
    async with db.session_context(begin_transaction=True) as session:
        was_deleted = await models.artifacts.delete_artifact(
            artifact_id=artifact_id,
            session=session,
        )

    # A truthy result means there is nothing more to report: the route's
    # declared 204 status is used as-is.
    if was_deleted:
        return

    # A falsy result from the models layer is reported as a 404.
    raise HTTPException(status_code=404, detail="Artifact not found.")