1 """Adds a helper index for the artifact data migration
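
Adds a boolean ``has_data`` column, with an index, to ``flow_run_state`` and
``task_run_state``, then backfills it in batches. A row's ``has_data`` is true
when its ``data`` is neither NULL nor the string ``'null'``.

Revision ID: f92143d30c25
Revises: f92143d30c24
Create Date: 2023-01-12 00:00:43.488367

"""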
8
9 import sqlalchemy as sa 1 ctx 1a
10 from alembic import op 1 ctx 1a
11
# Alembic revision identifiers: this revision and the one it directly follows.
revision: str = "f92143d30c25"
down_revision: str = "f92143d30c24"
# This revision has no branch label and no cross-branch dependencies.
branch_labels = None
depends_on = None
17
18
def _populate_has_data_sql(table_name, batch_size):
    """Build one backfill step for the ``has_data`` column of *table_name*.

    The statement updates at most *batch_size* rows whose ``has_data`` is
    still NULL. It sets ``has_data`` to true when ``data`` is neither SQL NULL
    nor the string ``'null'``. That string is presumably a JSON-serialized
    null; verify this against the column's serializer.

    Both operands of the ``AND`` are ``IS``/``IS NOT`` comparisons, which never
    evaluate to NULL in SQLite. Every touched row therefore leaves the
    ``has_data IS NULL`` set, so repeated execution always makes progress.

    Args:
        table_name: Table to update. It is interpolated directly into the SQL,
            so it must be a trusted, hard-coded identifier.
        batch_size: Maximum number of rows to update per execution. It is
            coerced to ``int`` before being placed in the ``LIMIT`` clause.

    Returns:
        The SQL text of a single ``UPDATE`` statement.
    """
    limit = int(batch_size)
    return f"""
        UPDATE {table_name}
        SET has_data = (data IS NOT NULL AND data IS NOT 'null')
        WHERE {table_name}.id in (SELECT id FROM {table_name} WHERE (has_data IS NULL) LIMIT {limit})
    """


def upgrade():
    """Add an indexed ``has_data`` column to both state tables and backfill it.

    The schema changes run through ``op.batch_alter_table``, one block per
    table. The new column is then populated in batches of ``batch_size`` rows
    inside an autocommit block, so each batch is committed on its own.

    Foreign key enforcement is switched off for the whole upgrade and switched
    back on at the end.
    """
    # NOTE(review): presumably disabled so that batch mode's table rebuild on
    # SQLite is not blocked by foreign keys. It is restored at the end.
    op.execute("PRAGMA foreign_keys=OFF")

    state_tables = ("flow_run_state", "task_run_state")

    for table_name in state_tables:
        with op.batch_alter_table(table_name, schema=None) as batch_op:
            batch_op.add_column(sa.Column("has_data", sa.Boolean))
            batch_op.create_index(
                batch_op.f(f"ix_{table_name}__has_data"),
                ["has_data"],
                unique=False,
            )

    batch_size = 500
    with op.get_context().autocommit_block():
        conn = op.get_bind()
        for table_name in state_tables:
            # The statement text is identical for every batch, so build it once.
            backfill = sa.text(_populate_has_data_sql(table_name, batch_size))
            while True:
                # Autocommit mode commits each time `execute` is called, so
                # every batch is persisted independently.
                result = conn.execute(backfill)
                # A short batch means no rows with NULL has_data remain.
                if result.rowcount < batch_size:
                    break

    # Re-enable foreign key enforcement instead of leaving it off for whatever
    # runs next on this connection.
    op.execute("PRAGMA foreign_keys=ON")
70
71
def downgrade():
    """Drop the ``has_data`` index and column from both state tables.

    Tables are processed in the order ``task_run_state`` then
    ``flow_run_state``. Each index is dropped before the column it covers.
    Foreign key enforcement is switched off for the duration and switched
    back on at the end.
    """
    # NOTE(review): presumably disabled so that batch mode's table rebuild on
    # SQLite is not blocked by foreign keys. It is restored below.
    op.execute("PRAGMA foreign_keys=OFF")

    for table_name in ("task_run_state", "flow_run_state"):
        with op.batch_alter_table(table_name, schema=None) as batch_op:
            batch_op.drop_index(batch_op.f(f"ix_{table_name}__has_data"))
            batch_op.drop_column("has_data")

    # Re-enable foreign key enforcement instead of leaving it off for whatever
    # runs next on this connection.
    op.execute("PRAGMA foreign_keys=ON")