1 """Migrate artifact data to artifact_collection table
2
3 Revision ID: 2dbcec43c857
4 Revises: 3d46e23593d6
5 Create Date: 2023-04-06 12:26:59.799863
6
7 """
8
9 import sqlalchemy as sa 1 ctx 1a
10 from alembic import op 1 ctx 1a
11
# revision identifiers, used by Alembic.
# `revision` is this migration's id; `down_revision` is the id of the
# migration it revises (see the "Revises:" line in the module docstring).
revision = "2dbcec43c857"
down_revision = "3d46e23593d6"
# No branch label and no cross-revision dependency are declared.
branch_labels = None
depends_on = None
17
18
def upgrade():
    """
    Backfill artifact_collection rows from the artifact each one points at.

    A data-only migration that populates the data, description, flow_run_id,
    task_run_id, type, and metadata_ columns of the artifact_collection table
    by copying them from the artifact row whose id equals the collection's
    latest_id. Collections are processed in id order, in batches of
    ``batch_size`` rows, with one UPDATE issued per collection id.
    """
    batch_size = 500

    # Only the id is needed to drive the per-row UPDATE, so select just that
    # column instead of `*`. LIMIT/OFFSET are bound parameters so the
    # statement is built once, outside the paging loop.
    select_artifact_collection_ids = sa.text(
        """
        SELECT id FROM artifact_collection ORDER BY id LIMIT :limit OFFSET :offset;
        """
    )

    update_artifact_collection_table = sa.text(
        """
        WITH artifact_collection_cte AS (
            SELECT * FROM artifact_collection WHERE id = :id
        )
        UPDATE artifact_collection
        SET data = artifact.data,
            description = artifact.description,
            flow_run_id = artifact.flow_run_id,
            task_run_id = artifact.task_run_id,
            type = artifact.type,
            metadata_ = artifact.metadata_
        FROM artifact, artifact_collection_cte
        WHERE artifact_collection.latest_id = artifact.id
            AND artifact.id = artifact_collection_cte.latest_id;
        """
    )

    op.execute("PRAGMA foreign_keys=OFF")
    try:
        with op.get_context().autocommit_block():
            conn = op.get_bind()
            offset = 0
            while True:
                # Get the next batch of collection ids to update.
                batch = conn.execute(
                    select_artifact_collection_ids,
                    {"limit": batch_size, "offset": offset},
                ).fetchall()
                if not batch:
                    break

                for row in batch:
                    conn.execute(update_artifact_collection_table, {"id": row[0]})

                # The UPDATE neither adds/removes rows nor changes ids, so
                # OFFSET paging over `ORDER BY id` stays stable across batches.
                offset += batch_size
    finally:
        # Restore the pragma even if a statement above raised, so the
        # connection is not left with foreign_keys switched off.
        op.execute("PRAGMA foreign_keys=ON")
67
68
def downgrade():
    """
    Intentionally a no-op: this revision is a data-only migration, so no
    action is needed on downgrade.
    """
    return None