Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/scheduler.py: 75%

89 statements  

coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

"""
The Scheduler service.
"""

from __future__ import annotations

import asyncio
import datetime
from typing import Any, Sequence
from uuid import UUID

import sqlalchemy as sa

import prefect.server.models as models
from prefect.server.database import PrefectDBInterface
from prefect.server.database.dependencies import db_injector
from prefect.server.schemas.states import StateType
from prefect.server.services.base import LoopService, run_multiple_services
from prefect.settings import (
    PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE,
    PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE,
    PREFECT_API_SERVICES_SCHEDULER_LOOP_SECONDS,
    PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS,
    PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME,
    PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS,
    PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME,
)
from prefect.settings.context import get_current_settings
from prefect.settings.models.server.services import ServicesBaseSetting
from prefect.types._datetime import now
from prefect.utilities.collections import batched_iterable


class TryAgain(Exception):
    """Internal control-flow exception used to retry the Scheduler's main loop"""
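

# A minimal sketch (hypothetical helper, not part of the service) of the
# control-flow pattern TryAgain enables: an inner step raises the exception and
# the enclosing loop simply starts a fresh iteration. This is how
# `Scheduler.run_once` below reacts when a database error invalidates the
# connection mid-pass.
async def _retry_until_done_sketch(attempt_once: Any) -> Any:
    while True:
        try:
            return await attempt_once()
        except TryAgain:
            continue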


class Scheduler(LoopService):
    """
    Schedules flow runs from deployments.
    """

    # the main scheduler takes its loop interval from
    # PREFECT_API_SERVICES_SCHEDULER_LOOP_SECONDS
    loop_seconds: float

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        return get_current_settings().server.services.scheduler

    def __init__(self, loop_seconds: float | None = None, **kwargs: Any):
        super().__init__(
            loop_seconds=(
                loop_seconds
                or self.loop_seconds
                or PREFECT_API_SERVICES_SCHEDULER_LOOP_SECONDS.value()
            ),
            **kwargs,
        )
        self.deployment_batch_size: int = (
            PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE.value()
        )
        self.max_runs: int = PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS.value()
        self.min_runs: int = PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS.value()
        self.max_scheduled_time: datetime.timedelta = (
            PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME.value()
        )
        self.min_scheduled_time: datetime.timedelta = (
            PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME.value()
        )
        self.insert_batch_size: int = (
            PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE.value()
        )

    @db_injector
    async def run_once(self, db: PrefectDBInterface) -> None:
        """
        Schedule flow runs by:

        - Querying for deployments with active schedules
        - Generating the next set of flow runs based on each deployment's schedule
        - Inserting all scheduled flow runs into the database

        All inserted flow runs are committed to the database at the termination of
        the loop.
        """
        total_inserted_runs = 0

        last_id = None
        while True:
            async with db.session_context(begin_transaction=False) as session:
                query = self._get_select_deployments_to_schedule_query()

                # use cursor-based pagination
                if last_id:  # coverage: branch never taken
                    query = query.where(db.Deployment.id > last_id)

                result = await session.execute(query)
                deployment_ids = result.scalars().unique().all()

                # collect runs across all deployments
                try:
                    runs_to_insert = await self._collect_flow_runs(
                        session=session, deployment_ids=deployment_ids
                    )
                except TryAgain:
                    continue

            # bulk insert the runs based on the batch size setting
            # coverage: the loop below was never entered
            for batch in batched_iterable(runs_to_insert, self.insert_batch_size):
                async with db.session_context(begin_transaction=True) as session:
                    inserted_runs = await self._insert_scheduled_flow_runs(
                        session=session, runs=list(batch)
                    )
                    total_inserted_runs += len(inserted_runs)

            # if this is the last page of deployments, exit the loop
            if len(deployment_ids) < self.deployment_batch_size:  # coverage: always true
                break
            else:
                # record the last deployment ID
                last_id = deployment_ids[-1]

        self.logger.info(f"Scheduled {total_inserted_runs} runs.")

    @db_injector
    def _get_select_deployments_to_schedule_query(
        self, db: PrefectDBInterface
    ) -> sa.Select[tuple[UUID]]:
        """
        Returns a sqlalchemy query for selecting deployments to schedule.

        The query gets the IDs of any deployments with:

        - an active schedule
        - EITHER:
            - fewer than `min_runs` auto-scheduled runs
            - OR a latest scheduled run that is less than `min_scheduled_time`
              in the future
        """
        right_now = now("UTC")
        query = (
            sa.select(db.Deployment.id)
            .select_from(db.Deployment)
            # TODO: on Postgres, this could be replaced with a lateral join that
            # sorts by `next_scheduled_start_time desc` and limits by
            # `self.min_runs` for a ~50% speedup. At the time of writing,
            # performance of this universal query appears to be fast enough that
            # this optimization is not worth maintaining db-specific queries
            .join(
                db.FlowRun,
                # join on matching deployments, only picking up future scheduled runs
                sa.and_(
                    db.Deployment.id == db.FlowRun.deployment_id,
                    db.FlowRun.state_type == StateType.SCHEDULED,
                    db.FlowRun.next_scheduled_start_time >= right_now,
                    db.FlowRun.auto_scheduled.is_(True),
                ),
                isouter=True,
            )
            .where(
                sa.and_(
                    db.Deployment.paused.is_not(True),
                    (
                        # Only include deployments that have at least one
                        # active schedule.
                        sa.select(db.DeploymentSchedule.deployment_id)
                        .where(
                            sa.and_(
                                db.DeploymentSchedule.deployment_id == db.Deployment.id,
                                db.DeploymentSchedule.active.is_(True),
                            )
                        )
                        .exists()
                    ),
                )
            )
            .group_by(db.Deployment.id)
            # having EITHER fewer than `min_runs` runs OR runs not scheduled far
            # enough out
            .having(
                sa.or_(
                    sa.func.count(db.FlowRun.next_scheduled_start_time) < self.min_runs,
                    sa.func.max(db.FlowRun.next_scheduled_start_time)
                    < right_now + self.min_scheduled_time,
                )
            )
            .order_by(db.Deployment.id)
            .limit(self.deployment_batch_size)
        )
        return query

    async def _collect_flow_runs(
        self,
        session: sa.orm.Session,
        deployment_ids: Sequence[UUID],
    ) -> list[dict[str, Any]]:
        runs_to_insert: list[dict[str, Any]] = []
        # coverage: the loop below was never entered
        for deployment_id in deployment_ids:
            right_now = now("UTC")
            # guard against erroneously configured schedules
            try:
                runs_to_insert.extend(
                    await self._generate_scheduled_flow_runs(
                        session=session,
                        deployment_id=deployment_id,
                        start_time=right_now,
                        end_time=right_now + self.max_scheduled_time,
                        min_time=self.min_scheduled_time,
                        min_runs=self.min_runs,
                        max_runs=self.max_runs,
                    )
                )
            except Exception:
                self.logger.exception(
                    f"Error scheduling deployment {deployment_id!r}.",
                )
            finally:
                connection = await session.connection()
                if connection.invalidated:
                    # If the error we handled above was the kind of database error
                    # that causes the underlying transaction to roll back and the
                    # connection to become invalidated, roll back this session.
                    # Errors that may cause this are connection drops, database
                    # restarts, and things of that sort.
                    #
                    # This rollback _does not roll back a transaction_, since that
                    # has actually already happened due to the error above. It
                    # brings the Python session in sync with the underlying
                    # connection so that when we exit the outer `with` block, the
                    # context manager will not attempt to commit the session.
                    #
                    # Then, raise TryAgain to break out of these nested loops, back
                    # to the outer loop, where we'll begin a new transaction with
                    # session.begin() in the next loop iteration.
                    await session.rollback()
                    raise TryAgain()
        return runs_to_insert

    @db_injector
    async def _generate_scheduled_flow_runs(
        self,
        db: PrefectDBInterface,
        session: sa.orm.Session,
        deployment_id: UUID,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
        min_time: datetime.timedelta,
        min_runs: int,
        max_runs: int,
    ) -> list[dict[str, Any]]:
        """
        Given a `deployment_id` and schedule params, generates a list of flow run
        objects and associated scheduled states that represent scheduled flow runs.

        Pass-through method for overrides.

        Args:
            session: a database session
            deployment_id: the id of the deployment to schedule
            start_time: the time from which to start scheduling runs
            end_time: runs will be scheduled until at most this time
            min_time: runs will be scheduled until at least this far in the future
            min_runs: a minimum number of runs to schedule
            max_runs: a maximum number of runs to schedule

        This function will generate the minimum number of runs that satisfy the min
        and max times, and the min and max counts. Specifically, the following order
        will be respected (see the sketch after this class for a toy illustration):

            - Runs will be generated starting on or after the `start_time`
            - No more than `max_runs` runs will be generated
            - No runs will be generated after `end_time` is reached
            - At least `min_runs` runs will be generated
            - Runs will be generated until at least `start_time + min_time` is reached
        """
        return await models.deployments._generate_scheduled_flow_runs(
            db,
            session=session,
            deployment_id=deployment_id,
            start_time=start_time,
            end_time=end_time,
            min_time=min_time,
            min_runs=min_runs,
            max_runs=max_runs,
        )

    async def _insert_scheduled_flow_runs(
        self,
        session: sa.orm.Session,
        runs: list[dict[str, Any]],
    ) -> Sequence[UUID]:
        """
        Given a list of flow runs to schedule, as generated by
        `_generate_scheduled_flow_runs`, inserts them into the database. Note this is
        a separate method to facilitate batch operations on many scheduled runs.

        Pass-through method for overrides.
        """
        return await models.deployments._insert_scheduled_flow_runs(
            session=session, runs=runs
        )
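

# The following is a toy illustration (hypothetical helper, not used by the
# service and not Prefect's implementation -- the real logic lives in
# `models.deployments._generate_scheduled_flow_runs`). Assuming a simple
# fixed-interval schedule, it demonstrates the precedence documented in
# `Scheduler._generate_scheduled_flow_runs`: `max_runs` and `end_time` cap
# generation even when `min_runs` / `min_time` are not yet satisfied;
# otherwise run times are emitted until both minimums are met.
def _toy_run_times(
    start_time: datetime.datetime,
    end_time: datetime.datetime,
    min_time: datetime.timedelta,
    min_runs: int,
    max_runs: int,
    interval: datetime.timedelta,
) -> list[datetime.datetime]:
    # For example, with min_runs=3, max_runs=100, a 1-hour interval, and a
    # min_time of 10 hours, eleven times are produced: start .. start + 10h.
    times: list[datetime.datetime] = []
    next_time = start_time
    while len(times) < max_runs and next_time <= end_time:
        times.append(next_time)
        if len(times) >= min_runs and next_time >= start_time + min_time:
            break
        next_time = next_time + interval
    return times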


class RecentDeploymentsScheduler(Scheduler):
    """
    Schedules deployments that were updated very recently.

    This scheduler can run on a tight loop and ensure that runs from
    newly-created or updated deployments are rapidly scheduled without having to
    wait for the "main" scheduler to complete its loop.

    Note that scheduling is idempotent, so it's OK for this scheduler to attempt
    to schedule the same deployments as the main scheduler. Its purpose is to
    accelerate scheduling for any deployments that users are interacting with.
    """

    # this scheduler runs on a tight loop
    loop_seconds: float

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        return get_current_settings().server.services.scheduler

    def __init__(self, loop_seconds: float | None = None, **kwargs: Any):
        super().__init__(
            loop_seconds=(
                loop_seconds
                or get_current_settings().server.services.scheduler.recent_deployments_loop_seconds
            ),
            **kwargs,
        )

    @db_injector
    def _get_select_deployments_to_schedule_query(
        self, db: PrefectDBInterface
    ) -> sa.Select[tuple[UUID]]:
        """
        Returns a sqlalchemy query for selecting deployments to schedule.
        """
        query = (
            sa.select(db.Deployment.id)
            .where(
                sa.and_(
                    db.Deployment.paused.is_not(True),
                    # use a slightly larger window than the loop interval to pick up
                    # any deployments that were created *while* the scheduler was
                    # last running (assuming the scheduler takes less than one
                    # second to run). Scheduling is idempotent, so picking up
                    # schedules multiple times is not a concern.
                    db.Deployment.updated
                    >= now("UTC") - datetime.timedelta(seconds=self.loop_seconds + 1),
                    (
                        # Only include deployments that have at least one
                        # active schedule.
                        sa.select(db.DeploymentSchedule.deployment_id)
                        .where(
                            sa.and_(
                                db.DeploymentSchedule.deployment_id == db.Deployment.id,
                                db.DeploymentSchedule.active.is_(True),
                            )
                        )
                        .exists()
                    ),
                )
            )
            .order_by(db.Deployment.id)
            .limit(self.deployment_batch_size)
        )
        return query
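

# RecentDeploymentsScheduler above customizes its parent by overriding the
# deployment-selection query. The same subclassing pattern applies to the
# pass-through hooks on `Scheduler`; the class below is a hypothetical sketch
# (not part of this module) that logs each insert batch before delegating to
# the default behavior.
class _VerboseScheduler(Scheduler):
    async def _insert_scheduled_flow_runs(
        self,
        session: sa.orm.Session,
        runs: list[dict[str, Any]],
    ) -> Sequence[UUID]:
        # log how large each insert batch is, then defer to the parent class
        self.logger.debug(f"Inserting a batch of {len(runs)} scheduled flow runs")
        return await super()._insert_scheduled_flow_runs(session=session, runs=runs)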


if __name__ == "__main__":  # coverage: never executed
    asyncio.run(
        run_multiple_services(
            [
                Scheduler(handle_signals=True),
                RecentDeploymentsScheduler(handle_signals=True),
            ]
        )
    )
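

# Usage sketch: a hypothetical helper (illustration only, not part of the
# module) showing one way to exercise a single scheduling pass against a
# configured server database, e.g. from a test or a REPL via
# `asyncio.run(_run_single_pass())`. `run_once()` is decorated with
# `db_injector`, so the database interface is supplied automatically.
async def _run_single_pass() -> None:
    await Scheduler().run_once()
    await RecentDeploymentsScheduler().run_once()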