1 """Migrate V1 concurrency limits to V2
2
3 Revision ID: 9e83011d1f2a
4 Revises: 8bb517bae6f9
5 Create Date: 2025-09-17 09:12:13.171320
6
7 """
8
9 import json 1 ctx 1a
10
11 import sqlalchemy as sa 1 ctx 1a
12 from alembic import op 1 ctx 1a
13
# Revision identifiers, used by Alembic.
# `revision` is this migration's own ID and `down_revision` is the migration it
# revises; both match the "Revision ID" / "Revises" lines in the module docstring.
revision = "9e83011d1f2a"
down_revision = "8bb517bae6f9"
# No branch labels or cross-revision dependencies are declared for this migration.
branch_labels = None
depends_on = None
19
20
def upgrade():
    """Migrate V1 concurrency limits to V2.

    For every row in ``concurrency_limit`` a ``concurrency_limit_v2`` row named
    ``tag:<tag>`` is inserted. The new row reuses the V1 row's ID, limit and
    timestamps, and its ``active_slots`` counter is set to the number of slot
    holders recorded on the V1 row. One lease per holder is then created in
    the configured lease storage, on a best-effort basis: a failure is printed
    as a warning and does not abort the migration.

    A V1 limit whose V2 name already exists is skipped, so re-running the
    migration does not insert duplicates. All V1 rows are deleted afterwards
    since the adapter routes all V1 API calls to V2.
    """
    from datetime import timedelta
    from uuid import UUID

    connection = op.get_bind()

    # Read all V1 limits up front; each one is translated to a V2 limit below.
    v1_limits = connection.execute(
        sa.text("SELECT * FROM concurrency_limit")
    ).fetchall()

    # Get the configured lease storage to create actual leases.
    from prefect.server.concurrency.lease_storage import (
        get_concurrency_lease_storage,
    )

    lease_storage = get_concurrency_lease_storage()

    def create_leases_best_effort(limit_id, holder_ids):
        """Create one lease per holder; print a warning instead of raising."""
        try:
            from prefect.server.concurrency.lease_storage import (
                ConcurrencyLimitLeaseMetadata,
            )

            # ConcurrencyLeaseHolder preserves which task run held the slot.
            from prefect.types._concurrency import ConcurrencyLeaseHolder

            # run_coro_as_sync runs the async lease API from this
            # synchronous migration context.
            from prefect.utilities.asyncutils import run_coro_as_sync

            resource_id = UUID(str(limit_id))

            async def create_leases():
                for holder_id in holder_ids:
                    await lease_storage.create_lease(
                        resource_ids=[resource_id],
                        # ~100-year TTL; presumably so migrated leases never
                        # expire on their own - TODO confirm.
                        ttl=timedelta(days=365 * 100),
                        metadata=ConcurrencyLimitLeaseMetadata(
                            slots=1,
                            holder=ConcurrencyLeaseHolder(
                                type="task_run", id=UUID(holder_id)
                            ),
                        ),
                    )

            run_coro_as_sync(create_leases())
        except Exception as e:
            # Deliberate best effort: report the problem but keep migrating.
            # NOTE(review): the V2 row keeps its active_slots count even when
            # no lease could be created for it.
            print(f"Warning: Could not create leases for limit {limit_id}: {e}")

    for v1_limit in v1_limits:
        # No whitespace around the prefix: downgrade() selects migrated rows
        # with LIKE 'tag:%' and strips exactly the four prefix characters.
        v2_name = f"tag:{v1_limit.tag}"

        # Skip limits that already exist in V2 (idempotency).
        existing = connection.execute(
            sa.text("SELECT id FROM concurrency_limit_v2 WHERE name = :name"),
            {"name": v2_name},
        ).fetchone()
        if existing:
            continue

        # active_slots holds the slot-holder IDs as JSON. Decode text values
        # and accept a value the driver has already decoded.
        raw_slots = v1_limit.active_slots
        if not raw_slots:
            holder_ids = []
        elif isinstance(raw_slots, (str, bytes, bytearray)):
            holder_ids = json.loads(raw_slots)
        else:
            holder_ids = raw_slots

        # Preserve the same ID when migrating.
        v2_id = v1_limit.id

        connection.execute(
            sa.text("""
                INSERT INTO concurrency_limit_v2
                (id, name, "limit", active, active_slots, denied_slots,
                 slot_decay_per_second, avg_slot_occupancy_seconds,
                 created, updated)
                VALUES
                (:id, :name, :limit, :active, :active_slots, :denied_slots,
                 :decay, :avg_occupancy, :created, :updated)
            """),
            {
                "id": str(v2_id),
                "name": v2_name,
                "limit": v1_limit.concurrency_limit,
                "active": 1,  # SQLite uses 1/0 for boolean
                "active_slots": len(holder_ids),
                "denied_slots": 0,
                "decay": 0.0,  # No decay for migrated limits
                "avg_occupancy": 600.0,  # Default 10 minutes
                "created": v1_limit.created,
                "updated": v1_limit.updated,
            },
        )

        # Create actual leases only if the V1 limit had active slots.
        if holder_ids:
            create_leases_best_effort(v2_id, holder_ids)

    # Delete V1 records after migration - the adapter handles all V1 API calls.
    connection.execute(sa.text("DELETE FROM concurrency_limit"))
125
126
def downgrade():
    """Restore V1 limits from V2.

    Every ``concurrency_limit_v2`` row whose name matches ``LIKE 'tag:%'`` is
    copied back into ``concurrency_limit`` with the same ID, limit and
    timestamps, using the name without its ``tag:`` prefix as the tag. The
    V1 ``active_slots`` JSON list is rebuilt, best effort, from the
    ``task_run`` holders the lease storage reports for the limit; if that
    lookup fails a note is printed and an empty list is stored. The matching
    V2 rows are deleted afterwards.

    NOTE(review): the name filter is the only selection criterion, so a V2
    limit that merely happens to be named ``tag:...`` is moved as well.
    NOTE(review): no leases are removed from lease storage here, so leases
    written by ``upgrade()`` remain after a downgrade - confirm that is
    acceptable.
    """
    from uuid import UUID

    connection = op.get_bind()

    # Get all V2 limits that carry the 'tag:' prefix used by upgrade().
    v2_limits = connection.execute(
        sa.text("SELECT * FROM concurrency_limit_v2 WHERE name LIKE 'tag:%'")
    ).fetchall()

    # Lease storage is needed to recover which task runs hold slots.
    from prefect.server.concurrency.lease_storage import (
        get_concurrency_lease_storage,
    )

    lease_storage = get_concurrency_lease_storage()

    def recover_task_run_holders(limit_id, tag):
        """Return task-run holder IDs (as strings) for a limit, or [] on failure."""
        try:
            from prefect.utilities.asyncutils import run_coro_as_sync

            async def get_holders():
                holders = await lease_storage.list_holders_for_limit(
                    UUID(str(limit_id))
                )
                # Entries are pairs; only the second element (the holder) is
                # used, and only task-run holders map back to V1 slots.
                return [
                    str(holder.id)
                    for _, holder in holders
                    if holder and holder.type == "task_run"
                ]

            # Normalize a falsy result (e.g. None) to an empty list so the
            # stored JSON is always an array.
            return run_coro_as_sync(get_holders()) or []
        except Exception as e:
            # Can't recover slots - that's OK, best effort.
            print(f"Note: Could not recover active slots for {tag}: {e}")
            return []

    prefix_length = len("tag:")

    for v2_limit in v2_limits:
        # Drop the leading 'tag:' characters, mirroring the LIKE filter above.
        tag = v2_limit.name[prefix_length:]

        active_slots = recover_task_run_holders(v2_limit.id, tag)

        # Restore with the same ID and the recovered (or empty) active_slots.
        connection.execute(
            sa.text("""
                INSERT INTO concurrency_limit
                (id, tag, concurrency_limit, active_slots, created, updated)
                VALUES (:id, :tag, :limit, :slots, :created, :updated)
            """),
            {
                "id": str(v2_limit.id),  # Preserve the same ID when downgrading
                "tag": tag,
                "limit": v2_limit.limit,
                "slots": json.dumps(active_slots),
                "created": v2_limit.created,
                "updated": v2_limit.updated,
            },
        )

    # Delete the V2 limits that were copied back to V1.
    connection.execute(
        sa.text("DELETE FROM concurrency_limit_v2 WHERE name LIKE 'tag:%'")
    )