Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/scheduler.py: 71%
89 statements
coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1"""
2The Scheduler service.
3"""
5from __future__ import annotations 1a
7import asyncio 1a
8import datetime 1a
9from typing import Any, Sequence 1a
10from uuid import UUID 1a
12import sqlalchemy as sa 1a
14import prefect.server.models as models 1a
15from prefect.server.database import PrefectDBInterface 1a
16from prefect.server.database.dependencies import db_injector 1a
17from prefect.server.schemas.states import StateType 1a
18from prefect.server.services.base import LoopService, run_multiple_services 1a
19from prefect.settings import ( 1a
20 PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE,
21 PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE,
22 PREFECT_API_SERVICES_SCHEDULER_LOOP_SECONDS,
23 PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS,
24 PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME,
25 PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS,
26 PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME,
27)
28from prefect.settings.context import get_current_settings 1a
29from prefect.settings.models.server.services import ServicesBaseSetting 1a
30from prefect.types._datetime import now 1a
31from prefect.utilities.collections import batched_iterable 1a
34class TryAgain(Exception): 1a
35 """Internal control-flow exception used to retry the Scheduler's main loop"""
38class Scheduler(LoopService): 1a
39 """
40 Schedules flow runs from deployments.
41 """
43 # the main scheduler takes its loop interval from
44 # PREFECT_API_SERVICES_SCHEDULER_LOOP_SECONDS
45 loop_seconds: float 1a
47 @classmethod 1a
48 def service_settings(cls) -> ServicesBaseSetting: 1a
49 return get_current_settings().server.services.scheduler 1b
51 def __init__(self, loop_seconds: float | None = None, **kwargs: Any): 1a
52 super().__init__( 1b
53 loop_seconds=(
54 loop_seconds
55 or self.loop_seconds
56 or PREFECT_API_SERVICES_SCHEDULER_LOOP_SECONDS.value()
57 ),
58 **kwargs,
59 )
60 self.deployment_batch_size: int = ( 1b
61 PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE.value()
62 )
63 self.max_runs: int = PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS.value() 1b
64 self.min_runs: int = PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS.value() 1b
65 self.max_scheduled_time: datetime.timedelta = ( 1b
66 PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME.value()
67 )
68 self.min_scheduled_time: datetime.timedelta = ( 1b
69 PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME.value()
70 )
71 self.insert_batch_size: int = ( 1b
72 PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE.value()
73 )
75 @db_injector 1a
76 async def run_once(self, db: PrefectDBInterface) -> None: 1a
77 """
78 Schedule flow runs by:
80 - Querying for deployments with active schedules
        - Generating the next set of flow runs based on each deployment's schedule
        - Inserting all scheduled flow runs into the database

        All inserted flow runs are committed to the database at the termination of the
        loop.
        """
        total_inserted_runs = 0

        last_id = None
        while True:
            async with db.session_context(begin_transaction=False) as session:
                query = self._get_select_deployments_to_schedule_query()

                # use cursor based pagination
                if last_id:
                    query = query.where(db.Deployment.id > last_id)

                result = await session.execute(query)
                deployment_ids = result.scalars().unique().all()

                # collect runs across all deployments
                try:
                    runs_to_insert = await self._collect_flow_runs(
                        session=session, deployment_ids=deployment_ids
                    )
                except TryAgain:
                    continue

            # bulk insert the runs based on batch size setting
            for batch in batched_iterable(runs_to_insert, self.insert_batch_size):
                async with db.session_context(begin_transaction=True) as session:
                    inserted_runs = await self._insert_scheduled_flow_runs(
                        session=session, runs=list(batch)
                    )
                    total_inserted_runs += len(inserted_runs)

            # if this is the last page of deployments, exit the loop
            if len(deployment_ids) < self.deployment_batch_size:
                break
            else:
                # record the last deployment ID
                last_id = deployment_ids[-1]

        self.logger.info(f"Scheduled {total_inserted_runs} runs.")

    @db_injector
    def _get_select_deployments_to_schedule_query(
        self, db: PrefectDBInterface
    ) -> sa.Select[tuple[UUID]]:
        """
        Returns a sqlalchemy query for selecting deployments to schedule.

        The query gets the IDs of any deployments with:

        - an active schedule
        - EITHER:
            - fewer than `min_runs` auto-scheduled runs
            - OR the max scheduled time is less than `min_scheduled_time` in the future
139 """
140 right_now = now("UTC") 1b
141 query = ( 1b
142 sa.select(db.Deployment.id)
143 .select_from(db.Deployment)
144 # TODO: on Postgres, this could be replaced with a lateral join that
145 # sorts by `next_scheduled_start_time desc` and limits by
146 # `self.min_runs` for a ~ 50% speedup. At the time of writing,
147 # performance of this universal query appears to be fast enough that
148 # this optimization is not worth maintaining db-specific queries
149 .join(
150 db.FlowRun,
151 # join on matching deployments, only picking up future scheduled runs
152 sa.and_(
153 db.Deployment.id == db.FlowRun.deployment_id,
154 db.FlowRun.state_type == StateType.SCHEDULED,
155 db.FlowRun.next_scheduled_start_time >= right_now,
156 db.FlowRun.auto_scheduled.is_(True),
157 ),
158 isouter=True,
159 )
160 .where(
161 sa.and_(
162 db.Deployment.paused.is_not(True),
163 (
164 # Only include deployments that have at least one
165 # active schedule.
166 sa.select(db.DeploymentSchedule.deployment_id)
167 .where(
168 sa.and_(
169 db.DeploymentSchedule.deployment_id == db.Deployment.id,
170 db.DeploymentSchedule.active.is_(True),
171 )
172 )
173 .exists()
174 ),
175 )
176 )
177 .group_by(db.Deployment.id)
            # having EITHER fewer than `min_runs` runs OR runs not scheduled far enough out
            .having(
                sa.or_(
                    sa.func.count(db.FlowRun.next_scheduled_start_time) < self.min_runs,
                    sa.func.max(db.FlowRun.next_scheduled_start_time)
                    < right_now + self.min_scheduled_time,
                )
            )
            .order_by(db.Deployment.id)
            .limit(self.deployment_batch_size)
        )
        return query
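
    # For orientation only (an illustrative sketch, not generated output): the query
    # built above compiles to SQL of roughly this shape. Table, column, and parameter
    # names below are approximations based on the ORM models referenced in this file;
    # the exact statement depends on the database dialect.
    #
    #     SELECT deployment.id
    #     FROM deployment
    #     LEFT OUTER JOIN flow_run
    #         ON deployment.id = flow_run.deployment_id
    #         AND flow_run.state_type = 'SCHEDULED'
    #         AND flow_run.next_scheduled_start_time >= :now
    #         AND flow_run.auto_scheduled IS true
    #     WHERE deployment.paused IS NOT true
    #       AND EXISTS (
    #             SELECT deployment_schedule.deployment_id
    #             FROM deployment_schedule
    #             WHERE deployment_schedule.deployment_id = deployment.id
    #               AND deployment_schedule.active IS true
    #       )
    #     GROUP BY deployment.id
    #     HAVING count(flow_run.next_scheduled_start_time) < :min_runs
    #         OR max(flow_run.next_scheduled_start_time) < :now_plus_min_scheduled_time
    #     ORDER BY deployment.id
    #     LIMIT :deployment_batch_size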
    async def _collect_flow_runs(
        self,
        session: sa.orm.Session,
        deployment_ids: Sequence[UUID],
    ) -> list[dict[str, Any]]:
        runs_to_insert: list[dict[str, Any]] = []
        for deployment_id in deployment_ids:
            right_now = now("UTC")
            # guard against erroneously configured schedules
            try:
                runs_to_insert.extend(
                    await self._generate_scheduled_flow_runs(
                        session=session,
                        deployment_id=deployment_id,
                        start_time=right_now,
                        end_time=right_now + self.max_scheduled_time,
                        min_time=self.min_scheduled_time,
                        min_runs=self.min_runs,
                        max_runs=self.max_runs,
                    )
                )
            except Exception:
                self.logger.exception(
                    f"Error scheduling deployment {deployment_id!r}.",
                )
            finally:
                connection = await session.connection()
                if connection.invalidated:
                    # If the error we handled above was the kind of database error that
                    # causes the underlying transaction to roll back and the connection
                    # to become invalidated, roll back this session. Errors that may
                    # cause this are connection drops, database restarts, and things of
                    # the sort.
                    #
                    # This rollback _does not roll back a transaction_, since that has
                    # actually already happened due to the error above. It brings the
                    # Python session in sync with the underlying connection so that when
                    # we exit the outer with block, the context manager will not attempt
                    # to commit the session.
                    #
                    # Then, raise TryAgain to break out of these nested loops, back to
                    # the outer loop, where we'll begin a new transaction with
                    # session.begin() in the next loop iteration.
                    await session.rollback()
                    raise TryAgain()
        return runs_to_insert

    @db_injector
    async def _generate_scheduled_flow_runs(
        self,
        db: PrefectDBInterface,
        session: sa.orm.Session,
        deployment_id: UUID,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
        min_time: datetime.timedelta,
        min_runs: int,
        max_runs: int,
    ) -> list[dict[str, Any]]:
        """
        Given a `deployment_id` and schedule params, generates a list of flow run
        objects and associated scheduled states that represent scheduled flow runs.

        Pass-through method for overrides.

        Args:
            session: a database session
            deployment_id: the id of the deployment to schedule
            start_time: the time from which to start scheduling runs
            end_time: runs will be scheduled until at most this time
            min_time: runs will be scheduled until at least this far in the future
            min_runs: a minimum number of runs to schedule
            max_runs: a maximum number of runs to schedule

        This function will generate the minimum number of runs that satisfy the min
        and max times, and the min and max counts. Specifically, the following order
        will be respected:

        - Runs will be generated starting on or after the `start_time`
        - No more than `max_runs` runs will be generated
        - No runs will be generated after `end_time` is reached
        - At least `min_runs` runs will be generated
        - Runs will be generated until at least `start_time + min_time` is reached

        """
        return await models.deployments._generate_scheduled_flow_runs(
            db,
            session=session,
            deployment_id=deployment_id,
            start_time=start_time,
            end_time=end_time,
            min_time=min_time,
            min_runs=min_runs,
            max_runs=max_runs,
        )

    async def _insert_scheduled_flow_runs(
        self,
        session: sa.orm.Session,
        runs: list[dict[str, Any]],
    ) -> Sequence[UUID]:
        """
        Given a list of flow runs to schedule, as generated by
        `_generate_scheduled_flow_runs`, inserts them into the database. Note this is a
        separate method to facilitate batch operations on many scheduled runs.

        Pass-through method for overrides.
        """
        return await models.deployments._insert_scheduled_flow_runs(
            session=session, runs=runs
        )
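
    # Illustrative sketch (an assumption, not part of this module): because
    # `_generate_scheduled_flow_runs` and `_insert_scheduled_flow_runs` are documented
    # above as pass-through methods for overrides, a hypothetical subclass could hook
    # into either step without touching the pagination or retry logic, e.g.:
    #
    #     class VerboseScheduler(Scheduler):
    #         """Hypothetical example: log each batch before it is inserted."""
    #
    #         async def _insert_scheduled_flow_runs(
    #             self, session: sa.orm.Session, runs: list[dict[str, Any]]
    #         ) -> Sequence[UUID]:
    #             self.logger.info(f"Inserting a batch of {len(runs)} scheduled runs")
    #             return await super()._insert_scheduled_flow_runs(session=session, runs=runs)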


class RecentDeploymentsScheduler(Scheduler):
    """
    Schedules deployments that were updated very recently

    This scheduler can run on a tight loop and ensure that runs from
    newly-created or updated deployments are rapidly scheduled without having to
    wait for the "main" scheduler to complete its loop.

    Note that scheduling is idempotent, so it's ok for this scheduler to attempt
    to schedule the same deployments as the main scheduler. Its purpose is to
    accelerate scheduling for any deployments that users are interacting with.
316 """
318 # this scheduler runs on a tight loop
319 loop_seconds: float 1a
321 @classmethod 1a
322 def service_settings(cls) -> ServicesBaseSetting: 1a
323 return get_current_settings().server.services.scheduler 1b
325 def __init__(self, loop_seconds: float | None = None, **kwargs: Any): 1a
326 super().__init__( 1b
327 loop_seconds=(
328 loop_seconds
329 or get_current_settings().server.services.scheduler.recent_deployments_loop_seconds
330 ),
331 **kwargs,
332 )
334 @db_injector 1a
335 def _get_select_deployments_to_schedule_query( 1a
336 self, db: PrefectDBInterface
337 ) -> sa.Select[tuple[UUID]]:
338 """
339 Returns a sqlalchemy query for selecting deployments to schedule
340 """
341 query = ( 1b
342 sa.select(db.Deployment.id)
343 .where(
344 sa.and_(
345 db.Deployment.paused.is_not(True),
346 # use a slightly larger window than the loop interval to pick up
347 # any deployments that were created *while* the scheduler was
348 # last running (assuming the scheduler takes less than one
349 # second to run). Scheduling is idempotent so picking up schedules
350 # multiple times is not a concern.
351 db.Deployment.updated
352 >= now("UTC") - datetime.timedelta(seconds=self.loop_seconds + 1),
353 (
354 # Only include deployments that have at least one
355 # active schedule.
356 sa.select(db.DeploymentSchedule.deployment_id)
357 .where(
358 sa.and_(
359 db.DeploymentSchedule.deployment_id == db.Deployment.id,
360 db.DeploymentSchedule.active.is_(True),
361 )
362 )
363 .exists()
364 ),
365 )
366 )
367 .order_by(db.Deployment.id)
368 .limit(self.deployment_batch_size)
369 )
370 return query 1b
373if __name__ == "__main__": 373 ↛ 374line 373 didn't jump to line 374 because the condition on line 373 was never true1a
374 asyncio.run(
375 run_multiple_services(
376 [
377 Scheduler(handle_signals=True),
378 RecentDeploymentsScheduler(handle_signals=True),
379 ]
380 )
381 )
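
# Illustrative usage note (an assumption, not shown in the original module): the same
# helper can run either service on its own, e.g. to exercise just the fast loop while
# debugging locally:
#
#     asyncio.run(run_multiple_services([RecentDeploymentsScheduler(handle_signals=True)]))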