"""Migrates state data to the artifact table

Revision ID: f92143d30c26
Revises: f92143d30c25
Create Date: 2023-01-12 00:00:43.488367

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "f92143d30c26"
down_revision = "f92143d30c25"
branch_labels = None
depends_on = None


def upgrade():
    op.execute("PRAGMA foreign_keys=OFF")
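    # SQLite enforces foreign keys per connection; enforcement is disabled here,
    # presumably so the batched inserts/updates below don't trip constraint checks
    # while the back-references are still being populated.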

    ### START DATA MIGRATION

    # insert nontrivial task run state results into the artifact table
    def update_task_run_artifact_data_in_batches(batch_size, offset):
        return f"""
            INSERT INTO artifact (task_run_state_id, task_run_id, data)
            SELECT id, task_run_id, data
            FROM task_run_state
            WHERE has_data IS TRUE
            ORDER BY id
            LIMIT {batch_size} OFFSET {offset};
        """

    # backpopulate the result artifact id on the task run state table
    def update_task_run_state_from_artifact_id_in_batches(batch_size, offset):
        return f"""
            UPDATE task_run_state
            SET result_artifact_id = (SELECT id FROM artifact WHERE task_run_state.id = task_run_state_id)
            WHERE task_run_state.id IN (SELECT id FROM task_run_state WHERE (has_data IS TRUE) AND (result_artifact_id IS NULL) LIMIT {batch_size});
        """

    # insert nontrivial flow run state results into the artifact table
    def update_flow_run_artifact_data_in_batches(batch_size, offset):
        return f"""
            INSERT INTO artifact (flow_run_state_id, flow_run_id, data)
            SELECT id, flow_run_id, data
            FROM flow_run_state
            WHERE has_data IS TRUE
            ORDER BY id
            LIMIT {batch_size} OFFSET {offset};
        """

    # backpopulate the result artifact id on the flow run state table
    def update_flow_run_state_from_artifact_id_in_batches(batch_size, offset):
        return f"""
            UPDATE flow_run_state
            SET result_artifact_id = (SELECT id FROM artifact WHERE flow_run_state.id = flow_run_state_id)
            WHERE flow_run_state.id IN (SELECT id FROM flow_run_state WHERE (has_data IS TRUE) AND (result_artifact_id IS NULL) LIMIT {batch_size});
        """

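    # NOTE: ordering matters -- the INSERT queries must run before the backfill
    # UPDATEs so the artifact rows already exist when result_artifact_id is resolved.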
    data_migration_queries = [
        update_task_run_artifact_data_in_batches,
        update_task_run_state_from_artifact_id_in_batches,
        update_flow_run_artifact_data_in_batches,
        update_flow_run_state_from_artifact_id_in_batches,
    ]

    with op.get_context().autocommit_block():
        conn = op.get_bind()
        for query in data_migration_queries:
            batch_size = 500
            offset = 0
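            # only the INSERT queries interpolate `offset`; the backfill UPDATEs
            # ignore it and re-select rows whose result_artifact_id is still NULL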

            while True:
                # run the query repeatedly until it no longer affects any rows;
                # autocommit mode commits each time `execute` is called
                sql_stmt = sa.text(query(batch_size, offset))
                result = conn.execute(sql_stmt)

                if result.rowcount <= 0:
                    break

                offset += batch_size

    ### END DATA MIGRATION


def downgrade():
    def nullify_artifact_ref_from_flow_run_state_in_batches(batch_size):
        return f"""
            UPDATE flow_run_state
            SET result_artifact_id = NULL
            WHERE flow_run_state.id IN (SELECT id FROM flow_run_state WHERE result_artifact_id IS NOT NULL LIMIT {batch_size});
        """

    def nullify_artifact_ref_from_task_run_state_in_batches(batch_size):
        return f"""
            UPDATE task_run_state
            SET result_artifact_id = NULL
            WHERE task_run_state.id IN (SELECT id FROM task_run_state WHERE result_artifact_id IS NOT NULL LIMIT {batch_size});
        """

    def delete_artifacts_in_batches(batch_size):
        return f"""
            DELETE FROM artifact
            WHERE artifact.id IN (SELECT id FROM artifact LIMIT {batch_size});
        """

    data_migration_queries = [
        delete_artifacts_in_batches,
        nullify_artifact_ref_from_flow_run_state_in_batches,
        nullify_artifact_ref_from_task_run_state_in_batches,
    ]
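    # drain each cleanup query in batches until it affects zero rows,
    # mirroring the batching used in upgrade()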

    with op.get_context().autocommit_block():
        conn = op.get_bind()
        for query in data_migration_queries:
            batch_size = 500

            while True:
                # run the query repeatedly until it no longer affects any rows;
                # autocommit mode commits each time `execute` is called
                sql_stmt = sa.text(query(batch_size))
                result = conn.execute(sql_stmt)

                if result.rowcount <= 0:
                    break