Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/_migrations/versions/sqlite/2023_04_06_122659_2dbcec43c857_migrate_artifact_data.py: 75%

24 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1"""Migrate artifact data to artifact_collection table 

2 

3Revision ID: 2dbcec43c857 

4Revises: 3d46e23593d6 

5Create Date: 2023-04-06 12:26:59.799863 

6 

7""" 

8 

9import sqlalchemy as sa 1a

10from alembic import op 1a

11 

# Revision identifiers, used by Alembic.
# `revision` is this migration's ID; `down_revision` is the migration it revises.
revision = "2dbcec43c857"
down_revision = "3d46e23593d6"
# This migration declares no branch labels and no extra dependencies.
branch_labels = None
depends_on = None

17 

18 

def upgrade():
    """
    Backfill ``artifact_collection`` rows from the artifact they point at.

    A data-only migration that populates the data, description, flow_run_id,
    task_run_id, type, and metadata_ columns of the artifact_collection table
    from the ``artifact`` row referenced by ``latest_id``.

    Rows are walked in batches of ``batch_size`` ordered by ``id``, and one
    UPDATE is issued per artifact_collection id.
    """
    op.execute("PRAGMA foreign_keys=OFF")

    batch_size = 500
    offset = 0

    # Only the id is needed to drive the per-row UPDATE, so select just that
    # column instead of every column (and instead of relying on `id` being
    # the first column of `SELECT *`). LIMIT/OFFSET are bound parameters so
    # the statement can be built once, outside the loop.
    select_artifact_collection_ids = sa.text(
        """
        SELECT id FROM artifact_collection ORDER BY id LIMIT :limit OFFSET :offset;
        """
    )

    update_artifact_collection_table = sa.text(
        """
        WITH artifact_collection_cte AS (
            SELECT * FROM artifact_collection WHERE id = :id
        )
        UPDATE artifact_collection
        SET data = artifact.data,
            description = artifact.description,
            flow_run_id = artifact.flow_run_id,
            task_run_id = artifact.task_run_id,
            type = artifact.type,
            metadata_ = artifact.metadata_
        FROM artifact, artifact_collection_cte
        WHERE artifact_collection.latest_id = artifact.id
        AND artifact.id = artifact_collection_cte.latest_id;
        """
    )

    with op.get_context().autocommit_block():
        conn = op.get_bind()
        while True:
            # Get the next batch of ids to update. Ordering by id keeps the
            # OFFSET pagination stable, since the UPDATE never changes ids.
            selected_ids = conn.execute(
                select_artifact_collection_ids,
                {"limit": batch_size, "offset": offset},
            ).fetchall()
            if not selected_ids:
                break

            for row in selected_ids:
                conn.execute(update_artifact_collection_table, {"id": row[0]})
            offset += batch_size

    op.execute("PRAGMA foreign_keys=ON")

67 

68 

def downgrade():
    """
    Nothing to revert: the matching upgrade only fills in data.
    """
    return None