Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/_migrations/versions/sqlite/2025_09_17_091213_9e83011d1f2a_migrate_v1_concurrency_limits_to_v2.py: 25%

54 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1"""Migrate V1 concurrency limits to V2 

2 

3Revision ID: 9e83011d1f2a 

4Revises: 8bb517bae6f9 

5Create Date: 2025-09-17 09:12:13.171320 

6 

7""" 

8 

9import json 1a

10 

11import sqlalchemy as sa 1a

12from alembic import op 1a

13 

14# revision identifiers, used by Alembic. 

15revision = "9e83011d1f2a" 1a

16down_revision = "8bb517bae6f9" 1a

17branch_labels = None 1a

18depends_on = None 1a

19 

20 

21def upgrade(): 1a

22 """Migrate V1 concurrency limits to V2. 

23 

24 Creates V2 limits with 'tag:' prefix and preserves active slot counts. 

25 The V1 records are deleted after migration since the adapter routes 

26 all V1 API calls to V2. 

27 """ 

28 

29 connection = op.get_bind() 1a

30 

31 # Read all V1 limits and create corresponding V2 limits 

32 v1_limits = connection.execute( 1a

33 sa.text("SELECT * FROM concurrency_limit") 

34 ).fetchall() 

35 

36 # Get the configured lease storage to create actual leases 

37 from prefect.server.concurrency.lease_storage import ( 1a

38 get_concurrency_lease_storage, 

39 ) 

40 

41 lease_storage = get_concurrency_lease_storage() 1a

42 

43 for v1_limit in v1_limits: 43 ↛ 44line 43 didn't jump to line 44 because the loop on line 43 never started1a

44 v2_name = f"tag:{v1_limit.tag}" 

45 

46 # Check if V2 limit already exists (for idempotency) 

47 existing = connection.execute( 

48 sa.text("SELECT id FROM concurrency_limit_v2 WHERE name = :name"), 

49 {"name": v2_name}, 

50 ).fetchone() 

51 

52 if not existing: 

53 # Parse active slots from JSON 

54 active_slots = ( 

55 json.loads(v1_limit.active_slots) if v1_limit.active_slots else [] 

56 ) 

57 active_count = len(active_slots) 

58 

59 # Preserve the same ID when migrating 

60 v2_id = v1_limit.id 

61 

62 # Create V2 limit with the same ID 

63 connection.execute( 

64 sa.text(""" 

65 INSERT INTO concurrency_limit_v2  

66 (id, name, "limit", active, active_slots, denied_slots, 

67 slot_decay_per_second, avg_slot_occupancy_seconds,  

68 created, updated) 

69 VALUES  

70 (:id, :name, :limit, :active, :active_slots, :denied_slots, 

71 :decay, :avg_occupancy, :created, :updated) 

72 """), 

73 { 

74 "id": str(v2_id), 

75 "name": v2_name, 

76 "limit": v1_limit.concurrency_limit, 

77 "active": 1, # SQLite uses 1/0 for boolean 

78 "active_slots": active_count, 

79 "denied_slots": 0, 

80 "decay": 0.0, # No decay for migrated limits 

81 "avg_occupancy": 600.0, # Default 10 minutes 

82 "created": v1_limit.created, 

83 "updated": v1_limit.updated, 

84 }, 

85 ) 

86 

87 # Create actual leases if we have active slots 

88 if active_slots: 

89 try: 

90 from datetime import timedelta 

91 from uuid import UUID 

92 

93 from prefect.server.concurrency.lease_storage import ( 

94 ConcurrencyLimitLeaseMetadata, 

95 ) 

96 

97 # Import the ConcurrencyLeaseHolder for preserving holder info 

98 from prefect.types._concurrency import ConcurrencyLeaseHolder 

99 

100 async def create_leases(): 

101 # Create one lease for each active slot, preserving the task_run holder 

102 for slot_holder_id in active_slots: 

103 await lease_storage.create_lease( 

104 resource_ids=[UUID(str(v2_id))], 

105 ttl=timedelta(days=365 * 100), 

106 metadata=ConcurrencyLimitLeaseMetadata( 

107 slots=1, 

108 holder=ConcurrencyLeaseHolder( 

109 type="task_run", id=UUID(slot_holder_id) 

110 ), 

111 ), 

112 ) 

113 

114 # Use run_coro_as_sync to handle running async code from migration context 

115 from prefect.utilities.asyncutils import run_coro_as_sync 

116 

117 run_coro_as_sync(create_leases()) 

118 except Exception as e: 

119 # Log but don't fail migration if lease creation fails 

120 print(f"Warning: Could not create leases for limit {v2_id}: {e}") 

121 pass 

122 

123 # Delete V1 records after migration - the adapter handles all V1 API calls 

124 connection.execute(sa.text("DELETE FROM concurrency_limit")) 1a

125 

126 

127def downgrade(): 1a

128 """Restore V1 limits from V2.""" 

129 connection = op.get_bind() 

130 

131 # Get all V2 limits that were migrated from V1 (have tag: prefix) 

132 v2_limits = connection.execute( 

133 sa.text("SELECT * FROM concurrency_limit_v2 WHERE name LIKE 'tag:%'") 

134 ).fetchall() 

135 

136 # Try to get lease storage for recovering active slots 

137 from prefect.server.concurrency.lease_storage import ( 

138 get_concurrency_lease_storage, 

139 ) 

140 

141 lease_storage = get_concurrency_lease_storage() 

142 

143 for v2_limit in v2_limits: 

144 tag = v2_limit.name[4:] # Remove 'tag:' prefix 

145 

146 # Best effort: try to recover active slots from lease storage 

147 active_slots = [] 

148 try: 

149 from uuid import UUID 

150 

151 async def get_holders(): 

152 holders = await lease_storage.list_holders_for_limit( 

153 UUID(str(v2_limit.id)) 

154 ) 

155 return [ 

156 str(holder.id) 

157 for _, holder in holders 

158 if holder and holder.type == "task_run" 

159 ] 

160 

161 from prefect.utilities.asyncutils import run_coro_as_sync 

162 

163 active_slots = run_coro_as_sync(get_holders()) 

164 except Exception as e: 

165 # Can't recover slots - that's OK, best effort 

166 print(f"Note: Could not recover active slots for {tag}: {e}") 

167 

168 # Restore with the same ID and recovered active_slots (or empty if couldn't recover) 

169 connection.execute( 

170 sa.text(""" 

171 INSERT INTO concurrency_limit 

172 (id, tag, concurrency_limit, active_slots, created, updated) 

173 VALUES (:id, :tag, :limit, :slots, :created, :updated) 

174 """), 

175 { 

176 "id": str(v2_limit.id), # Preserve the same ID when downgrading 

177 "tag": tag, 

178 "limit": v2_limit.limit, 

179 "slots": json.dumps(active_slots), # Best effort recovery from leases 

180 "created": v2_limit.created, 

181 "updated": v2_limit.updated, 

182 }, 

183 ) 

184 

185 # Delete the V2 limits that were created from V1 

186 connection.execute( 

187 sa.text("DELETE FROM concurrency_limit_v2 WHERE name LIKE 'tag:%'") 

188 )