Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/_migrations/versions/sqlite/2023_01_08_175327_bb38729c471a_rename_worker_pools_to_work_pools.py: 55%

87 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1"""Rename Worker Pools to Work Pools 

2 

3Revision ID: bb38729c471a 

4Revises: fe77ad0dda06 

5Create Date: 2023-01-08 17:53:27.444733 

6 

7""" 

8 

9import sqlalchemy as sa 1a

10from alembic import op 1a

11 

12import prefect 1a

13 

# revision identifiers, used by Alembic.
# `revision` is this migration's own ID and `down_revision` is the migration it
# applies on top of; both match the "Revision ID" / "Revises" lines in the
# module docstring.
revision = "bb38729c471a"
down_revision = "fe77ad0dda06"
# This migration declares no branch labels and no dependencies on other
# revisions.
branch_labels = None
depends_on = None

19 

20 

21### NOTE ### 

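22# This upgrade is destructive for anyone who has created a worker, worker pool, or worker pool queue: those tables are dropped and recreated empty under the new names.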

23### 

24 

25 

def upgrade():
    """Swap the ``worker_pool`` schema for an empty ``work_pool`` schema.

    The old ``worker_pool_queue``, ``worker`` and ``worker_pool`` tables are
    dropped (not renamed), and ``work_pool``, ``worker`` and
    ``work_pool_queue`` are created from scratch, so rows in the old tables
    are not carried over. ``deployment`` and ``flow_run`` lose their
    ``worker_pool_queue_id`` column and gain a nullable
    ``work_pool_queue_id`` column in its place.
    """

    def uuid_id_column():
        # Non-null "id" column whose default is assembled by SQLite from
        # random bytes into a hyphenated, UUID-style string.
        return sa.Column(
            "id",
            prefect.server.utilities.database.UUID(),
            server_default=sa.text(
                "(\n (\n lower(hex(randomblob(4)))\n || '-'\n "
                " || lower(hex(randomblob(2)))\n || '-4'\n ||"
                " substr(lower(hex(randomblob(2))),2)\n || '-'\n ||"
                " substr('89ab',abs(random()) % 4 + 1, 1)\n ||"
                " substr(lower(hex(randomblob(2))),2)\n || '-'\n ||"
                " lower(hex(randomblob(6)))\n )\n )"
            ),
            nullable=False,
        )

    def now_column(column_name):
        # Non-null timestamp column that defaults to the current time.
        return sa.Column(
            column_name,
            prefect.server.utilities.database.Timestamp(timezone=True),
            server_default=sa.text("(strftime('%Y-%m-%d %H:%M:%f000', 'now'))"),
            nullable=False,
        )

    def add_indexes(table_name, index_specs):
        # Create each (name, columns) non-unique index in one batch operation.
        with op.batch_alter_table(table_name, schema=None) as table_ops:
            for index_name, indexed_columns in index_specs:
                table_ops.create_index(
                    table_ops.f(index_name), indexed_columns, unique=False
                )

    op.execute("PRAGMA foreign_keys=OFF")

    ## first we reset to a clean, no-worker slate
    for table_name, old_index, old_fk in (
        (
            "deployment",
            "ix_deployment__worker_pool_queue_id",
            "fk_deployment__worker_pool_queue_id__worker_pool_queue",
        ),
        (
            "flow_run",
            "ix_flow_run__worker_pool_queue_id",
            "fk_flow_run__worker_pool_queue_id__worker_pool_queue",
        ),
    ):
        with op.batch_alter_table(table_name, schema=None) as table_ops:
            table_ops.drop_index(old_index)
            table_ops.drop_constraint(old_fk)
            table_ops.drop_column("worker_pool_queue_id")

    # The pool -> default queue foreign key is dropped before the tables.
    with op.batch_alter_table("worker_pool", schema=None) as pool_ops:
        pool_ops.drop_constraint(
            "fk_worker_pool__default_queue_id__worker_pool_queue"
        )
    for obsolete_table in ("worker_pool_queue", "worker", "worker_pool"):
        op.drop_table(obsolete_table)

    ## then we recreate all tables and relationships
    op.create_table(
        "work_pool",
        uuid_id_column(),
        now_column("created"),
        now_column("updated"),
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("description", sa.String(), nullable=True),
        sa.Column("type", sa.String(), nullable=True),
        sa.Column(
            "base_job_template",
            prefect.server.utilities.database.JSON(),
            nullable=False,
            server_default="{}",
        ),
        sa.Column("is_paused", sa.Boolean(), server_default="0", nullable=False),
        sa.Column("concurrency_limit", sa.Integer(), nullable=True),
        sa.Column(
            "default_queue_id",
            prefect.server.utilities.database.UUID(),
            nullable=True,
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_work_pool")),
        sa.UniqueConstraint("name", name=op.f("uq_work_pool__name")),
    )
    add_indexes(
        "work_pool",
        (
            ("ix_work_pool__updated", ["updated"]),
            ("ix_work_pool__type", ["type"]),
        ),
    )

    op.create_table(
        "worker",
        uuid_id_column(),
        now_column("created"),
        now_column("updated"),
        sa.Column("name", sa.String(), nullable=False),
        now_column("last_heartbeat_time"),
        sa.Column(
            "work_pool_id",
            prefect.server.utilities.database.UUID(),
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["work_pool_id"],
            ["work_pool.id"],
            name=op.f("fk_worker__work_pool_id__work_pool"),
            ondelete="cascade",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_worker")),
        sa.UniqueConstraint(
            "work_pool_id",
            "name",
            name=op.f("uq_worker__work_pool_id_name"),
        ),
    )
    add_indexes(
        "worker",
        (
            ("ix_worker__updated", ["updated"]),
            (
                "ix_worker__work_pool_id_last_heartbeat_time",
                ["work_pool_id", "last_heartbeat_time"],
            ),
            ("ix_worker__work_pool_id", ["work_pool_id"]),
        ),
    )

    op.create_table(
        "work_pool_queue",
        uuid_id_column(),
        now_column("created"),
        now_column("updated"),
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("description", sa.String(), nullable=True),
        sa.Column("is_paused", sa.Boolean(), server_default="0", nullable=False),
        sa.Column("concurrency_limit", sa.Integer(), nullable=True),
        sa.Column("priority", sa.Integer(), nullable=False),
        sa.Column(
            "work_pool_id",
            prefect.server.utilities.database.UUID(),
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["work_pool_id"],
            ["work_pool.id"],
            name=op.f("fk_work_pool_queue__work_pool_id__work_pool"),
            ondelete="cascade",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_work_pool_queue")),
        sa.UniqueConstraint(
            "work_pool_id",
            "name",
            name=op.f("uq_work_pool_queue__work_pool_id_name"),
        ),
    )
    add_indexes(
        "work_pool_queue",
        (
            ("ix_work_pool_queue__updated", ["updated"]),
            (
                "ix_work_pool_queue__work_pool_id_priority",
                ["work_pool_id", "priority"],
            ),
            ("ix_work_pool_queue__work_pool_id", ["work_pool_id"]),
        ),
    )

    # work_pool and work_pool_queue reference each other, so this foreign key
    # can only be added once both tables exist.
    with op.batch_alter_table("work_pool", schema=None) as pool_ops:
        pool_ops.create_foreign_key(
            pool_ops.f("fk_work_pool__default_queue_id__work_pool_queue"),
            "work_pool_queue",
            ["default_queue_id"],
            ["id"],
            ondelete="RESTRICT",
        )

    # Point deployments and flow runs at the new queue table.
    for table_name, new_index, new_fk in (
        (
            "deployment",
            "ix_deployment__work_pool_queue_id",
            "fk_deployment__work_pool_queue_id__work_pool_queue",
        ),
        (
            "flow_run",
            "ix_flow_run__work_pool_queue_id",
            "fk_flow_run__work_pool_queue_id__work_pool_queue",
        ),
    ):
        with op.batch_alter_table(table_name, schema=None) as table_ops:
            table_ops.add_column(
                sa.Column(
                    "work_pool_queue_id",
                    prefect.server.utilities.database.UUID(),
                    nullable=True,
                )
            )
            table_ops.create_index(
                table_ops.f(new_index),
                ["work_pool_queue_id"],
                unique=False,
            )
            table_ops.create_foreign_key(
                table_ops.f(new_fk),
                "work_pool_queue",
                ["work_pool_queue_id"],
                ["id"],
                ondelete="SET NULL",
            )

    op.execute("PRAGMA foreign_keys=ON")

278 

279 

def downgrade():
    """Swap the ``work_pool`` schema back to an empty ``worker_pool`` schema.

    Mirror of ``upgrade``: the ``work_pool_queue``, ``worker`` and
    ``work_pool`` tables are dropped, and ``worker_pool``, ``worker`` and
    ``worker_pool_queue`` are created from scratch, so rows in the dropped
    tables are not carried over. ``deployment`` and ``flow_run`` lose their
    ``work_pool_queue_id`` column and regain a nullable
    ``worker_pool_queue_id`` column.
    """

    def uuid_id_column():
        # Non-null "id" column whose default is assembled by SQLite from
        # random bytes into a hyphenated, UUID-style string.
        return sa.Column(
            "id",
            prefect.server.utilities.database.UUID(),
            server_default=sa.text(
                "(\n (\n lower(hex(randomblob(4)))\n || '-'\n "
                " || lower(hex(randomblob(2)))\n || '-4'\n ||"
                " substr(lower(hex(randomblob(2))),2)\n || '-'\n ||"
                " substr('89ab',abs(random()) % 4 + 1, 1)\n ||"
                " substr(lower(hex(randomblob(2))),2)\n || '-'\n ||"
                " lower(hex(randomblob(6)))\n )\n )"
            ),
            nullable=False,
        )

    def now_column(column_name):
        # Non-null timestamp column that defaults to the current time.
        return sa.Column(
            column_name,
            prefect.server.utilities.database.Timestamp(timezone=True),
            server_default=sa.text("(strftime('%Y-%m-%d %H:%M:%f000', 'now'))"),
            nullable=False,
        )

    def add_indexes(table_name, index_specs):
        # Create each (name, columns) non-unique index in one batch operation.
        with op.batch_alter_table(table_name, schema=None) as table_ops:
            for index_name, indexed_columns in index_specs:
                table_ops.create_index(
                    table_ops.f(index_name), indexed_columns, unique=False
                )

    op.execute("PRAGMA foreign_keys=OFF")

    # Remove everything that uses the ``work_pool`` names.
    for table_name, current_index, current_fk in (
        (
            "deployment",
            "ix_deployment__work_pool_queue_id",
            "fk_deployment__work_pool_queue_id__work_pool_queue",
        ),
        (
            "flow_run",
            "ix_flow_run__work_pool_queue_id",
            "fk_flow_run__work_pool_queue_id__work_pool_queue",
        ),
    ):
        with op.batch_alter_table(table_name, schema=None) as table_ops:
            table_ops.drop_index(current_index)
            table_ops.drop_constraint(current_fk)
            table_ops.drop_column("work_pool_queue_id")

    # The pool -> default queue foreign key is dropped before the tables.
    with op.batch_alter_table("work_pool", schema=None) as pool_ops:
        pool_ops.drop_constraint("fk_work_pool__default_queue_id__work_pool_queue")
    for current_table in ("work_pool_queue", "worker", "work_pool"):
        op.drop_table(current_table)

    # Recreate the tables and relationships under the ``worker_pool`` names.
    op.create_table(
        "worker_pool",
        uuid_id_column(),
        now_column("created"),
        now_column("updated"),
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("description", sa.String(), nullable=True),
        sa.Column("type", sa.String(), nullable=True),
        sa.Column(
            "base_job_template",
            prefect.server.utilities.database.JSON(),
            nullable=False,
            server_default="{}",
        ),
        sa.Column("is_paused", sa.Boolean(), server_default="0", nullable=False),
        sa.Column("concurrency_limit", sa.Integer(), nullable=True),
        sa.Column(
            "default_queue_id",
            prefect.server.utilities.database.UUID(),
            nullable=True,
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_worker_pool")),
        sa.UniqueConstraint("name", name=op.f("uq_worker_pool__name")),
    )
    add_indexes(
        "worker_pool",
        (
            ("ix_worker_pool__updated", ["updated"]),
            ("ix_worker_pool__type", ["type"]),
        ),
    )

    op.create_table(
        "worker",
        uuid_id_column(),
        now_column("created"),
        now_column("updated"),
        sa.Column("name", sa.String(), nullable=False),
        now_column("last_heartbeat_time"),
        sa.Column(
            "worker_pool_id",
            prefect.server.utilities.database.UUID(),
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["worker_pool_id"],
            ["worker_pool.id"],
            name=op.f("fk_worker__worker_pool_id__worker_pool"),
            ondelete="cascade",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_worker")),
        sa.UniqueConstraint(
            "worker_pool_id",
            "name",
            name=op.f("uq_worker__worker_pool_id_name"),
        ),
    )
    add_indexes(
        "worker",
        (
            ("ix_worker__updated", ["updated"]),
            (
                "ix_worker__worker_pool_id_last_heartbeat_time",
                ["worker_pool_id", "last_heartbeat_time"],
            ),
            ("ix_worker__worker_pool_id", ["worker_pool_id"]),
        ),
    )

    op.create_table(
        "worker_pool_queue",
        uuid_id_column(),
        now_column("created"),
        now_column("updated"),
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("description", sa.String(), nullable=True),
        sa.Column("is_paused", sa.Boolean(), server_default="0", nullable=False),
        sa.Column("concurrency_limit", sa.Integer(), nullable=True),
        sa.Column("priority", sa.Integer(), nullable=False),
        sa.Column(
            "worker_pool_id",
            prefect.server.utilities.database.UUID(),
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["worker_pool_id"],
            ["worker_pool.id"],
            name=op.f("fk_worker_pool_queue__worker_pool_id__worker_pool"),
            ondelete="cascade",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_worker_pool_queue")),
        sa.UniqueConstraint(
            "worker_pool_id",
            "name",
            name=op.f("uq_worker_pool_queue__worker_pool_id_name"),
        ),
    )
    add_indexes(
        "worker_pool_queue",
        (
            ("ix_worker_pool_queue__updated", ["updated"]),
            (
                "ix_worker_pool_queue__worker_pool_id_priority",
                ["worker_pool_id", "priority"],
            ),
            ("ix_worker_pool_queue__worker_pool_id", ["worker_pool_id"]),
        ),
    )

    # worker_pool and worker_pool_queue reference each other, so this foreign
    # key can only be added once both tables exist.
    with op.batch_alter_table("worker_pool", schema=None) as pool_ops:
        pool_ops.create_foreign_key(
            pool_ops.f("fk_worker_pool__default_queue_id__worker_pool_queue"),
            "worker_pool_queue",
            ["default_queue_id"],
            ["id"],
            ondelete="RESTRICT",
        )

    # Point deployments and flow runs back at the old queue table name.
    for table_name, restored_index, restored_fk in (
        (
            "deployment",
            "ix_deployment__worker_pool_queue_id",
            "fk_deployment__worker_pool_queue_id__worker_pool_queue",
        ),
        (
            "flow_run",
            "ix_flow_run__worker_pool_queue_id",
            "fk_flow_run__worker_pool_queue_id__worker_pool_queue",
        ),
    ):
        with op.batch_alter_table(table_name, schema=None) as table_ops:
            table_ops.add_column(
                sa.Column(
                    "worker_pool_queue_id",
                    prefect.server.utilities.database.UUID(),
                    nullable=True,
                )
            )
            table_ops.create_index(
                table_ops.f(restored_index),
                ["worker_pool_queue_id"],
                unique=False,
            )
            table_ops.create_foreign_key(
                table_ops.f(restored_fk),
                "worker_pool_queue",
                ["worker_pool_queue_id"],
                ["id"],
                ondelete="SET NULL",
            )

    op.execute("PRAGMA foreign_keys=ON")