Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/artifacts.py: 18%

139 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from typing import Any, Optional, Sequence, TypeVar, Union 1a

2from uuid import UUID 1a

3 

4import sqlalchemy as sa 1a

5from sqlalchemy import select 1a

6from sqlalchemy.ext.asyncio import AsyncSession 1a

7from sqlalchemy.sql import Select 1a

8 

9from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1a

10from prefect.server.schemas import actions, filters, sorting 1a

11from prefect.server.schemas.core import Artifact 1a

12from prefect.types._datetime import DateTime, now 1a

13 

14T = TypeVar("T", bound=tuple[Any, ...]) 1a

15 

16 

@db_injector
async def _insert_into_artifact_collection(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact: Artifact,
    now: Optional[DateTime] = None,
) -> orm_models.ArtifactCollection:
    """
    Upserts the artifact into the artifact_collection table as the latest
    entry for its key and returns the stored row.

    Args:
        session: A database session
        artifact: The artifact whose values are written to the collection row
        now: Timestamp written to `created` on insert and to `updated` on
            both insert and conflict-update

    Returns:
        The ArtifactCollection row selected by the artifact's key

    Raises:
        ValueError: If no row is found for the key after the upsert, or the
            row found does not point at this artifact via `latest_id`
    """
    # Only fields that were explicitly set are written; the id and the
    # timestamps are supplied separately below.
    column_values = artifact.model_dump_for_orm(
        exclude_unset=True, exclude={"id", "updated", "created"}
    )

    insert_stmt = db.queries.insert(db.ArtifactCollection).values(
        latest_id=artifact.id, updated=now, created=now, **column_values
    )
    # On conflict the existing row is repointed at this artifact; `created`
    # is intentionally not part of the update set.
    upsert_stmt = insert_stmt.on_conflict_do_update(
        index_elements=db.orm.artifact_collection_unique_upsert_columns,
        set_=dict(latest_id=artifact.id, updated=now, **column_values),
    )
    await session.execute(upsert_stmt)

    # Re-read the row by key; populate_existing overwrites any copy of the
    # row the session already holds with the freshly loaded values.
    lookup = (
        sa.select(db.ArtifactCollection)
        .where(db.ArtifactCollection.key == artifact.key)
        .execution_options(populate_existing=True)
    )
    collection_row = (await session.execute(lookup)).scalar()

    if collection_row is None or collection_row.latest_id != artifact.id:
        raise ValueError(
            f"Artifact {artifact.id} was not inserted into the artifact collection table."
        )

    return collection_row

68 

69 

@db_injector
async def _insert_into_artifact(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact: Artifact,
    now: Optional[DateTime] = None,
) -> orm_models.Artifact:
    """
    Inserts a row for the artifact into the artifact table and returns it
    as loaded back from the database.

    Args:
        session: A database session
        artifact: The artifact to insert
        now: Timestamp written to both `created` and `updated`

    Returns:
        The Artifact row selected by the artifact's id after the insert
    """
    inserted_id = artifact.id
    # The artifact's own timestamps are dropped in favor of `now`.
    row_values = artifact.model_dump_for_orm(exclude={"created", "updated"})

    await session.execute(
        db.queries.insert(db.Artifact).values(
            created=now,
            updated=now,
            **row_values,
        )
    )

    fetch_inserted = (
        sa.select(db.Artifact)
        .where(db.Artifact.id == inserted_id)
        .limit(1)
        .execution_options(populate_existing=True)
    )
    fetched = await session.execute(fetch_inserted)
    return fetched.scalar_one()

97 

98 

async def create_artifact(
    session: AsyncSession,
    artifact: Artifact,
) -> orm_models.Artifact:
    """
    Creates an artifact.

    When the artifact has a key, it is first upserted into the artifact
    collection table; it is then inserted into the artifact table. Both
    writes share a single timestamp.

    Args:
        session: A database session
        artifact: The artifact to create

    Returns:
        The newly inserted Artifact row
    """
    timestamp = now("UTC")

    if artifact.key is not None:
        await _insert_into_artifact_collection(
            session=session, now=timestamp, artifact=artifact
        )

    return await _insert_into_artifact(
        session=session,
        now=timestamp,
        artifact=artifact,
    )

117 

118 

@db_injector
async def read_latest_artifact(
    db: PrefectDBInterface,
    session: AsyncSession,
    key: str,
) -> Union[orm_models.ArtifactCollection, None]:
    """
    Reads the artifact collection entry for a key.

    Args:
        session: A database session
        key: The artifact key

    Returns:
        The ArtifactCollection row with the given key, or None if there is
        no such row
    """
    by_key = sa.select(db.ArtifactCollection).where(db.ArtifactCollection.key == key)
    return (await session.execute(by_key)).scalar()

138 

139 

@db_injector
async def read_artifact(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact_id: UUID,
) -> Union[orm_models.Artifact, None]:
    """
    Reads an artifact by id.

    Args:
        session: A database session
        artifact_id: The id of the artifact to read

    Returns:
        The matching Artifact row, or None if no artifact has that id
    """
    by_id = sa.select(db.Artifact).where(db.Artifact.id == artifact_id)
    return (await session.execute(by_id)).scalar()

154 

155 

async def _apply_artifact_filters(
    db: PrefectDBInterface,
    query: Select[T],
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    artifact_filter: Optional[filters.ArtifactFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
) -> Select[T]:
    """
    Narrows an artifact query using the supplied filters.

    The artifact filter is applied directly to the query. Flow-run, flow and
    deployment filters are combined into a single EXISTS subquery over flow
    runs correlated on `Artifact.flow_run_id`; the task-run filter becomes a
    second EXISTS subquery correlated on `Artifact.task_run_id`.

    Args:
        query: The select statement to narrow
        flow_run_filter: Criteria on the artifact's flow run
        task_run_filter: Criteria on the artifact's task run
        artifact_filter: Criteria on the artifact itself
        deployment_filter: Criteria on the deployment of the artifact's flow run
        flow_filter: Criteria on the flow of the artifact's flow run

    Returns:
        The query with a WHERE clause added for every filter that was given
    """
    if artifact_filter:
        query = query.where(artifact_filter.as_sql_filter())

    if any((flow_filter, flow_run_filter, deployment_filter)):
        runs_subquery = select(db.FlowRun).where(
            db.Artifact.flow_run_id == db.FlowRun.id
        )

        if flow_run_filter:
            runs_subquery = runs_subquery.where(flow_run_filter.as_sql_filter())

        # Flow and Deployment are joined in only when their filter is given.
        if flow_filter:
            flow_link = db.Flow.id == db.FlowRun.flow_id
            runs_subquery = runs_subquery.join(db.Flow, flow_link).where(
                flow_filter.as_sql_filter()
            )

        if deployment_filter:
            deployment_link = db.Deployment.id == db.FlowRun.deployment_id
            runs_subquery = runs_subquery.join(db.Deployment, deployment_link).where(
                deployment_filter.as_sql_filter()
            )

        query = query.where(runs_subquery.exists())

    if task_run_filter:
        tasks_subquery = select(db.TaskRun).where(
            db.Artifact.task_run_id == db.TaskRun.id,
            task_run_filter.as_sql_filter(),
        )
        query = query.where(tasks_subquery.exists())

    return query

201 

202 

async def _apply_artifact_collection_filters(
    db: PrefectDBInterface,
    query: Select[T],
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    artifact_filter: Optional[filters.ArtifactCollectionFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
) -> Select[T]:
    """
    Narrows an artifact collection query using the supplied filters.

    The artifact collection filter is applied directly to the query.
    Flow-run, flow and deployment filters are combined into a single EXISTS
    subquery over flow runs correlated on `ArtifactCollection.flow_run_id`;
    the task-run filter becomes a second EXISTS subquery correlated on
    `ArtifactCollection.task_run_id`.

    Args:
        query: The select statement to narrow
        flow_run_filter: Criteria on the collection entry's flow run
        task_run_filter: Criteria on the collection entry's task run
        artifact_filter: Criteria on the collection entry itself
        deployment_filter: Criteria on the deployment of the entry's flow run
        flow_filter: Criteria on the flow of the entry's flow run

    Returns:
        The query with a WHERE clause added for every filter that was given
    """
    if artifact_filter:
        query = query.where(artifact_filter.as_sql_filter())

    if any((flow_filter, flow_run_filter, deployment_filter)):
        runs_subquery = select(db.FlowRun).where(
            db.ArtifactCollection.flow_run_id == db.FlowRun.id
        )

        if flow_run_filter:
            runs_subquery = runs_subquery.where(flow_run_filter.as_sql_filter())

        # Flow and Deployment are joined in only when their filter is given.
        if flow_filter:
            flow_link = db.Flow.id == db.FlowRun.flow_id
            runs_subquery = runs_subquery.join(db.Flow, flow_link).where(
                flow_filter.as_sql_filter()
            )

        if deployment_filter:
            deployment_link = db.Deployment.id == db.FlowRun.deployment_id
            runs_subquery = runs_subquery.join(db.Deployment, deployment_link).where(
                deployment_filter.as_sql_filter()
            )

        query = query.where(runs_subquery.exists())

    if task_run_filter:
        tasks_subquery = select(db.TaskRun).where(
            db.ArtifactCollection.task_run_id == db.TaskRun.id,
            task_run_filter.as_sql_filter(),
        )
        query = query.where(tasks_subquery.exists())

    return query

248 

249 

@db_injector
async def read_artifacts(
    db: PrefectDBInterface,
    session: AsyncSession,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    artifact_filter: Optional[filters.ArtifactFilter] = None,
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
    sort: sorting.ArtifactSort = sorting.ArtifactSort.ID_DESC,
) -> Sequence[orm_models.Artifact]:
    """
    Reads artifacts, optionally filtered, sorted and paginated.

    Args:
        session: A database session
        offset: Query offset; not applied when None
        limit: Query limit; not applied when None
        artifact_filter: Only select artifacts matching this filter
        flow_run_filter: Only select artifacts whose flow runs match this filter
        task_run_filter: Only select artifacts whose task runs match this filter
        deployment_filter: Only select artifacts whose flow runs belong to
            deployments matching this filter
        flow_filter: Only select artifacts whose flow runs belong to flows
            matching this filter
        sort: Ordering applied to the results

    Returns:
        The matching Artifact rows, de-duplicated
    """
    ordered = sa.select(db.Artifact).order_by(*sort.as_sql_sort())

    statement = await _apply_artifact_filters(
        db,
        ordered,
        artifact_filter=artifact_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        flow_filter=flow_filter,
    )

    # Explicit None checks so that an offset or limit of 0 is still applied.
    if offset is not None:
        statement = statement.offset(offset)
    if limit is not None:
        statement = statement.limit(limit)

    rows = await session.execute(statement)
    return rows.scalars().unique().all()

296 

297 

@db_injector
async def read_latest_artifacts(
    db: PrefectDBInterface,
    session: AsyncSession,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    artifact_filter: Optional[filters.ArtifactCollectionFilter] = None,
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
    sort: sorting.ArtifactCollectionSort = sorting.ArtifactCollectionSort.ID_DESC,
) -> Sequence[orm_models.ArtifactCollection]:
    """
    Reads artifact collection entries, optionally filtered, sorted and
    paginated.

    Args:
        session: A database session
        offset: Query offset; not applied when None
        limit: Query limit; not applied when None
        artifact_filter: Only select entries matching this filter
        flow_run_filter: Only select entries whose flow runs match this filter
        task_run_filter: Only select entries whose task runs match this filter
        deployment_filter: Only select entries whose flow runs belong to
            deployments matching this filter
        flow_filter: Only select entries whose flow runs belong to flows
            matching this filter
        sort: Ordering applied to the results

    Returns:
        The matching ArtifactCollection rows, de-duplicated
    """
    ordered = sa.select(db.ArtifactCollection).order_by(*sort.as_sql_sort())

    statement = await _apply_artifact_collection_filters(
        db,
        ordered,
        artifact_filter=artifact_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        flow_filter=flow_filter,
    )

    # Explicit None checks so that an offset or limit of 0 is still applied.
    if offset is not None:
        statement = statement.offset(offset)
    if limit is not None:
        statement = statement.limit(limit)

    rows = await session.execute(statement)
    return rows.scalars().unique().all()

343 

344 

@db_injector
async def count_artifacts(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact_filter: Optional[filters.ArtifactFilter] = None,
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
) -> int:
    """
    Counts artifacts matching the given filters.

    Args:
        session: A database session
        artifact_filter: Only count artifacts matching this filter
        flow_run_filter: Only count artifacts whose flow runs match this filter
        task_run_filter: Only count artifacts whose task runs match this filter
        deployment_filter: Only count artifacts whose flow runs belong to
            deployments matching this filter
        flow_filter: Only count artifacts whose flow runs belong to flows
            matching this filter

    Returns:
        The number of matching artifacts
    """
    count_stmt = await _apply_artifact_filters(
        db,
        sa.select(sa.func.count(db.Artifact.id)),
        artifact_filter=artifact_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        flow_filter=flow_filter,
    )

    return (await session.execute(count_stmt)).scalar_one()

377 

378 

@db_injector
async def count_latest_artifacts(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact_filter: Optional[filters.ArtifactCollectionFilter] = None,
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
) -> int:
    """
    Counts artifact collection entries matching the given filters.

    Args:
        session: A database session
        artifact_filter: Only count entries matching this filter
        flow_run_filter: Only count entries whose flow runs match this filter
        task_run_filter: Only count entries whose task runs match this filter
        deployment_filter: Only count entries whose flow runs belong to
            deployments matching this filter
        flow_filter: Only count entries whose flow runs belong to flows
            matching this filter

    Returns:
        The number of matching artifact collection entries
    """
    count_stmt = await _apply_artifact_collection_filters(
        db,
        sa.select(sa.func.count(db.ArtifactCollection.id)),
        artifact_filter=artifact_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        flow_filter=flow_filter,
    )

    return (await session.execute(count_stmt)).scalar_one()

411 

412 

@db_injector
async def update_artifact(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact_id: UUID,
    artifact: actions.ArtifactUpdate,
) -> bool:
    """
    Updates an artifact by id.

    The same set of explicitly-set fields is written to the artifact row and
    to any artifact collection row whose `latest_id` points at this artifact,
    so the collection entry stays in step with the artifact it mirrors.

    Args:
        session: A database session
        artifact_id (UUID): The artifact id to update
        artifact: An artifact model carrying the fields to change

    Returns:
        bool: True if at least one row was updated in either table,
            False otherwise
    """
    # Serialize once; both statements take exactly the same values.
    update_data = artifact.model_dump_for_orm(exclude_unset=True)

    artifact_result = await session.execute(
        sa.update(db.Artifact)
        .where(db.Artifact.id == artifact_id)
        .values(**update_data)
    )

    collection_result = await session.execute(
        sa.update(db.ArtifactCollection)
        .where(db.ArtifactCollection.latest_id == artifact_id)
        .values(**update_data)
    )

    return artifact_result.rowcount + collection_result.rowcount > 0

450 

451 

@db_injector
async def delete_artifact(
    db: PrefectDBInterface, session: AsyncSession, artifact_id: UUID
) -> bool:
    """
    Deletes an artifact by id.

    The ArtifactCollection table tracks, per key, which artifact is the
    latest version. When the artifact being deleted is the one referenced
    there, the collection row is repaired before the artifact row is removed:

    - if another artifact with the same key exists, the collection row is
      repointed at the most recently created one and takes over its values;
    - otherwise the collection row is deleted.

    Example:
        Given these rows in Artifact:
            - key: "foo", id: 1, created: 2020-01-01
            - key: "foo", id: 2, created: 2020-01-02
            - key: "foo", id: 3, created: 2020-01-03

        and this row in ArtifactCollection:
            - key: "foo", latest_id: 3

        deleting the artifact with id 3 changes the collection row to
        key: "foo", latest_id: 2.

    Args:
        session: A database session
        artifact_id (UUID): The artifact id to delete

    Returns:
        bool: True if the delete was successful, False otherwise (including
            when no artifact has the given id)
    """
    doomed = await session.get(db.Artifact, artifact_id)
    if doomed is None:
        return False

    key = doomed.key

    # Is this artifact the one its key's collection row currently points at?
    collection_entry = (
        await session.execute(
            sa.select(db.ArtifactCollection)
            .where(db.ArtifactCollection.key == key)
            .where(db.ArtifactCollection.latest_id == artifact_id)
        )
    ).scalar_one_or_none()

    if collection_entry is not None:
        # Most recently created other artifact sharing the key, if any.
        successor = (
            await session.execute(
                sa.select(db.Artifact)
                .where(db.Artifact.key == key)
                .where(db.Artifact.id != artifact_id)
                .order_by(db.Artifact.created.desc())
                .limit(1)
            )
        ).scalar_one_or_none()

        if successor is None:
            # Nothing left under this key: drop the collection row entirely.
            await session.execute(
                sa.delete(db.ArtifactCollection)
                .where(db.ArtifactCollection.key == key)
                .where(db.ArtifactCollection.latest_id == artifact_id)
            )
        else:
            # Fields the collection row copies over from the successor.
            mirrored_fields = (
                "data",
                "description",
                "type",
                "created",
                "updated",
                "flow_run_id",
                "task_run_id",
                "metadata_",
            )
            successor_values = {
                field: getattr(successor, field) for field in mirrored_fields
            }
            await session.execute(
                sa.update(db.ArtifactCollection)
                .where(db.ArtifactCollection.key == key)
                .values(latest_id=successor.id, **successor_values)
            )

    removal = await session.execute(
        sa.delete(db.Artifact).where(db.Artifact.id == artifact_id)
    )
    return removal.rowcount > 0