Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/artifacts.py: 75%

139 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from typing import Any, Optional, Sequence, TypeVar, Union 1c

2from uuid import UUID 1c

3 

4import sqlalchemy as sa 1c

5from sqlalchemy import select 1c

6from sqlalchemy.ext.asyncio import AsyncSession 1c

7from sqlalchemy.sql import Select 1c

8 

9from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1c

10from prefect.server.schemas import actions, filters, sorting 1c

11from prefect.server.schemas.core import Artifact 1c

12from prefect.types._datetime import DateTime, now 1c

13 

14T = TypeVar("T", bound=tuple[Any, ...]) 1c

15 

16 

17@db_injector 1c

18async def _insert_into_artifact_collection( 1c

19 db: PrefectDBInterface, 

20 session: AsyncSession, 

21 artifact: Artifact, 

22 now: Optional[DateTime] = None, 

23) -> orm_models.ArtifactCollection: 

24 """ 

25 Inserts a new artifact into the artifact_collection table or updates it. 

26 """ 

27 insert_values = artifact.model_dump_for_orm( 1ba

28 exclude_unset=True, exclude={"id", "updated", "created"} 

29 ) 

30 upsert_new_latest_id = ( 1ba

31 db.queries.insert(db.ArtifactCollection) 

32 .values(latest_id=artifact.id, updated=now, created=now, **insert_values) 

33 .on_conflict_do_update( 

34 index_elements=db.orm.artifact_collection_unique_upsert_columns, 

35 set_=dict( 

36 latest_id=artifact.id, 

37 updated=now, 

38 **insert_values, 

39 ), 

40 ) 

41 ) 

42 

43 await session.execute(upsert_new_latest_id) 1ba

44 

45 query = ( 

46 sa.select(db.ArtifactCollection) 

47 .where(sa.and_(db.ArtifactCollection.key == artifact.key)) 

48 .execution_options(populate_existing=True) 

49 ) 

50 

51 result = await session.execute(query) 1ba

52 

53 model = result.scalar() 

54 

55 if model is not None: 

56 if model.latest_id != artifact.id: 

57 raise ValueError( 

58 f"Artifact {artifact.id} was not inserted into the artifact collection" 

59 " table." 

60 ) 

61 if model is None: 

62 raise ValueError( 

63 f"Artifact {artifact.id} was not inserted into the artifact collection" 

64 " table." 

65 ) 

66 

67 return model 

68 

69 

70@db_injector 1c

71async def _insert_into_artifact( 1c

72 db: PrefectDBInterface, 

73 session: AsyncSession, 

74 artifact: Artifact, 

75 now: Optional[DateTime] = None, 

76) -> orm_models.Artifact: 

77 """ 

78 Inserts a new artifact into the artifact table. 

79 """ 

80 artifact_id = artifact.id 1bda

81 insert_stmt = db.queries.insert(db.Artifact).values( 1bda

82 created=now, 

83 updated=now, 

84 **artifact.model_dump_for_orm(exclude={"created", "updated"}), 

85 ) 

86 await session.execute(insert_stmt) 1bda

87 

88 query = ( 

89 sa.select(db.Artifact) 

90 .where(db.Artifact.id == artifact_id) 

91 .limit(1) 

92 .execution_options(populate_existing=True) 

93 ) 

94 

95 result = await session.execute(query) 1bda

96 return result.scalar_one() 

97 

98 

99async def create_artifact( 1c

100 session: AsyncSession, 

101 artifact: Artifact, 

102) -> orm_models.Artifact: 

103 right_now = now("UTC") 1bda

104 

105 if artifact.key is not None: 1bda

106 await _insert_into_artifact_collection( 1ba

107 session=session, now=right_now, artifact=artifact 

108 ) 

109 

110 result = await _insert_into_artifact( 1bda

111 session=session, 

112 now=right_now, 

113 artifact=artifact, 

114 ) 

115 

116 return result 

117 

118 

119@db_injector 1c

120async def read_latest_artifact( 1c

121 db: PrefectDBInterface, 

122 session: AsyncSession, 

123 key: str, 

124) -> Union[orm_models.ArtifactCollection, None]: 

125 """ 

126 Reads the latest artifact by key. 

127 Args: 

128 session: A database session 

129 key: The artifact key 

130 Returns: 

131 Artifact: The latest artifact 

132 """ 

133 latest_artifact_query = sa.select(db.ArtifactCollection).where( 1ba

134 db.ArtifactCollection.key == key 

135 ) 

136 result = await session.execute(latest_artifact_query) 1ba

137 return result.scalar() 

138 

139 

140@db_injector 1c

141async def read_artifact( 1c

142 db: PrefectDBInterface, 

143 session: AsyncSession, 

144 artifact_id: UUID, 

145) -> Union[orm_models.Artifact, None]: 

146 """ 

147 Reads an artifact by id. 

148 """ 

149 

150 query = sa.select(db.Artifact).where(db.Artifact.id == artifact_id) 1ba

151 

152 result = await session.execute(query) 1ba

153 return result.scalar() 

154 

155 

156async def _apply_artifact_filters( 1c

157 db: PrefectDBInterface, 

158 query: Select[T], 

159 flow_run_filter: Optional[filters.FlowRunFilter] = None, 

160 task_run_filter: Optional[filters.TaskRunFilter] = None, 

161 artifact_filter: Optional[filters.ArtifactFilter] = None, 

162 deployment_filter: Optional[filters.DeploymentFilter] = None, 

163 flow_filter: Optional[filters.FlowFilter] = None, 

164) -> Select[T]: 

165 """Applies filters to an artifact query as a combination of EXISTS subqueries.""" 

166 if artifact_filter: 1bdea

167 query = query.where(artifact_filter.as_sql_filter()) 1bdea

168 

169 if flow_filter or flow_run_filter or deployment_filter: 1bdea

170 flow_run_exists_clause = select(db.FlowRun).where( 1ba

171 db.Artifact.flow_run_id == db.FlowRun.id 

172 ) 

173 if flow_run_filter: 1ba

174 flow_run_exists_clause = flow_run_exists_clause.where( 1ba

175 flow_run_filter.as_sql_filter() 

176 ) 

177 

178 if flow_filter: 1ba

179 flow_run_exists_clause = flow_run_exists_clause.join( 1ba

180 db.Flow, db.Flow.id == db.FlowRun.flow_id 

181 ).where(flow_filter.as_sql_filter()) 

182 

183 if deployment_filter: 1ba

184 flow_run_exists_clause = flow_run_exists_clause.join( 1ba

185 db.Deployment, db.Deployment.id == db.FlowRun.deployment_id 

186 ).where(deployment_filter.as_sql_filter()) 

187 

188 query = query.where(flow_run_exists_clause.exists()) 1ba

189 

190 if task_run_filter: 1bdea

191 task_run_exists_clause = select(db.TaskRun).where( 1ba

192 db.Artifact.task_run_id == db.TaskRun.id 

193 ) 

194 task_run_exists_clause = task_run_exists_clause.where( 1ba

195 task_run_filter.as_sql_filter() 

196 ) 

197 

198 query = query.where(task_run_exists_clause.exists()) 1ba

199 

200 return query 1bdea

201 

202 

203async def _apply_artifact_collection_filters( 1c

204 db: PrefectDBInterface, 

205 query: Select[T], 

206 flow_run_filter: Optional[filters.FlowRunFilter] = None, 

207 task_run_filter: Optional[filters.TaskRunFilter] = None, 

208 artifact_filter: Optional[filters.ArtifactCollectionFilter] = None, 

209 deployment_filter: Optional[filters.DeploymentFilter] = None, 

210 flow_filter: Optional[filters.FlowFilter] = None, 

211) -> Select[T]: 

212 """Applies filters to an artifact collection query as a combination of EXISTS subqueries.""" 

213 if artifact_filter: 1ba

214 query = query.where(artifact_filter.as_sql_filter()) 

215 

216 if flow_filter or flow_run_filter or deployment_filter: 1ba

217 flow_run_exists_clause = select(db.FlowRun).where( 1ba

218 db.ArtifactCollection.flow_run_id == db.FlowRun.id 

219 ) 

220 if flow_run_filter: 220 ↛ 225line 220 didn't jump to line 225 because the condition on line 220 was always true1ba

221 flow_run_exists_clause = flow_run_exists_clause.where( 1ba

222 flow_run_filter.as_sql_filter() 

223 ) 

224 

225 if flow_filter: 1ba

226 flow_run_exists_clause = flow_run_exists_clause.join( 

227 db.Flow, db.Flow.id == db.FlowRun.flow_id 

228 ).where(flow_filter.as_sql_filter()) 

229 

230 if deployment_filter: 1ba

231 flow_run_exists_clause = flow_run_exists_clause.join( 

232 db.Deployment, db.Deployment.id == db.FlowRun.deployment_id 

233 ).where(deployment_filter.as_sql_filter()) 

234 

235 query = query.where(flow_run_exists_clause.exists()) 1ba

236 

237 if task_run_filter: 1ba

238 task_run_exists_clause = select(db.TaskRun).where( 

239 db.ArtifactCollection.task_run_id == db.TaskRun.id 

240 ) 

241 task_run_exists_clause = task_run_exists_clause.where( 

242 task_run_filter.as_sql_filter() 

243 ) 

244 

245 query = query.where(task_run_exists_clause.exists()) 

246 

247 return query 1ba

248 

249 

250@db_injector 1c

251async def read_artifacts( 1c

252 db: PrefectDBInterface, 

253 session: AsyncSession, 

254 offset: Optional[int] = None, 

255 limit: Optional[int] = None, 

256 artifact_filter: Optional[filters.ArtifactFilter] = None, 

257 flow_run_filter: Optional[filters.FlowRunFilter] = None, 

258 task_run_filter: Optional[filters.TaskRunFilter] = None, 

259 deployment_filter: Optional[filters.DeploymentFilter] = None, 

260 flow_filter: Optional[filters.FlowFilter] = None, 

261 sort: sorting.ArtifactSort = sorting.ArtifactSort.ID_DESC, 

262) -> Sequence[orm_models.Artifact]: 

263 """ 

264 Reads artifacts. 

265 

266 Args: 

267 session: A database session 

268 offset: Query offset 

269 limit: Query limit 

270 artifact_filter: Only select artifacts matching this filter 

271 flow_run_filter: Only select artifacts whose flow runs matching this filter 

272 task_run_filter: Only select artifacts whose task runs matching this filter 

273 deployment_filter: Only select artifacts whose flow runs belong to deployments matching this filter 

274 flow_filter: Only select artifacts whose flow runs belong to flows matching this filter 

275 work_pool_filter: Only select artifacts whose flow runs belong to work pools matching this filter 

276 """ 

277 query = sa.select(db.Artifact).order_by(*sort.as_sql_sort()) 1bdea

278 

279 query = await _apply_artifact_filters( 1bdea

280 db, 

281 query, 

282 artifact_filter=artifact_filter, 

283 flow_run_filter=flow_run_filter, 

284 task_run_filter=task_run_filter, 

285 deployment_filter=deployment_filter, 

286 flow_filter=flow_filter, 

287 ) 

288 

289 if offset is not None: 289 ↛ 291line 289 didn't jump to line 291 because the condition on line 289 was always true1bdea

290 query = query.offset(offset) 1bdea

291 if limit is not None: 291 ↛ 294line 291 didn't jump to line 294 because the condition on line 291 was always true1bdea

292 query = query.limit(limit) 1bdea

293 

294 result = await session.execute(query) 1bdea

295 return result.scalars().unique().all() 

296 

297 

298@db_injector 1c

299async def read_latest_artifacts( 1c

300 db: PrefectDBInterface, 

301 session: AsyncSession, 

302 offset: Optional[int] = None, 

303 limit: Optional[int] = None, 

304 artifact_filter: Optional[filters.ArtifactCollectionFilter] = None, 

305 flow_run_filter: Optional[filters.FlowRunFilter] = None, 

306 task_run_filter: Optional[filters.TaskRunFilter] = None, 

307 deployment_filter: Optional[filters.DeploymentFilter] = None, 

308 flow_filter: Optional[filters.FlowFilter] = None, 

309 sort: sorting.ArtifactCollectionSort = sorting.ArtifactCollectionSort.ID_DESC, 

310) -> Sequence[orm_models.ArtifactCollection]: 

311 """ 

312 Reads artifacts. 

313 

314 Args: 

315 session: A database session 

316 offset: Query offset 

317 limit: Query limit 

318 artifact_filter: Only select artifacts matching this filter 

319 flow_run_filter: Only select artifacts whose flow runs matching this filter 

320 task_run_filter: Only select artifacts whose task runs matching this filter 

321 deployment_filter: Only select artifacts whose flow runs belong to deployments matching this filter 

322 flow_filter: Only select artifacts whose flow runs belong to flows matching this filter 

323 work_pool_filter: Only select artifacts whose flow runs belong to work pools matching this filter 

324 """ 

325 query = sa.select(db.ArtifactCollection).order_by(*sort.as_sql_sort()) 1ba

326 query = await _apply_artifact_collection_filters( 1ba

327 db, 

328 query, 

329 artifact_filter=artifact_filter, 

330 flow_run_filter=flow_run_filter, 

331 task_run_filter=task_run_filter, 

332 deployment_filter=deployment_filter, 

333 flow_filter=flow_filter, 

334 ) 

335 

336 if offset is not None: 336 ↛ 338line 336 didn't jump to line 338 because the condition on line 336 was always true1ba

337 query = query.offset(offset) 1ba

338 if limit is not None: 338 ↛ 341line 338 didn't jump to line 341 because the condition on line 338 was always true1ba

339 query = query.limit(limit) 1ba

340 

341 result = await session.execute(query) 1ba

342 return result.scalars().unique().all() 

343 

344 

345@db_injector 1c

346async def count_artifacts( 1c

347 db: PrefectDBInterface, 

348 session: AsyncSession, 

349 artifact_filter: Optional[filters.ArtifactFilter] = None, 

350 flow_run_filter: Optional[filters.FlowRunFilter] = None, 

351 task_run_filter: Optional[filters.TaskRunFilter] = None, 

352 deployment_filter: Optional[filters.DeploymentFilter] = None, 

353 flow_filter: Optional[filters.FlowFilter] = None, 

354) -> int: 

355 """ 

356 Counts artifacts. 

357 Args: 

358 session: A database session 

359 artifact_filter: Only select artifacts matching this filter 

360 flow_run_filter: Only select artifacts whose flow runs matching this filter 

361 task_run_filter: Only select artifacts whose task runs matching this filter 

362 """ 

363 query = sa.select(sa.func.count(db.Artifact.id)) 1ba

364 

365 query = await _apply_artifact_filters( 1ba

366 db, 

367 query, 

368 artifact_filter=artifact_filter, 

369 flow_run_filter=flow_run_filter, 

370 task_run_filter=task_run_filter, 

371 deployment_filter=deployment_filter, 

372 flow_filter=flow_filter, 

373 ) 

374 

375 result = await session.execute(query) 1ba

376 return result.scalar_one() 

377 

378 

379@db_injector 1c

380async def count_latest_artifacts( 1c

381 db: PrefectDBInterface, 

382 session: AsyncSession, 

383 artifact_filter: Optional[filters.ArtifactCollectionFilter] = None, 

384 flow_run_filter: Optional[filters.FlowRunFilter] = None, 

385 task_run_filter: Optional[filters.TaskRunFilter] = None, 

386 deployment_filter: Optional[filters.DeploymentFilter] = None, 

387 flow_filter: Optional[filters.FlowFilter] = None, 

388) -> int: 

389 """ 

390 Counts artifacts. 

391 Args: 

392 session: A database session 

393 artifact_filter: Only select artifacts matching this filter 

394 flow_run_filter: Only select artifacts whose flow runs matching this filter 

395 task_run_filter: Only select artifacts whose task runs matching this filter 

396 """ 

397 query = sa.select(sa.func.count(db.ArtifactCollection.id)) 1ba

398 

399 query = await _apply_artifact_collection_filters( 1ba

400 db, 

401 query, 

402 artifact_filter=artifact_filter, 

403 flow_run_filter=flow_run_filter, 

404 task_run_filter=task_run_filter, 

405 deployment_filter=deployment_filter, 

406 flow_filter=flow_filter, 

407 ) 

408 

409 result = await session.execute(query) 1ba

410 return result.scalar_one() 

411 

412 

413@db_injector 1c

414async def update_artifact( 1c

415 db: PrefectDBInterface, 

416 session: AsyncSession, 

417 artifact_id: UUID, 

418 artifact: actions.ArtifactUpdate, 

419) -> bool: 

420 """ 

421 Updates an artifact by id. 

422 

423 Args: 

424 session: A database session 

425 artifact_id (UUID): The artifact id to update 

426 artifact: An artifact model 

427 

428 Returns: 

429 bool: True if the update was successful, False otherwise 

430 """ 

431 update_artifact_data = artifact.model_dump_for_orm(exclude_unset=True) 1ba

432 

433 update_artifact_stmt = ( 1ba

434 sa.update(db.Artifact) 

435 .where(db.Artifact.id == artifact_id) 

436 .values(**update_artifact_data) 

437 ) 

438 

439 artifact_result = await session.execute(update_artifact_stmt) 1ba

440 

441 update_artifact_collection_data = artifact.model_dump_for_orm(exclude_unset=True) 

442 update_artifact_collection_stmt = ( 

443 sa.update(db.ArtifactCollection) 

444 .where(db.ArtifactCollection.latest_id == artifact_id) 

445 .values(**update_artifact_collection_data) 

446 ) 

447 collection_result = await session.execute(update_artifact_collection_stmt) 1ba

448 

449 return artifact_result.rowcount + collection_result.rowcount > 0 

450 

451 

452@db_injector 1c

453async def delete_artifact( 1c

454 db: PrefectDBInterface, session: AsyncSession, artifact_id: UUID 

455) -> bool: 

456 """ 

457 Deletes an artifact by id. 

458 

459 The ArtifactCollection table is used to track the latest version of an artifact 

460 by key. If we are deleting the latest version of an artifact from the Artifact 

461 table, we need to first update the latest version referenced in ArtifactCollection 

462 so that it points to the next latest version of the artifact. 

463 

464 Example: 

465 If we have the following artifacts in Artifact: 

466 - key: "foo", id: 1, created: 2020-01-01 

467 - key: "foo", id: 2, created: 2020-01-02 

468 - key: "foo", id: 3, created: 2020-01-03 

469 

470 the ArtifactCollection table has the following entry: 

471 - key: "foo", latest_id: 3 

472 

473 If we delete the artifact with id 3, we need to update the latest version of the 

474 artifact with key "foo" to be the artifact with id 2. 

475 

476 Args: 

477 session: A database session 

478 artifact_id (UUID): The artifact id to delete 

479 

480 Returns: 

481 bool: True if the delete was successful, False otherwise 

482 """ 

483 artifact = await session.get(db.Artifact, artifact_id) 1bda

484 if artifact is None: 

485 return False 

486 

487 is_latest_version = ( 

488 await session.execute( 

489 sa.select(db.ArtifactCollection) 

490 .where(db.ArtifactCollection.key == artifact.key) 

491 .where(db.ArtifactCollection.latest_id == artifact_id) 

492 ) 

493 ).scalar_one_or_none() is not None 

494 

495 if is_latest_version: 

496 next_latest_version = ( 

497 await session.execute( 

498 sa.select(db.Artifact) 

499 .where(db.Artifact.key == artifact.key) 

500 .where(db.Artifact.id != artifact_id) 

501 .order_by(db.Artifact.created.desc()) 

502 .limit(1) 

503 ) 

504 ).scalar_one_or_none() 

505 

506 if next_latest_version is not None: 

507 set_next_latest_version = ( 

508 sa.update(db.ArtifactCollection) 

509 .where(db.ArtifactCollection.key == artifact.key) 

510 .values( 

511 latest_id=next_latest_version.id, 

512 data=next_latest_version.data, 

513 description=next_latest_version.description, 

514 type=next_latest_version.type, 

515 created=next_latest_version.created, 

516 updated=next_latest_version.updated, 

517 flow_run_id=next_latest_version.flow_run_id, 

518 task_run_id=next_latest_version.task_run_id, 

519 metadata_=next_latest_version.metadata_, 

520 ) 

521 ) 

522 await session.execute(set_next_latest_version) 

523 

524 else: 

525 await session.execute( 

526 sa.delete(db.ArtifactCollection) 

527 .where(db.ArtifactCollection.key == artifact.key) 

528 .where(db.ArtifactCollection.latest_id == artifact_id) 

529 ) 

530 

531 delete_stmt = sa.delete(db.Artifact).where(db.Artifact.id == artifact_id) 

532 

533 result = await session.execute(delete_stmt) 1da

534 return result.rowcount > 0