Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/_migrations/versions/sqlite/2023_01_12_000043_f92143d30c26_migrate_artifact_data.py: 58%

45 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1"""Migrates state data to the artifact table 

2 

3Revision ID: f92143d30c26 

4Revises: f92143d30c25 

5Create Date: 2023-01-12 00:00:43.488367 

6 

7""" 

8 

9import sqlalchemy as sa 1a

10from alembic import op 1a

11 

# revision identifiers, used by Alembic.
revision = "f92143d30c26"  # this migration's id
down_revision = "f92143d30c25"  # immediate predecessor in the migration chain
branch_labels = None
depends_on = None

17 

18 

def upgrade():
    """Copy nontrivial state result data into the ``artifact`` table (SQLite).

    For every ``task_run_state`` / ``flow_run_state`` row with
    ``has_data IS TRUE``, insert a corresponding ``artifact`` row, then
    backpopulate ``result_artifact_id`` on the state row. Work is done in
    batches of 500 inside an autocommit block so each statement commits
    independently and the migration does not hold one giant transaction.
    """
    # SQLite-specific: relax foreign-key enforcement while rows are copied
    # between tables. NOTE(review): it is never turned back ON in this
    # function — presumably the connection is discarded afterwards; confirm.
    op.execute("PRAGMA foreign_keys=OFF")

    ### START DATA MIGRATION

    # insert nontrivial task run state results into the artifact table
    def update_task_run_artifact_data_in_batches(batch_size, offset):
        # OFFSET-based pagination works here because already-inserted source
        # rows remain in task_run_state and keep matching the WHERE clause.
        return f"""
        INSERT INTO artifact (task_run_state_id, task_run_id, data)
        SELECT id, task_run_id, data
        FROM task_run_state
        WHERE has_data IS TRUE
        ORDER by id
        LIMIT {batch_size} OFFSET {offset};
        """

    # backpopulate the result artifact id on the task run state table
    def update_task_run_state_from_artifact_id_in_batches(batch_size, offset):
        # NOTE: `offset` is intentionally accepted but unused — the WHERE
        # clause already excludes rows handled by earlier batches via
        # `result_artifact_id IS NULL`, so no OFFSET is needed. The parameter
        # exists only so all four builders share one call signature.
        return f"""
        UPDATE task_run_state
        SET result_artifact_id = (SELECT id FROM artifact WHERE task_run_state.id = task_run_state_id)
        WHERE task_run_state.id in (SELECT id FROM task_run_state WHERE (has_data IS TRUE) AND (result_artifact_id IS NULL) LIMIT {batch_size});
        """

    # insert nontrivial flow run state results into the artifact table
    def update_flow_run_artifact_data_in_batches(batch_size, offset):
        return f"""
        INSERT INTO artifact (flow_run_state_id, flow_run_id, data)
        SELECT id, flow_run_id, data
        FROM flow_run_state
        WHERE has_data IS TRUE
        ORDER by id
        LIMIT {batch_size} OFFSET {offset};
        """

    # backpopulate the result artifact id on the flow run state table
    def update_flow_run_state_from_artifact_id_in_batches(batch_size, offset):
        # NOTE: `offset` unused for the same reason as the task-run variant:
        # the `result_artifact_id IS NULL` filter makes batches self-advancing.
        return f"""
        UPDATE flow_run_state
        SET result_artifact_id = (SELECT id FROM artifact WHERE flow_run_state.id = flow_run_state_id)
        WHERE flow_run_state.id in (SELECT id FROM flow_run_state WHERE (has_data IS TRUE) AND (result_artifact_id IS NULL) LIMIT {batch_size});
        """

    # Executed in order: copy task-run data, link it, copy flow-run data,
    # link it. Each entry is a query *builder*, called per batch below.
    data_migration_queries = [
        update_task_run_artifact_data_in_batches,
        update_task_run_state_from_artifact_id_in_batches,
        update_flow_run_artifact_data_in_batches,
        update_flow_run_state_from_artifact_id_in_batches,
    ]

    with op.get_context().autocommit_block():
        conn = op.get_bind()
        for query in data_migration_queries:
            batch_size = 500
            offset = 0

            while True:
                # execute until we've updated task_run_state_id and artifact_data
                # autocommit mode will commit each time `execute` is called
                sql_stmt = sa.text(query(batch_size, offset))
                result = conn.execute(sql_stmt)

                # rowcount == 0 means this query has no more rows to touch;
                # move on to the next query builder.
                if result.rowcount <= 0:
                    break

                # Only the INSERT builders actually consume `offset`; the
                # UPDATE builders ignore it (see notes above).
                offset += batch_size

    ### END DATA MIGRATION

87 

88 

def downgrade():
    """Reverse the artifact data migration.

    Deletes all rows from ``artifact`` and nulls out every
    ``result_artifact_id`` on ``flow_run_state`` and ``task_run_state``,
    again in autocommitted batches of 500.
    """

    # null out the artifact references held by flow run states
    def nullify_artifact_ref_from_flow_run_state_in_batches(batch_size):
        return f"""
        UPDATE flow_run_state
        SET result_artifact_id = NULL
        WHERE flow_run_state.id in (SELECT id FROM flow_run_state WHERE result_artifact_id IS NOT NULL LIMIT {batch_size});
        """

    # null out the artifact references held by task run states
    def nullify_artifact_ref_from_task_run_state_in_batches(batch_size):
        return f"""
        UPDATE task_run_state
        SET result_artifact_id = NULL
        WHERE task_run_state.id in (SELECT id FROM task_run_state WHERE result_artifact_id IS NOT NULL LIMIT {batch_size});
        """

    # remove every migrated artifact row
    def delete_artifacts_in_batches(batch_size):
        return f"""
        DELETE FROM artifact
        WHERE artifact.id IN (SELECT id FROM artifact LIMIT {batch_size});
        """

    # NOTE(review): artifacts are deleted *before* the state tables' dangling
    # references are nulled, and unlike upgrade() no `PRAGMA foreign_keys=OFF`
    # is issued here — this appears to rely on SQLite's default FK behavior on
    # this connection; confirm against the schema's FK definitions.
    data_migration_queries = [
        delete_artifacts_in_batches,
        nullify_artifact_ref_from_flow_run_state_in_batches,
        nullify_artifact_ref_from_task_run_state_in_batches,
    ]

    with op.get_context().autocommit_block():
        conn = op.get_bind()
        for query in data_migration_queries:
            batch_size = 500

            while True:
                # execute until we've updated task_run_state_id and artifact_data
                # autocommit mode will commit each time `execute` is called
                sql_stmt = sa.text(query(batch_size))
                result = conn.execute(sql_stmt)

                # no rows affected -> this query is exhausted; next one.
                if result.rowcount <= 0:
                    break