Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/_migrations/versions/sqlite/2022_04_21_113057_db6bde582447_backfill_state_name.py: 92%

20 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1"""Backfill state_name 

2 

3Revision ID: db6bde582447 

4Revises: 7f5f335cace3 

5Create Date: 2022-04-21 11:30:57.542292 

6 

7""" 

8 

9import sqlalchemy as sa 1a

10from alembic import op 1a

11 

# revision identifiers, used by Alembic.
# `revision` is this migration's own ID and `down_revision` is the migration
# it revises; both match the "Revision ID" / "Revises" lines in the module
# docstring.
revision = "db6bde582447"
down_revision = "7f5f335cace3"
# No branch labels and no dependencies on other revisions are declared.
branch_labels = None
depends_on = None

17 

18 

def upgrade():
    """
    Backfills state_name column for task_run and flow_run tables.

    This is a data only migration that can be run as many
    times as desired.

    Rows are updated in batches of 500 inside an autocommit block, so each
    batch is committed on its own. Flow runs are backfilled first, then
    task runs.
    """
    with op.get_context().autocommit_block():
        conn = op.get_bind()
        _backfill_state_name(conn, "flow_run", "flow_run_state")
        _backfill_state_name(conn, "task_run", "task_run_state")


def _backfill_state_name(conn, run_table, state_table, batch_size=500):
    """
    Copy the state name onto ``run_table`` rows, ``batch_size`` rows at a time.

    Args:
        conn: connection returned by ``op.get_bind()``.
        run_table: table whose ``state_name`` column is backfilled.
        state_table: table holding the ``name`` looked up through
            ``run_table.state_id``.
        batch_size: maximum number of rows updated per statement.

    ``run_table`` and ``state_table`` are interpolated into the SQL text, so
    they must be fixed identifiers chosen by the migration, never external
    input.
    """
    # Only rows whose state_id resolves to a state row with a non-null name
    # are candidates. Without the EXISTS filter, a run pointing at a missing
    # state row would be "updated" to NULL, still count towards rowcount, and
    # be selected again on every pass, so the loop would never finish.
    batch_update = sa.text(
        f"""
        UPDATE {run_table}
        SET state_name = (
            SELECT name FROM {state_table}
            WHERE {run_table}.state_id = {state_table}.id
        )
        WHERE {run_table}.id IN (
            SELECT id FROM {run_table}
            WHERE state_name IS NULL
              AND state_id IS NOT NULL
              AND EXISTS (
                  SELECT 1 FROM {state_table}
                  WHERE {state_table}.id = {run_table}.state_id
                    AND {state_table}.name IS NOT NULL
              )
            LIMIT {batch_size:d}
        );
        """
    )

    while True:
        # execute until every resolvable state name has been backfilled;
        # autocommit mode will commit each time `execute` is called
        result = conn.execute(batch_update)
        if result.rowcount <= 0:
            break

54 

55 

def downgrade():
    """
    No-op: the upgrade only backfilled data, so nothing is reverted here.
    """
    return None