Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/_migrations/versions/sqlite/2023_01_12_000043_f92143d30c25_create_migration_index.py: 80%

36 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

"""Adds a helper index for the artifact data migration

Revision ID: f92143d30c25
Revises: f92143d30c24
Create Date: 2023-01-12 00:00:43.488367

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "f92143d30c25"  # this migration's id
down_revision = "f92143d30c24"  # migration applied immediately before this one
branch_labels = None  # no named branch for this revision
depends_on = None  # no cross-branch dependencies

17 

18 

def upgrade():
    """Add a nullable ``has_data`` column + index to ``flow_run_state`` and
    ``task_run_state``, then backfill it in batches.

    ``has_data`` marks rows whose ``data`` column holds a real payload
    (not NULL and not the JSON-serialized string ``'null'``); the artifact
    data migration uses the index to find rows it still has to process.
    """
    # SQLite: disable foreign-key enforcement so batch_alter_table's
    # copy-and-rename table rebuild does not trip FK checks.
    op.execute("PRAGMA foreign_keys=OFF")

    with op.batch_alter_table("flow_run_state", schema=None) as batch_op:
        batch_op.add_column(sa.Column("has_data", sa.Boolean))
        batch_op.create_index(
            batch_op.f("ix_flow_run_state__has_data"),
            ["has_data"],
            unique=False,
        )

    with op.batch_alter_table("task_run_state", schema=None) as batch_op:
        batch_op.add_column(sa.Column("has_data", sa.Boolean))
        batch_op.create_index(
            batch_op.f("ix_task_run_state__has_data"),
            ["has_data"],
            unique=False,
        )

    def populate_has_data_in_batches(table, batch_size):
        # One parameterized template for both tables (the two original
        # helpers were identical except for the table name). `table` and
        # `batch_size` are migration-internal constants, not user input.
        return f"""
        UPDATE {table}
        SET has_data = (data IS NOT NULL AND data IS NOT 'null')
        WHERE {table}.id in (SELECT id FROM {table} WHERE (has_data IS NULL) LIMIT {batch_size});
        """

    with op.get_context().autocommit_block():
        conn = op.get_bind()
        for table in ("flow_run_state", "task_run_state"):
            batch_size = 500

            while True:
                # Backfill `has_data` until no NULL rows remain; autocommit
                # mode commits each time `execute` is called, so every batch
                # is persisted before the next one runs.
                sql_stmt = sa.text(populate_has_data_in_batches(table, batch_size))
                result = conn.execute(sql_stmt)

                # Fewer updated rows than the batch size means the last
                # remaining NULL rows were just filled in.
                if result.rowcount < batch_size:
                    break

70 

71 

def downgrade():
    """Remove the ``has_data`` helper column and its index from both
    state tables, reversing :func:`upgrade`."""
    # SQLite: turn off FK enforcement during the batch table rebuilds.
    op.execute("PRAGMA foreign_keys=OFF")

    # Drop in the reverse order of upgrade(): task_run_state first.
    for table_name in ("task_run_state", "flow_run_state"):
        with op.batch_alter_table(table_name, schema=None) as batch_op:
            batch_op.drop_index(batch_op.f(f"ix_{table_name}__has_data"))
            batch_op.drop_column("has_data")