Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/_migrations/versions/sqlite/2022_10_12_102048_22b7cb02e593_add_state_timestamp.py: 78%

33 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1"""Add state_timestamp 

2 

3Revision ID: 22b7cb02e593 

4Revises: e757138e954a 

5Create Date: 2022-10-12 10:20:48.760447 

6 

7""" 

8 

9import sqlalchemy as sa 1a

10from alembic import op 1a

11 

12import prefect 1a

13 

14# revision identifiers, used by Alembic. 

15revision = "22b7cb02e593" 1a

16down_revision = "e757138e954a" 1a

17branch_labels = None 1a

18depends_on = None 1a

19 

20 

21def upgrade(): 1a

22 with op.batch_alter_table("flow_run", schema=None) as batch_op: 1a

23 batch_op.add_column( 1a

24 sa.Column( 

25 "state_timestamp", 

26 prefect.server.utilities.database.Timestamp(timezone=True), 

27 nullable=True, 

28 ) 

29 ) 

30 batch_op.create_index( 1a

31 "ix_flow_run__state_timestamp", ["state_timestamp"], unique=False 

32 ) 

33 

34 with op.batch_alter_table("task_run", schema=None) as batch_op: 1a

35 batch_op.add_column( 1a

36 sa.Column( 

37 "state_timestamp", 

38 prefect.server.utilities.database.Timestamp(timezone=True), 

39 nullable=True, 

40 ) 

41 ) 

42 batch_op.create_index( 1a

43 "ix_task_run__state_timestamp", ["state_timestamp"], unique=False 

44 ) 

45 

46 update_flow_run_state_timestamp_in_batches = """ 1a

47 UPDATE flow_run 

48 SET state_timestamp = (SELECT timestamp from flow_run_state where flow_run.state_id = flow_run_state.id) 

49 WHERE flow_run.id in (SELECT id from flow_run where state_timestamp is null and state_id is not null limit 500); 

50 """ 

51 

52 update_task_run_state_timestamp_in_batches = """ 1a

53 UPDATE task_run 

54 SET state_timestamp = (SELECT timestamp from task_run_state where task_run.state_id = task_run_state.id) 

55 WHERE task_run.id in (SELECT id from task_run where state_timestamp is null and state_id is not null limit 500); 

56 """ 

57 

58 with op.get_context().autocommit_block(): 1a

59 conn = op.get_bind() 1a

60 while True: 1a

61 # execute until we've backfilled all flow run state timestamps 

62 # autocommit mode will commit each time `execute` is called 

63 result = conn.execute(sa.text(update_flow_run_state_timestamp_in_batches)) 1a

64 if result.rowcount <= 0: 64 ↛ 60line 64 didn't jump to line 60 because the condition on line 64 was always true1a

65 break 1a

66 

67 while True: 1a

68 # execute until we've backfilled all task run state timestamps 

69 # autocommit mode will commit each time `execute` is called 

70 result = conn.execute(sa.text(update_task_run_state_timestamp_in_batches)) 1a

71 if result.rowcount <= 0: 71 ↛ 67line 71 didn't jump to line 67 because the condition on line 71 was always true1a

72 break 1a

73 

74 

75def downgrade(): 1a

76 with op.batch_alter_table("task_run", schema=None) as batch_op: 

77 batch_op.drop_index("ix_task_run__state_timestamp") 

78 batch_op.drop_column("state_timestamp") 

79 

80 with op.batch_alter_table("flow_run", schema=None) as batch_op: 

81 batch_op.drop_index("ix_flow_run__state_timestamp") 

82 batch_op.drop_column("state_timestamp")