1 """Add worker tables
2
3 Revision ID: fe77ad0dda06
4 Revises: 7201de756d85
5 Create Date: 2022-11-24 14:33:02.689796
6
7 """
8
9 import sqlalchemy as sa 1 ctx 1a
10 from alembic import op 1 ctx 1a
11
12 import prefect 1 ctx 1a
13
14 # revision identifiers, used by Alembic.
15 revision = "fe77ad0dda06" 1 ctx 1a
16 down_revision = "7201de756d85" 1 ctx 1a
17 branch_labels = None 1 ctx 1a
18 depends_on = None 1 ctx 1a
19
20
21 def upgrade ( ) : 1 ctx 1a
22 # Turn off FKs to prevent dropping
23 op . execute ( "PRAGMA foreign_keys=OFF" ) 1 ctx 1a
24
25 # ### commands auto generated by Alembic - please adjust! ###
26 op . create_table ( 1 ctx 1a
27 "worker_pool" ,
28 sa . Column (
29 "id" ,
30 prefect . server . utilities . database . UUID ( ) ,
31 server_default = sa . text (
32 "(\n (\n lower(hex(randomblob(4)))\n || '-'\n "
33 " || lower(hex(randomblob(2)))\n || '-4'\n ||"
34 " substr(lower(hex(randomblob(2))),2)\n || '-'\n ||"
35 " substr('89ab',abs(random()) % 4 + 1, 1)\n ||"
36 " substr(lower(hex(randomblob(2))),2)\n || '-'\n ||"
37 " lower(hex(randomblob(6)))\n )\n )"
38 ) ,
39 nullable = False ,
40 ) ,
41 sa . Column (
42 "created" ,
43 prefect . server . utilities . database . Timestamp ( timezone = True ) ,
44 server_default = sa . text ( "(strftime('%Y-%m-%d %H:%M:%f000', 'now'))" ) ,
45 nullable = False ,
46 ) ,
47 sa . Column (
48 "updated" ,
49 prefect . server . utilities . database . Timestamp ( timezone = True ) ,
50 server_default = sa . text ( "(strftime('%Y-%m-%d %H:%M:%f000', 'now'))" ) ,
51 nullable = False ,
52 ) ,
53 sa . Column ( "name" , sa . String ( ) , nullable = False ) ,
54 sa . Column ( "description" , sa . String ( ) , nullable = True ) ,
55 sa . Column ( "type" , sa . String ( ) , nullable = True ) ,
56 sa . Column (
57 "base_job_template" ,
58 prefect . server . utilities . database . JSON ( ) ,
59 nullable = False ,
60 server_default = "{}" ,
61 ) ,
62 sa . Column ( "is_paused" , sa . Boolean ( ) , server_default = "0" , nullable = False ) ,
63 sa . Column ( "concurrency_limit" , sa . Integer ( ) , nullable = True ) ,
64 sa . Column (
65 "default_queue_id" , prefect . server . utilities . database . UUID ( ) , nullable = True
66 ) ,
67 sa . PrimaryKeyConstraint ( "id" , name = op . f ( "pk_worker_pool" ) ) ,
68 sa . UniqueConstraint ( "name" , name = op . f ( "uq_worker_pool__name" ) ) ,
69 )
70 with op . batch_alter_table ( "worker_pool" , schema = None ) as batch_op : 1 ctx 1a
71 batch_op . create_index ( 1 ctx 1a
72 batch_op . f ( "ix_worker_pool__updated" ) , [ "updated" ] , unique = False
73 )
74 batch_op . create_index ( 1 ctx 1a
75 batch_op . f ( "ix_worker_pool__type" ) , [ "type" ] , unique = False
76 )
77
78 op . create_table ( 1 ctx 1a
79 "worker" ,
80 sa . Column (
81 "id" ,
82 prefect . server . utilities . database . UUID ( ) ,
83 server_default = sa . text (
84 "(\n (\n lower(hex(randomblob(4)))\n || '-'\n "
85 " || lower(hex(randomblob(2)))\n || '-4'\n ||"
86 " substr(lower(hex(randomblob(2))),2)\n || '-'\n ||"
87 " substr('89ab',abs(random()) % 4 + 1, 1)\n ||"
88 " substr(lower(hex(randomblob(2))),2)\n || '-'\n ||"
89 " lower(hex(randomblob(6)))\n )\n )"
90 ) ,
91 nullable = False ,
92 ) ,
93 sa . Column (
94 "created" ,
95 prefect . server . utilities . database . Timestamp ( timezone = True ) ,
96 server_default = sa . text ( "(strftime('%Y-%m-%d %H:%M:%f000', 'now'))" ) ,
97 nullable = False ,
98 ) ,
99 sa . Column (
100 "updated" ,
101 prefect . server . utilities . database . Timestamp ( timezone = True ) ,
102 server_default = sa . text ( "(strftime('%Y-%m-%d %H:%M:%f000', 'now'))" ) ,
103 nullable = False ,
104 ) ,
105 sa . Column ( "name" , sa . String ( ) , nullable = False ) ,
106 sa . Column (
107 "last_heartbeat_time" ,
108 prefect . server . utilities . database . Timestamp ( timezone = True ) ,
109 server_default = sa . text ( "(strftime('%Y-%m-%d %H:%M:%f000', 'now'))" ) ,
110 nullable = False ,
111 ) ,
112 sa . Column (
113 "worker_pool_id" , prefect . server . utilities . database . UUID ( ) , nullable = False
114 ) ,
115 sa . ForeignKeyConstraint (
116 [ "worker_pool_id" ] ,
117 [ "worker_pool.id" ] ,
118 name = op . f ( "fk_worker__worker_pool_id__worker_pool" ) ,
119 ondelete = "cascade" ,
120 ) ,
121 sa . PrimaryKeyConstraint ( "id" , name = op . f ( "pk_worker" ) ) ,
122 sa . UniqueConstraint (
123 "worker_pool_id" ,
124 "name" ,
125 name = op . f ( "uq_worker__worker_pool_id_name" ) ,
126 ) ,
127 )
128 with op . batch_alter_table ( "worker" , schema = None ) as batch_op : 1 ctx 1a
129 batch_op . create_index ( 1 ctx 1a
130 batch_op . f ( "ix_worker__updated" ) , [ "updated" ] , unique = False
131 )
132 batch_op . create_index ( 1 ctx 1a
133 batch_op . f ( "ix_worker__worker_pool_id_last_heartbeat_time" ) ,
134 [ "worker_pool_id" , "last_heartbeat_time" ] ,
135 unique = False ,
136 )
137 batch_op . create_index ( 1 ctx 1a
138 batch_op . f ( "ix_worker__worker_pool_id" ) ,
139 [ "worker_pool_id" ] ,
140 unique = False ,
141 )
142
143 op . create_table ( 1 ctx 1a
144 "worker_pool_queue" ,
145 sa . Column (
146 "id" ,
147 prefect . server . utilities . database . UUID ( ) ,
148 server_default = sa . text (
149 "(\n (\n lower(hex(randomblob(4)))\n || '-'\n "
150 " || lower(hex(randomblob(2)))\n || '-4'\n ||"
151 " substr(lower(hex(randomblob(2))),2)\n || '-'\n ||"
152 " substr('89ab',abs(random()) % 4 + 1, 1)\n ||"
153 " substr(lower(hex(randomblob(2))),2)\n || '-'\n ||"
154 " lower(hex(randomblob(6)))\n )\n )"
155 ) ,
156 nullable = False ,
157 ) ,
158 sa . Column (
159 "created" ,
160 prefect . server . utilities . database . Timestamp ( timezone = True ) ,
161 server_default = sa . text ( "(strftime('%Y-%m-%d %H:%M:%f000', 'now'))" ) ,
162 nullable = False ,
163 ) ,
164 sa . Column (
165 "updated" ,
166 prefect . server . utilities . database . Timestamp ( timezone = True ) ,
167 server_default = sa . text ( "(strftime('%Y-%m-%d %H:%M:%f000', 'now'))" ) ,
168 nullable = False ,
169 ) ,
170 sa . Column ( "name" , sa . String ( ) , nullable = False ) ,
171 sa . Column ( "description" , sa . String ( ) , nullable = True ) ,
172 sa . Column ( "is_paused" , sa . Boolean ( ) , server_default = "0" , nullable = False ) ,
173 sa . Column ( "concurrency_limit" , sa . Integer ( ) , nullable = True ) ,
174 sa . Column ( "priority" , sa . Integer ( ) , nullable = False ) ,
175 sa . Column (
176 "worker_pool_id" , prefect . server . utilities . database . UUID ( ) , nullable = False
177 ) ,
178 sa . ForeignKeyConstraint (
179 [ "worker_pool_id" ] ,
180 [ "worker_pool.id" ] ,
181 name = op . f ( "fk_worker_pool_queue__worker_pool_id__worker_pool" ) ,
182 ondelete = "cascade" ,
183 ) ,
184 sa . PrimaryKeyConstraint ( "id" , name = op . f ( "pk_worker_pool_queue" ) ) ,
185 sa . UniqueConstraint (
186 "worker_pool_id" ,
187 "name" ,
188 name = op . f ( "uq_worker_pool_queue__worker_pool_id_name" ) ,
189 ) ,
190 )
191 with op . batch_alter_table ( "worker_pool_queue" , schema = None ) as batch_op : 1 ctx 1a
192 batch_op . create_index ( 1 ctx 1a
193 batch_op . f ( "ix_worker_pool_queue__updated" ) , [ "updated" ] , unique = False
194 )
195 batch_op . create_index ( 1 ctx 1a
196 batch_op . f ( "ix_worker_pool_queue__worker_pool_id_priority" ) ,
197 [ "worker_pool_id" , "priority" ] ,
198 unique = False ,
199 )
200 batch_op . create_index ( 1 ctx 1a
201 batch_op . f ( "ix_worker_pool_queue__worker_pool_id" ) ,
202 [ "worker_pool_id" ] ,
203 unique = False ,
204 )
205
206 with op . batch_alter_table ( "worker_pool" , schema = None ) as batch_op : 1 ctx 1a
207 batch_op . create_foreign_key ( 1 ctx 1a
208 batch_op . f ( "fk_worker_pool__default_queue_id__worker_pool_queue" ) ,
209 "worker_pool_queue" ,
210 [ "default_queue_id" ] ,
211 [ "id" ] ,
212 ondelete = "RESTRICT" ,
213 )
214
215 with op . batch_alter_table ( "deployment" , schema = None ) as batch_op : 1 ctx 1a
216 batch_op . add_column ( 1 ctx 1a
217 sa . Column (
218 "worker_pool_queue_id" ,
219 prefect . server . utilities . database . UUID ( ) ,
220 nullable = True ,
221 )
222 )
223 batch_op . create_index ( 1 ctx 1a
224 batch_op . f ( "ix_deployment__worker_pool_queue_id" ) ,
225 [ "worker_pool_queue_id" ] ,
226 unique = False ,
227 )
228 batch_op . create_foreign_key ( 1 ctx 1a
229 batch_op . f ( "fk_deployment__worker_pool_queue_id__worker_pool_queue" ) ,
230 "worker_pool_queue" ,
231 [ "worker_pool_queue_id" ] ,
232 [ "id" ] ,
233 ondelete = "SET NULL" ,
234 )
235
236 with op . batch_alter_table ( "flow_run" , schema = None ) as batch_op : 1 ctx 1a
237 batch_op . add_column ( 1 ctx 1a
238 sa . Column (
239 "worker_pool_queue_id" ,
240 prefect . server . utilities . database . UUID ( ) ,
241 nullable = True ,
242 )
243 )
244 batch_op . create_index ( 1 ctx 1a
245 batch_op . f ( "ix_flow_run__worker_pool_queue_id" ) ,
246 [ "worker_pool_queue_id" ] ,
247 unique = False ,
248 )
249 batch_op . create_foreign_key ( 1 ctx 1a
250 batch_op . f ( "fk_flow_run__worker_pool_queue_id__worker_pool_queue" ) ,
251 "worker_pool_queue" ,
252 [ "worker_pool_queue_id" ] ,
253 [ "id" ] ,
254 ondelete = "SET NULL" ,
255 )
256
257 # ### end Alembic commands ###
258
259 op . execute ( "PRAGMA foreign_keys=ON" ) 1 ctx 1a
260
261
262 def downgrade ( ) : 1 ctx 1a
263 op . execute ( "PRAGMA foreign_keys=OFF" )
264
265 # ### commands auto generated by Alembic - please adjust! ###
266 with op . batch_alter_table ( "deployment" , schema = None ) as batch_op :
267 batch_op . drop_index ( "ix_deployment__worker_pool_queue_id" )
268 batch_op . drop_constraint (
269 "fk_deployment__worker_pool_queue_id__worker_pool_queue"
270 )
271 batch_op . drop_column ( "worker_pool_queue_id" )
272 with op . batch_alter_table ( "flow_run" , schema = None ) as batch_op :
273 batch_op . drop_index ( "ix_flow_run__worker_pool_queue_id" )
274 batch_op . drop_constraint ( "fk_flow_run__worker_pool_queue_id__worker_pool_queue" )
275 batch_op . drop_column ( "worker_pool_queue_id" )
276
277 with op . batch_alter_table ( "worker_pool" , schema = None ) as batch_op :
278 batch_op . drop_constraint ( "fk_worker_pool__default_queue_id__worker_pool_queue" )
279 op . drop_table ( "worker_pool_queue" )
280 op . drop_table ( "worker" )
281 op . drop_table ( "worker_pool" )
282 # ### end Alembic commands ###
283
284 op . execute ( "PRAGMA foreign_keys=ON" )