1 """Expand work queue table
2
3 Revision ID: b9bda9f142f1
4 Revises: bb38729c471a
5 Create Date: 2023-01-25 11:43:48.160070
6
7 """
8
9 import sqlalchemy as sa 1 ctx 1a
10 from alembic import op 1 ctx 1a
11
12 import prefect 1 ctx 1a
13
14 # revision identifiers, used by Alembic.
15 revision = "b9bda9f142f1" 1 ctx 1a
16 down_revision = "f92143d30c27" 1 ctx 1a
17 branch_labels = None 1 ctx 1a
18 depends_on = None 1 ctx 1a
19
20 # Note: Downgrade for this migration is destructive if additional work pools have been created.
21
22
23 def upgrade ( ) : 1 ctx 1a
24 # dropping columns to efficiently clear indexes and constraints
25 with op . batch_alter_table ( "work_pool" , schema = None ) as batch_op : 1 ctx 1a
26 batch_op . drop_constraint ( "fk_work_pool__default_queue_id__work_pool_queue" ) 1 ctx 1a
27 with op . batch_alter_table ( "deployment" , schema = None ) as batch_op : 1 ctx 1a
28 batch_op . drop_index ( "ix_deployment__work_pool_queue_id" ) 1 ctx 1a
29 batch_op . drop_column ( "work_pool_queue_id" ) 1 ctx 1a
30 with op . batch_alter_table ( "flow_run" , schema = None ) as batch_op : 1 ctx 1a
31 batch_op . drop_index ( "ix_flow_run__work_pool_queue_id" ) 1 ctx 1a
32 batch_op . drop_column ( "work_pool_queue_id" ) 1 ctx 1a
33
34 with op . batch_alter_table ( "work_queue" , schema = None ) as batch_op : 1 ctx 1a
35 batch_op . add_column ( 1 ctx 1a
36 sa . Column (
37 "priority" ,
38 sa . Integer ( ) ,
39 nullable = False ,
40 server_default = "1" ,
41 )
42 )
43 batch_op . add_column ( 1 ctx 1a
44 sa . Column (
45 "work_pool_id" , prefect . server . utilities . database . UUID ( ) , nullable = True
46 )
47 )
48 batch_op . create_foreign_key ( 1 ctx 1a
49 batch_op . f ( "fk_work_queue__work_pool_id__work_pool" ) ,
50 "work_pool" ,
51 [ "work_pool_id" ] ,
52 [ "id" ] ,
53 ondelete = "cascade" ,
54 )
55
56 batch_op . create_index ( 1 ctx 1a
57 op . f ( "ix_work_queue__work_pool_id" ) ,
58 [ "work_pool_id" ] ,
59 unique = False ,
60 )
61 batch_op . create_index ( 1 ctx 1a
62 op . f ( "ix_work_queue__work_pool_id_priority" ) ,
63 [ "work_pool_id" , "priority" ] ,
64 unique = False ,
65 )
66
67 with op . batch_alter_table ( "flow_run" , schema = None ) as batch_op : 1 ctx 1a
68 batch_op . add_column ( 1 ctx 1a
69 sa . Column (
70 "work_queue_id" ,
71 prefect . server . utilities . database . UUID ( ) ,
72 nullable = True ,
73 )
74 )
75 batch_op . create_index ( 1 ctx 1a
76 batch_op . f ( "ix_flow_run__work_queue_id" ) ,
77 [ "work_queue_id" ] ,
78 unique = False ,
79 )
80 batch_op . create_foreign_key ( 1 ctx 1a
81 batch_op . f ( "fk_flow_run__work_queue_id__work_queue" ) ,
82 "work_queue" ,
83 [ "work_queue_id" ] ,
84 [ "id" ] ,
85 ondelete = "SET NULL" ,
86 )
87
88 with op . batch_alter_table ( "deployment" , schema = None ) as batch_op : 1 ctx 1a
89 batch_op . add_column ( 1 ctx 1a
90 sa . Column (
91 "work_queue_id" ,
92 prefect . server . utilities . database . UUID ( ) ,
93 nullable = True ,
94 )
95 )
96 batch_op . create_index ( 1 ctx 1a
97 batch_op . f ( "ix_deployment__work_queue_id" ) ,
98 [ "work_queue_id" ] ,
99 unique = False ,
100 )
101 batch_op . create_foreign_key ( 1 ctx 1a
102 batch_op . f ( "fk_deployment__work_queue_id__work_queue" ) ,
103 "work_queue" ,
104 [ "work_queue_id" ] ,
105 [ "id" ] ,
106 ondelete = "SET NULL" ,
107 )
108
109 with op . batch_alter_table ( "work_pool" , schema = None ) as batch_op : 1 ctx 1a
110 batch_op . create_foreign_key ( 1 ctx 1a
111 batch_op . f ( "fk_work_pool__default_queue_id__work_queue" ) ,
112 "work_queue" ,
113 [ "default_queue_id" ] ,
114 [ "id" ] ,
115 ondelete = "RESTRICT" ,
116 )
117 batch_op . alter_column ( "type" , nullable = False ) 1 ctx 1a
118
119
120 def downgrade ( ) : 1 ctx 1a
121 with op . batch_alter_table ( "work_queue" , schema = None ) as batch_op :
122 batch_op . drop_index ( "ix_work_queue__work_pool_id_priority" )
123 batch_op . drop_index ( "ix_work_queue__work_pool_id" )
124 batch_op . drop_constraint ( "fk_work_queue__work_pool_id__work_pool" )
125 batch_op . drop_column ( "work_pool_id" )
126 batch_op . drop_column ( "priority" )
127
128 with op . batch_alter_table ( "work_pool" , schema = None ) as batch_op :
129 batch_op . drop_constraint ( "fk_work_pool__default_queue_id__work_queue" )
130 batch_op . alter_column ( "type" , nullable = True )
131
132 with op . batch_alter_table ( "deployment" , schema = None ) as batch_op :
133 batch_op . drop_constraint ( "fk_deployment__work_queue_id__work_queue" )
134 batch_op . drop_index ( "ix_deployment__work_queue_id" )
135 batch_op . drop_column ( "work_queue_id" )
136
137 with op . batch_alter_table ( "flow_run" , schema = None ) as batch_op :
138 batch_op . drop_constraint ( "fk_flow_run__work_queue_id__work_queue" )
139 batch_op . drop_index ( "ix_flow_run__work_queue_id" )
140 batch_op . drop_column ( "work_queue_id" )
141
142 op . execute ( sa . text ( "DELETE FROM work_pool" ) )
143
144 with op . batch_alter_table ( "work_pool" , schema = None ) as batch_op :
145 batch_op . create_foreign_key (
146 batch_op . f ( "fk_work_pool__default_queue_id__work_pool_queue" ) ,
147 "work_pool_queue" ,
148 [ "default_queue_id" ] ,
149 [ "id" ] ,
150 ondelete = "RESTRICT" ,
151 )
152 batch_op . alter_column ( "type" , nullable = True )
153
154 with op . batch_alter_table ( "deployment" , schema = None ) as batch_op :
155 batch_op . add_column (
156 sa . Column (
157 "work_pool_queue_id" ,
158 prefect . server . utilities . database . UUID ( ) ,
159 nullable = True ,
160 )
161 )
162 batch_op . create_index (
163 batch_op . f ( "ix_deployment__work_pool_queue_id" ) ,
164 [ "work_pool_queue_id" ] ,
165 unique = False ,
166 )
167 batch_op . create_foreign_key (
168 batch_op . f ( "fk_deployment__work_pool_queue_id__work_pool_queue" ) ,
169 "work_pool_queue" ,
170 [ "work_pool_queue_id" ] ,
171 [ "id" ] ,
172 ondelete = "SET NULL" ,
173 )
174
175 with op . batch_alter_table ( "flow_run" , schema = None ) as batch_op :
176 batch_op . add_column (
177 sa . Column (
178 "work_pool_queue_id" ,
179 prefect . server . utilities . database . UUID ( ) ,
180 nullable = True ,
181 )
182 )
183 batch_op . create_index (
184 batch_op . f ( "ix_flow_run__work_pool_queue_id" ) ,
185 [ "work_pool_queue_id" ] ,
186 unique = False ,
187 )
188 batch_op . create_foreign_key (
189 batch_op . f ( "fk_flow_run__work_pool_queue_id__work_pool_queue" ) ,
190 "work_pool_queue" ,
191 [ "work_pool_queue_id" ] ,
192 [ "id" ] ,
193 ondelete = "SET NULL" ,
194 )