Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/foreman.py: 74%
62 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Foreman is a loop service designed to monitor workers.
3"""
5from datetime import timedelta 1a
6from typing import Any, Optional 1a
8import sqlalchemy as sa 1a
10from prefect.server import models 1a
11from prefect.server.database import PrefectDBInterface, db_injector 1a
12from prefect.server.models.deployments import mark_deployments_not_ready 1a
13from prefect.server.models.work_queues import mark_work_queues_not_ready 1a
14from prefect.server.models.workers import emit_work_pool_status_event 1a
15from prefect.server.schemas.internal import InternalWorkPoolUpdate 1a
16from prefect.server.schemas.statuses import ( 1a
17 DeploymentStatus,
18 WorkerStatus,
19 WorkPoolStatus,
20)
21from prefect.server.services.base import LoopService 1a
22from prefect.settings import ( 1a
23 PREFECT_API_SERVICES_FOREMAN_DEPLOYMENT_LAST_POLLED_TIMEOUT_SECONDS,
24 PREFECT_API_SERVICES_FOREMAN_FALLBACK_HEARTBEAT_INTERVAL_SECONDS,
25 PREFECT_API_SERVICES_FOREMAN_INACTIVITY_HEARTBEAT_MULTIPLE,
26 PREFECT_API_SERVICES_FOREMAN_LOOP_SECONDS,
27 PREFECT_API_SERVICES_FOREMAN_WORK_QUEUE_LAST_POLLED_TIMEOUT_SECONDS,
28 get_current_settings,
29)
30from prefect.settings.models.server.services import ServicesBaseSetting 1a
31from prefect.types._datetime import now 1a
class Foreman(LoopService):
    """
    Loop service that monitors workers and the readiness of the work pools,
    deployments, and work queues that depend on them.

    Each pass (see `run_once`) runs four independent steps, each in its own
    database session:

    1. ONLINE workers whose last heartbeat is too old are marked OFFLINE.
    2. READY work pools with no ONLINE worker are marked NOT_READY.
    3. READY deployments that have not been polled recently are passed to
       `mark_deployments_not_ready`.
    4. READY work queues that have not been polled recently are passed to
       `mark_work_queues_not_ready`.
    """

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        """Return the `server.services.foreman` section of the current settings."""
        return get_current_settings().server.services.foreman

    def __init__(
        self,
        loop_seconds: Optional[float] = None,
        inactivity_heartbeat_multiple: Optional[int] = None,
        fallback_heartbeat_interval_seconds: Optional[int] = None,
        deployment_last_polled_timeout_seconds: Optional[int] = None,
        work_queue_last_polled_timeout_seconds: Optional[int] = None,
        **kwargs: Any,
    ):
        """
        Configure the service, falling back to the corresponding
        `PREFECT_API_SERVICES_FOREMAN_*` setting for every omitted argument.

        Args:
            loop_seconds: Value forwarded to `LoopService` as `loop_seconds`.
            inactivity_heartbeat_multiple: Multiplier applied to a worker's
                heartbeat interval to obtain its inactivity threshold.
            fallback_heartbeat_interval_seconds: Heartbeat interval assumed
                for workers whose `heartbeat_interval_seconds` is NULL.
            deployment_last_polled_timeout_seconds: Age of `last_polled`
                (in seconds) beyond which a deployment is considered stale.
            work_queue_last_polled_timeout_seconds: Age of `last_polled`
                (in seconds) beyond which a work queue is considered stale.
            **kwargs: Forwarded unchanged to `LoopService.__init__`.
        """
        # NOTE: `or` (rather than an `is None` test) means any falsy value,
        # including 0, falls back to the configured default. The other
        # arguments below only fall back when they are exactly None.
        super().__init__(
            loop_seconds=loop_seconds
            or PREFECT_API_SERVICES_FOREMAN_LOOP_SECONDS.value(),
            **kwargs,
        )
        self._inactivity_heartbeat_multiple = (
            PREFECT_API_SERVICES_FOREMAN_INACTIVITY_HEARTBEAT_MULTIPLE.value()
            if inactivity_heartbeat_multiple is None
            else inactivity_heartbeat_multiple
        )
        self._fallback_heartbeat_interval_seconds = (
            PREFECT_API_SERVICES_FOREMAN_FALLBACK_HEARTBEAT_INTERVAL_SECONDS.value()
            if fallback_heartbeat_interval_seconds is None
            else fallback_heartbeat_interval_seconds
        )
        self._deployment_last_polled_timeout_seconds = (
            PREFECT_API_SERVICES_FOREMAN_DEPLOYMENT_LAST_POLLED_TIMEOUT_SECONDS.value()
            if deployment_last_polled_timeout_seconds is None
            else deployment_last_polled_timeout_seconds
        )
        self._work_queue_last_polled_timeout_seconds = (
            PREFECT_API_SERVICES_FOREMAN_WORK_QUEUE_LAST_POLLED_TIMEOUT_SECONDS.value()
            if work_queue_last_polled_timeout_seconds is None
            else work_queue_last_polled_timeout_seconds
        )

    @db_injector
    async def run_once(self, db: PrefectDBInterface) -> None:
        """
        Run one monitoring pass.

        Marks workers that are currently ONLINE as OFFLINE if their last
        heartbeat is too old, marks READY work pools without any ONLINE
        worker as NOT_READY, and then marks deployments and work queues as
        not ready when their `last_polled` time is older than the configured
        timeout.

        Args:
            db: Part of the signature but not used directly in this method;
                every step below is called without arguments.
        """
        await self._mark_online_workers_without_a_recent_heartbeat_as_offline()
        await self._mark_work_pools_as_not_ready()
        await self._mark_deployments_as_not_ready()
        await self._mark_work_queues_as_not_ready()

    @db_injector
    async def _mark_online_workers_without_a_recent_heartbeat_as_offline(
        self, db: PrefectDBInterface
    ) -> None:
        """
        Set the status of ONLINE workers with an old heartbeat to OFFLINE.

        A worker is updated when `date_diff_seconds(last_heartbeat_time)`
        exceeds its `heartbeat_interval_seconds` (or the fallback interval
        when that column is NULL) multiplied by the inactivity heartbeat
        multiple.

        Args:
            db: Database interface used to open the session and to reference
                the `Worker` model.
        """
        async with db.session_context(begin_transaction=True) as session:
            # NOTE(review): `date_diff_seconds` is a SQL function resolved
            # outside this file; with a single argument it presumably yields
            # the seconds elapsed since that timestamp - confirm.
            worker_update_stmt = (
                sa.update(db.Worker)
                .values(status=WorkerStatus.OFFLINE)
                .where(
                    sa.func.date_diff_seconds(db.Worker.last_heartbeat_time)
                    > (
                        sa.func.coalesce(
                            db.Worker.heartbeat_interval_seconds,
                            sa.bindparam("default_interval", sa.Integer),
                        )
                        * sa.bindparam("multiplier", sa.Integer)
                    ),
                    db.Worker.status == WorkerStatus.ONLINE,
                )
            )

            result = await session.execute(
                worker_update_stmt,
                {
                    "multiplier": self._inactivity_heartbeat_multiple,
                    "default_interval": self._fallback_heartbeat_interval_seconds,
                },
            )
            # Read the count while the session is still open so the log
            # statement below does not depend on the result object outliving
            # the transaction.
            offline_count = result.rowcount

        if offline_count:
            self.logger.info(f"Marked {offline_count} workers as offline.")

    @db_injector
    async def _mark_work_pools_as_not_ready(self, db: PrefectDBInterface) -> None:
        """
        Mark READY work pools that have no ONLINE workers as NOT_READY.

        Each matching pool is updated through `models.workers.update_work_pool`
        with `emit_work_pool_status_event` supplied as the status-change
        callback, and one info line is logged per pool.

        Args:
            db: Database interface used to open the session and to reference
                the `WorkPool` and `Worker` models.
        """
        async with db.session_context(begin_transaction=True) as session:
            # Outer-join only ONLINE workers, then keep the pools for which
            # that join produced no worker rows at all.
            work_pools_select_stmt = (
                sa.select(db.WorkPool)
                .filter(db.WorkPool.status == WorkPoolStatus.READY)
                .outerjoin(
                    db.Worker,
                    sa.and_(
                        db.Worker.work_pool_id == db.WorkPool.id,
                        db.Worker.status == WorkerStatus.ONLINE,
                    ),
                )
                .group_by(db.WorkPool.id)
                .having(sa.func.count(db.Worker.id) == 0)
            )

            result = await session.execute(work_pools_select_stmt)
            work_pools = result.scalars().all()

            for work_pool in work_pools:
                await models.workers.update_work_pool(
                    session=session,
                    work_pool_id=work_pool.id,
                    work_pool=InternalWorkPoolUpdate(status=WorkPoolStatus.NOT_READY),
                    emit_status_change=emit_work_pool_status_event,
                )

                self.logger.info(f"Marked work pool {work_pool.id} as NOT_READY.")

    @db_injector
    async def _mark_deployments_as_not_ready(self, db: PrefectDBInterface) -> None:
        """
        Find READY deployments that have not been polled recently and hand
        their IDs to `mark_deployments_not_ready`.

        A deployment is selected when its `last_polled` is older than the
        deployment last-polled timeout and its work queue (if any) either has
        no `last_polled` value or one that is also older than that timeout.
        Deployments whose own `last_polled` is NULL are never selected.

        Args:
            db: Database interface used to open the session and to reference
                the `Deployment` and `WorkQueue` models.
        """
        async with db.session_context(begin_transaction=True) as session:
            status_timeout_threshold = now("UTC") - timedelta(
                seconds=self._deployment_last_polled_timeout_seconds
            )
            deployment_id_select_stmt = (
                sa.select(db.Deployment.id)
                .outerjoin(db.WorkQueue, db.WorkQueue.id == db.Deployment.work_queue_id)
                .filter(db.Deployment.status == DeploymentStatus.READY)
                .filter(db.Deployment.last_polled.isnot(None))
                .filter(
                    sa.or_(
                        # if work_queue.last_polled doesn't exist, use only
                        # the deployment's last_polled
                        sa.and_(
                            db.WorkQueue.last_polled.is_(None),
                            db.Deployment.last_polled < status_timeout_threshold,
                        ),
                        # if work_queue.last_polled exists, both times must be
                        # older than the threshold
                        sa.and_(
                            db.WorkQueue.last_polled.isnot(None),
                            db.Deployment.last_polled < status_timeout_threshold,
                            db.WorkQueue.last_polled < status_timeout_threshold,
                        ),
                    )
                )
            )
            result = await session.execute(deployment_id_select_stmt)
            # Materialize the IDs before the session is closed.
            deployment_ids_to_mark_unready = result.scalars().all()

        # The helper is not handed this session, so it is invoked only after
        # the read transaction above has been closed.
        await mark_deployments_not_ready(
            deployment_ids=deployment_ids_to_mark_unready,
        )

    @db_injector
    async def _mark_work_queues_as_not_ready(self, db: PrefectDBInterface) -> None:
        """
        Find READY work queues whose `last_polled` is older than the work
        queue last-polled timeout and hand their IDs to
        `mark_work_queues_not_ready`.

        Work queues whose `last_polled` is NULL are never selected. IDs are
        passed on ordered by `last_polled`, oldest first.

        Args:
            db: Database interface used to open the session and to reference
                the `WorkQueue` and `WorkPool` models.
        """
        async with db.session_context(begin_transaction=True) as session:
            status_timeout_threshold = now("UTC") - timedelta(
                seconds=self._work_queue_last_polled_timeout_seconds
            )
            # NOTE(review): the outer join to WorkPool contributes no filter
            # or selected column here; it is kept so the query is unchanged.
            # The status literal is kept because no work-queue status enum is
            # imported by this module.
            id_select_stmt = (
                sa.select(db.WorkQueue.id)
                .outerjoin(db.WorkPool, db.WorkPool.id == db.WorkQueue.work_pool_id)
                .filter(db.WorkQueue.status == "READY")
                .filter(db.WorkQueue.last_polled.isnot(None))
                .filter(db.WorkQueue.last_polled < status_timeout_threshold)
                .order_by(db.WorkQueue.last_polled.asc())
            )
            result = await session.execute(id_select_stmt)
            # Materialize the IDs before the session is closed.
            unready_work_queue_ids = result.scalars().all()

        # The helper is not handed this session, so it is invoked only after
        # the read transaction above has been closed.
        await mark_work_queues_not_ready(
            work_queue_ids=unready_work_queue_ids,
        )
241 )