1 """Add state_timestamp
2
3 Revision ID: 22b7cb02e593
4 Revises: e757138e954a
5 Create Date: 2022-10-12 10:20:48.760447
6
7 """
8
9 import sqlalchemy as sa 1 ctx 1a
10 from alembic import op 1 ctx 1a
11
12 import prefect 1 ctx 1a
13
# revision identifiers, used by Alembic.
# `revision` is this migration's ID; `down_revision` is the revision it
# revises (see the "Revises" entry in the module docstring).
revision = "22b7cb02e593"
down_revision = "e757138e954a"
# No branch labels or dependencies on other revisions are declared.
branch_labels = None
depends_on = None
19
20
def upgrade():
    """Add an indexed ``state_timestamp`` column to runs and backfill it.

    A nullable ``state_timestamp`` column and a non-unique index on it are
    added to both ``flow_run`` and ``task_run`` using batch operations.

    The column is then backfilled from the ``timestamp`` of the state row
    that each run's ``state_id`` points at. The backfill runs 500 rows per
    statement inside an autocommit block, so every batch is committed on
    its own.
    """
    with op.batch_alter_table("flow_run", schema=None) as batch_op:
        batch_op.add_column(
            sa.Column(
                "state_timestamp",
                prefect.server.utilities.database.Timestamp(timezone=True),
                nullable=True,
            )
        )
        batch_op.create_index(
            "ix_flow_run__state_timestamp", ["state_timestamp"], unique=False
        )

    with op.batch_alter_table("task_run", schema=None) as batch_op:
        batch_op.add_column(
            sa.Column(
                "state_timestamp",
                prefect.server.utilities.database.Timestamp(timezone=True),
                nullable=True,
            )
        )
        batch_op.create_index(
            "ix_task_run__state_timestamp", ["state_timestamp"], unique=False
        )

    # Each batch only selects runs whose state row exists and has a non-null
    # timestamp. Without that guard, a run with a dangling `state_id` would be
    # "updated" to NULL on every pass, stay in the candidate set, and keep the
    # rowcount above zero forever.
    update_flow_run_state_timestamp_in_batches = """
        UPDATE flow_run
        SET state_timestamp = (SELECT timestamp from flow_run_state where flow_run.state_id = flow_run_state.id)
        WHERE flow_run.id in (
            SELECT id from flow_run
            where state_timestamp is null
              and state_id is not null
              and exists (
                  SELECT 1 from flow_run_state
                  where flow_run_state.id = flow_run.state_id
                    and flow_run_state.timestamp is not null
              )
            limit 500
        );
    """

    update_task_run_state_timestamp_in_batches = """
        UPDATE task_run
        SET state_timestamp = (SELECT timestamp from task_run_state where task_run.state_id = task_run_state.id)
        WHERE task_run.id in (
            SELECT id from task_run
            where state_timestamp is null
              and state_id is not null
              and exists (
                  SELECT 1 from task_run_state
                  where task_run_state.id = task_run.state_id
                    and task_run_state.timestamp is not null
              )
            limit 500
        );
    """

    with op.get_context().autocommit_block():
        conn = op.get_bind()
        # Flow runs are backfilled completely before task runs are started.
        for batch_sql in (
            update_flow_run_state_timestamp_in_batches,
            update_task_run_state_timestamp_in_batches,
        ):
            backfill = sa.text(batch_sql)
            while True:
                # execute until every backfillable timestamp has been set;
                # autocommit mode will commit each time `execute` is called
                result = conn.execute(backfill)
                if result.rowcount <= 0:
                    break
73
74
def downgrade():
    """Drop the ``state_timestamp`` index and column from both run tables.

    ``task_run`` is handled first, then ``flow_run``; within each table the
    index is dropped before the column it covers.
    """
    targets = (
        ("task_run", "ix_task_run__state_timestamp"),
        ("flow_run", "ix_flow_run__state_timestamp"),
    )
    for table_name, index_name in targets:
        with op.batch_alter_table(table_name, schema=None) as batch_op:
            batch_op.drop_index(index_name)
            batch_op.drop_column("state_timestamp")