Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/_migrations/versions/sqlite/2022_11_24_143302_fe77ad0dda06_add_worker_tables.py: 70%

50 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1"""Add worker tables 

2 

3Revision ID: fe77ad0dda06 

4Revises: 7201de756d85 

5Create Date: 2022-11-24 14:33:02.689796 

6 

7""" 

8 

9import sqlalchemy as sa 1a

10from alembic import op 1a

11 

12import prefect 1a

13 

# Revision identifiers, used by Alembic.
# ID of this migration; matches "Revision ID" in the module docstring.
revision = "fe77ad0dda06"
# ID of the migration this one revises; matches "Revises" in the docstring.
down_revision = "7201de756d85"
# No branch labels and no extra revision dependencies are declared here.
branch_labels = None
depends_on = None

19 

20 

def upgrade():
    """Create the worker tables and link deployments and flow runs to queues.

    Steps, in execution order:

    1. ``PRAGMA foreign_keys=OFF``.
    2. Create ``worker_pool``, ``worker`` and ``worker_pool_queue``, each
       followed by its indexes.
    3. Add the ``worker_pool.default_queue_id`` -> ``worker_pool_queue.id``
       foreign key (``ON DELETE RESTRICT``).
    4. Add a nullable, indexed ``worker_pool_queue_id`` column with an
       ``ON DELETE SET NULL`` foreign key to ``deployment`` and ``flow_run``.
    5. ``PRAGMA foreign_keys=ON``.
    """
    # Foreign keys stay off while the schema is altered ("to prevent dropping").
    op.execute("PRAGMA foreign_keys=OFF")

    # Each factory returns a brand-new object on every call, so no column,
    # type or constraint instance is ever shared between two tables.
    def id_column():
        # "id" defaults to a random, UUID4-formatted string assembled in SQL.
        return sa.Column(
            "id",
            prefect.server.utilities.database.UUID(),
            server_default=sa.text(
                "(\n (\n lower(hex(randomblob(4)))\n || '-'\n "
                " || lower(hex(randomblob(2)))\n || '-4'\n ||"
                " substr(lower(hex(randomblob(2))),2)\n || '-'\n ||"
                " substr('89ab',abs(random()) % 4 + 1, 1)\n ||"
                " substr(lower(hex(randomblob(2))),2)\n || '-'\n ||"
                " lower(hex(randomblob(6)))\n )\n )"
            ),
            nullable=False,
        )

    def current_time_column(column_name):
        # Non-null timestamp whose server default is SQLite's 'now'.
        return sa.Column(
            column_name,
            prefect.server.utilities.database.Timestamp(timezone=True),
            server_default=sa.text("(strftime('%Y-%m-%d %H:%M:%f000', 'now'))"),
            nullable=False,
        )

    def pool_reference_column():
        # Required reference from a child row to its owning worker pool.
        return sa.Column(
            "worker_pool_id",
            prefect.server.utilities.database.UUID(),
            nullable=False,
        )

    def pool_foreign_key(constraint_name):
        # Child rows are removed together with their worker pool.
        return sa.ForeignKeyConstraint(
            ["worker_pool_id"],
            ["worker_pool.id"],
            name=op.f(constraint_name),
            ondelete="cascade",
        )

    # --- worker_pool -------------------------------------------------------
    op.create_table(
        "worker_pool",
        id_column(),
        current_time_column("created"),
        current_time_column("updated"),
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("description", sa.String(), nullable=True),
        sa.Column("type", sa.String(), nullable=True),
        sa.Column(
            "base_job_template",
            prefect.server.utilities.database.JSON(),
            nullable=False,
            server_default="{}",
        ),
        sa.Column("is_paused", sa.Boolean(), server_default="0", nullable=False),
        sa.Column("concurrency_limit", sa.Integer(), nullable=True),
        sa.Column(
            "default_queue_id",
            prefect.server.utilities.database.UUID(),
            nullable=True,
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_worker_pool")),
        sa.UniqueConstraint("name", name=op.f("uq_worker_pool__name")),
    )
    with op.batch_alter_table("worker_pool", schema=None) as pool_ops:
        for index_name, indexed_columns in (
            ("ix_worker_pool__updated", ["updated"]),
            ("ix_worker_pool__type", ["type"]),
        ):
            pool_ops.create_index(
                pool_ops.f(index_name), indexed_columns, unique=False
            )

    # --- worker ------------------------------------------------------------
    op.create_table(
        "worker",
        id_column(),
        current_time_column("created"),
        current_time_column("updated"),
        sa.Column("name", sa.String(), nullable=False),
        current_time_column("last_heartbeat_time"),
        pool_reference_column(),
        pool_foreign_key("fk_worker__worker_pool_id__worker_pool"),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_worker")),
        sa.UniqueConstraint(
            "worker_pool_id",
            "name",
            name=op.f("uq_worker__worker_pool_id_name"),
        ),
    )
    with op.batch_alter_table("worker", schema=None) as worker_ops:
        for index_name, indexed_columns in (
            ("ix_worker__updated", ["updated"]),
            (
                "ix_worker__worker_pool_id_last_heartbeat_time",
                ["worker_pool_id", "last_heartbeat_time"],
            ),
            ("ix_worker__worker_pool_id", ["worker_pool_id"]),
        ):
            worker_ops.create_index(
                worker_ops.f(index_name), indexed_columns, unique=False
            )

    # --- worker_pool_queue -------------------------------------------------
    op.create_table(
        "worker_pool_queue",
        id_column(),
        current_time_column("created"),
        current_time_column("updated"),
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("description", sa.String(), nullable=True),
        sa.Column("is_paused", sa.Boolean(), server_default="0", nullable=False),
        sa.Column("concurrency_limit", sa.Integer(), nullable=True),
        sa.Column("priority", sa.Integer(), nullable=False),
        pool_reference_column(),
        pool_foreign_key("fk_worker_pool_queue__worker_pool_id__worker_pool"),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_worker_pool_queue")),
        sa.UniqueConstraint(
            "worker_pool_id",
            "name",
            name=op.f("uq_worker_pool_queue__worker_pool_id_name"),
        ),
    )
    with op.batch_alter_table("worker_pool_queue", schema=None) as queue_ops:
        for index_name, indexed_columns in (
            ("ix_worker_pool_queue__updated", ["updated"]),
            (
                "ix_worker_pool_queue__worker_pool_id_priority",
                ["worker_pool_id", "priority"],
            ),
            ("ix_worker_pool_queue__worker_pool_id", ["worker_pool_id"]),
        ):
            queue_ops.create_index(
                queue_ops.f(index_name), indexed_columns, unique=False
            )

    # The pool's default queue can only be wired up once the queue table exists.
    with op.batch_alter_table("worker_pool", schema=None) as pool_ops:
        pool_ops.create_foreign_key(
            pool_ops.f("fk_worker_pool__default_queue_id__worker_pool_queue"),
            "worker_pool_queue",
            ["default_queue_id"],
            ["id"],
            ondelete="RESTRICT",
        )

    # deployment and flow_run each gain the same optional link to a queue.
    for table_name, index_name, foreign_key_name in (
        (
            "deployment",
            "ix_deployment__worker_pool_queue_id",
            "fk_deployment__worker_pool_queue_id__worker_pool_queue",
        ),
        (
            "flow_run",
            "ix_flow_run__worker_pool_queue_id",
            "fk_flow_run__worker_pool_queue_id__worker_pool_queue",
        ),
    ):
        with op.batch_alter_table(table_name, schema=None) as table_ops:
            table_ops.add_column(
                sa.Column(
                    "worker_pool_queue_id",
                    prefect.server.utilities.database.UUID(),
                    nullable=True,
                )
            )
            table_ops.create_index(
                table_ops.f(index_name),
                ["worker_pool_queue_id"],
                unique=False,
            )
            table_ops.create_foreign_key(
                table_ops.f(foreign_key_name),
                "worker_pool_queue",
                ["worker_pool_queue_id"],
                ["id"],
                ondelete="SET NULL",
            )

    op.execute("PRAGMA foreign_keys=ON")

260 

261 

def downgrade():
    """Remove everything this revision's ``upgrade`` added.

    With ``PRAGMA foreign_keys=OFF`` in effect, the ``worker_pool_queue_id``
    index, foreign key and column are dropped from ``deployment`` and then
    ``flow_run``; the ``worker_pool`` default-queue foreign key is dropped;
    and the ``worker_pool_queue``, ``worker`` and ``worker_pool`` tables are
    dropped in that order. ``PRAGMA foreign_keys=ON`` is issued last.
    """
    op.execute("PRAGMA foreign_keys=OFF")

    # (table, index to drop, foreign key to drop), processed in this order.
    queue_links = (
        (
            "deployment",
            "ix_deployment__worker_pool_queue_id",
            "fk_deployment__worker_pool_queue_id__worker_pool_queue",
        ),
        (
            "flow_run",
            "ix_flow_run__worker_pool_queue_id",
            "fk_flow_run__worker_pool_queue_id__worker_pool_queue",
        ),
    )
    for table_name, index_name, foreign_key_name in queue_links:
        with op.batch_alter_table(table_name, schema=None) as table_ops:
            table_ops.drop_index(index_name)
            table_ops.drop_constraint(foreign_key_name)
            table_ops.drop_column("worker_pool_queue_id")

    # Detach the pool from its default queue before the queue table is dropped.
    with op.batch_alter_table("worker_pool", schema=None) as pool_ops:
        pool_ops.drop_constraint(
            "fk_worker_pool__default_queue_id__worker_pool_queue"
        )

    for table_name in ("worker_pool_queue", "worker", "worker_pool"):
        op.drop_table(table_name)

    op.execute("PRAGMA foreign_keys=ON")