Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/_migrations/versions/sqlite/2023_01_25_114348_b9bda9f142f1_expand_work_queue_table.py: 54%

63 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1"""Expand work queue table 

2 

3Revision ID: b9bda9f142f1 

4Revises: f92143d30c27

5Create Date: 2023-01-25 11:43:48.160070 

6 

7""" 

8 

9import sqlalchemy as sa 1a

10from alembic import op 1a

11 

12import prefect 1a

13 

# Revision identifiers, used by Alembic.
# `revision` names this migration; `down_revision` names the migration it
# is applied on top of. No branch labels or cross-branch dependencies are
# declared for this revision.
revision = "b9bda9f142f1"
down_revision = "f92143d30c27"
branch_labels = None
depends_on = None

19 

20# Note: Downgrade for this migration is destructive if additional work pools have been created. 

21 

22 

def upgrade():
    """Move queue references from ``work_pool_queue`` onto ``work_queue``.

    Performed in this order:

    1. Drop the ``work_pool.default_queue_id`` foreign key that targets
       ``work_pool_queue`` and drop the ``work_pool_queue_id`` column (with
       its index) from ``deployment`` and ``flow_run``.
    2. Add ``priority`` (non-null, server default ``"1"``) and
       ``work_pool_id`` (nullable, foreign key to ``work_pool.id`` with
       ``ondelete="cascade"``) to ``work_queue``, plus two indexes.
    3. Add a nullable, indexed ``work_queue_id`` column to ``flow_run`` and
       ``deployment`` with a ``SET NULL`` foreign key to ``work_queue.id``.
    4. Create the ``work_pool.default_queue_id`` foreign key against
       ``work_queue`` (``RESTRICT``) and make ``work_pool.type`` non-nullable.
    """
    uuid_type = prefect.server.utilities.database.UUID

    # Dropping the old columns outright is an efficient way to clear the
    # indexes and constraints attached to them.
    with op.batch_alter_table("work_pool", schema=None) as pool_batch:
        pool_batch.drop_constraint("fk_work_pool__default_queue_id__work_pool_queue")

    # (table, index on the legacy work_pool_queue_id column)
    legacy_indexes = (
        ("deployment", "ix_deployment__work_pool_queue_id"),
        ("flow_run", "ix_flow_run__work_pool_queue_id"),
    )
    for table_name, index_name in legacy_indexes:
        with op.batch_alter_table(table_name, schema=None) as table_batch:
            table_batch.drop_index(index_name)
            table_batch.drop_column("work_pool_queue_id")

    # Extend work_queue with a priority and a link to its owning work pool.
    priority_column = sa.Column(
        "priority",
        sa.Integer(),
        nullable=False,
        server_default="1",
    )
    pool_id_column = sa.Column("work_pool_id", uuid_type(), nullable=True)
    with op.batch_alter_table("work_queue", schema=None) as queue_batch:
        queue_batch.add_column(priority_column)
        queue_batch.add_column(pool_id_column)
        queue_batch.create_foreign_key(
            queue_batch.f("fk_work_queue__work_pool_id__work_pool"),
            "work_pool",
            ["work_pool_id"],
            ["id"],
            ondelete="cascade",
        )
        queue_batch.create_index(
            op.f("ix_work_queue__work_pool_id"),
            ["work_pool_id"],
            unique=False,
        )
        queue_batch.create_index(
            op.f("ix_work_queue__work_pool_id_priority"),
            ["work_pool_id", "priority"],
            unique=False,
        )

    # (table, index name, foreign key name) for the new work_queue_id column
    queue_references = (
        (
            "flow_run",
            "ix_flow_run__work_queue_id",
            "fk_flow_run__work_queue_id__work_queue",
        ),
        (
            "deployment",
            "ix_deployment__work_queue_id",
            "fk_deployment__work_queue_id__work_queue",
        ),
    )
    for table_name, index_name, fk_name in queue_references:
        with op.batch_alter_table(table_name, schema=None) as table_batch:
            table_batch.add_column(
                sa.Column("work_queue_id", uuid_type(), nullable=True)
            )
            table_batch.create_index(
                table_batch.f(index_name),
                ["work_queue_id"],
                unique=False,
            )
            table_batch.create_foreign_key(
                table_batch.f(fk_name),
                "work_queue",
                ["work_queue_id"],
                ["id"],
                ondelete="SET NULL",
            )

    # The pool's default queue now lives in work_queue.
    with op.batch_alter_table("work_pool", schema=None) as pool_batch:
        pool_batch.create_foreign_key(
            pool_batch.f("fk_work_pool__default_queue_id__work_queue"),
            "work_queue",
            ["default_queue_id"],
            ["id"],
            ondelete="RESTRICT",
        )
        pool_batch.alter_column("type", nullable=False)

118 

119 

def downgrade():
    """Restore the ``work_pool_queue``-based references.

    WARNING: this downgrade is destructive. It executes
    ``DELETE FROM work_pool`` before re-creating the foreign key from
    ``work_pool.default_queue_id`` to ``work_pool_queue``.

    Performed in this order:

    1. Remove the ``priority`` / ``work_pool_id`` columns of ``work_queue``
       together with their indexes and foreign key.
    2. Drop the ``work_pool.default_queue_id`` foreign key that targets
       ``work_queue`` and make ``work_pool.type`` nullable.
    3. Drop ``work_queue_id`` (foreign key, index, column) from
       ``deployment`` and ``flow_run``.
    4. Delete every row of ``work_pool``.
    5. Re-create the ``work_pool_queue`` foreign key on ``work_pool`` and the
       nullable, indexed ``work_pool_queue_id`` column (``SET NULL`` foreign
       key) on ``deployment`` and ``flow_run``.
    """
    uuid_type = prefect.server.utilities.database.UUID

    with op.batch_alter_table("work_queue", schema=None) as queue_batch:
        for index_name in (
            "ix_work_queue__work_pool_id_priority",
            "ix_work_queue__work_pool_id",
        ):
            queue_batch.drop_index(index_name)
        queue_batch.drop_constraint("fk_work_queue__work_pool_id__work_pool")
        for column_name in ("work_pool_id", "priority"):
            queue_batch.drop_column(column_name)

    with op.batch_alter_table("work_pool", schema=None) as pool_batch:
        pool_batch.drop_constraint("fk_work_pool__default_queue_id__work_queue")
        pool_batch.alter_column("type", nullable=True)

    # (table, foreign key name, index name) for the work_queue_id column
    queue_references = (
        (
            "deployment",
            "fk_deployment__work_queue_id__work_queue",
            "ix_deployment__work_queue_id",
        ),
        (
            "flow_run",
            "fk_flow_run__work_queue_id__work_queue",
            "ix_flow_run__work_queue_id",
        ),
    )
    for table_name, fk_name, index_name in queue_references:
        with op.batch_alter_table(table_name, schema=None) as table_batch:
            table_batch.drop_constraint(fk_name)
            table_batch.drop_index(index_name)
            table_batch.drop_column("work_queue_id")

    # Destructive step: all work pools are removed before the old foreign
    # key to work_pool_queue is put back.
    op.execute(sa.text("DELETE FROM work_pool"))

    with op.batch_alter_table("work_pool", schema=None) as pool_batch:
        pool_batch.create_foreign_key(
            pool_batch.f("fk_work_pool__default_queue_id__work_pool_queue"),
            "work_pool_queue",
            ["default_queue_id"],
            ["id"],
            ondelete="RESTRICT",
        )
        # NOTE(review): "type" was already made nullable in the earlier
        # work_pool batch above; the call is repeated here to keep the
        # migration's original sequence of operations unchanged.
        pool_batch.alter_column("type", nullable=True)

    # (table, index name, foreign key name) for the restored
    # work_pool_queue_id column
    legacy_references = (
        (
            "deployment",
            "ix_deployment__work_pool_queue_id",
            "fk_deployment__work_pool_queue_id__work_pool_queue",
        ),
        (
            "flow_run",
            "ix_flow_run__work_pool_queue_id",
            "fk_flow_run__work_pool_queue_id__work_pool_queue",
        ),
    )
    for table_name, index_name, fk_name in legacy_references:
        with op.batch_alter_table(table_name, schema=None) as table_batch:
            table_batch.add_column(
                sa.Column("work_pool_queue_id", uuid_type(), nullable=True)
            )
            table_batch.create_index(
                table_batch.f(index_name),
                ["work_pool_queue_id"],
                unique=False,
            )
            table_batch.create_foreign_key(
                table_batch.f(fk_name),
                "work_pool_queue",
                ["work_pool_queue_id"],
                ["id"],
                ondelete="SET NULL",
            )