Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/foreman.py: 100%
62 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Foreman is a loop service designed to monitor workers.
3"""
5from datetime import timedelta 1b
6from typing import Any, Optional 1b
8import sqlalchemy as sa 1b
10from prefect.server import models 1b
11from prefect.server.database import PrefectDBInterface, db_injector 1b
12from prefect.server.models.deployments import mark_deployments_not_ready 1b
13from prefect.server.models.work_queues import mark_work_queues_not_ready 1b
14from prefect.server.models.workers import emit_work_pool_status_event 1b
15from prefect.server.schemas.internal import InternalWorkPoolUpdate 1b
16from prefect.server.schemas.statuses import ( 1b
17 DeploymentStatus,
18 WorkerStatus,
19 WorkPoolStatus,
20)
21from prefect.server.services.base import LoopService 1b
22from prefect.settings import ( 1b
23 PREFECT_API_SERVICES_FOREMAN_DEPLOYMENT_LAST_POLLED_TIMEOUT_SECONDS,
24 PREFECT_API_SERVICES_FOREMAN_FALLBACK_HEARTBEAT_INTERVAL_SECONDS,
25 PREFECT_API_SERVICES_FOREMAN_INACTIVITY_HEARTBEAT_MULTIPLE,
26 PREFECT_API_SERVICES_FOREMAN_LOOP_SECONDS,
27 PREFECT_API_SERVICES_FOREMAN_WORK_QUEUE_LAST_POLLED_TIMEOUT_SECONDS,
28 get_current_settings,
29)
30from prefect.settings.models.server.services import ServicesBaseSetting 1b
31from prefect.types._datetime import now 1b
class Foreman(LoopService):
    """
    Monitors the status of workers and their associated work pools.

    Each loop iteration (see `run_once`):

    - marks ONLINE workers with a stale heartbeat as OFFLINE,
    - marks READY work pools that have no ONLINE workers as NOT_READY,
    - marks READY deployments that have not been polled recently as NOT_READY,
    - marks READY work queues that have not been polled recently as NOT_READY.
    """

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        """Return the `server.services.foreman` settings block for this service."""
        return get_current_settings().server.services.foreman

    def __init__(
        self,
        loop_seconds: Optional[float] = None,
        inactivity_heartbeat_multiple: Optional[int] = None,
        fallback_heartbeat_interval_seconds: Optional[int] = None,
        deployment_last_polled_timeout_seconds: Optional[int] = None,
        work_queue_last_polled_timeout_seconds: Optional[int] = None,
        **kwargs: Any,
    ):
        """
        Configure the Foreman, falling back to Prefect settings for any
        override that is not supplied.

        Args:
            loop_seconds: Seconds between loop iterations. A falsy value
                falls back to PREFECT_API_SERVICES_FOREMAN_LOOP_SECONDS.
            inactivity_heartbeat_multiple: A worker is considered stale once
                the time since its last heartbeat exceeds its heartbeat
                interval multiplied by this value.
            fallback_heartbeat_interval_seconds: Heartbeat interval used for
                workers that have no `heartbeat_interval_seconds` recorded.
            deployment_last_polled_timeout_seconds: Age (in seconds) of
                `last_polled` after which a READY deployment is marked
                NOT_READY.
            work_queue_last_polled_timeout_seconds: Age (in seconds) of
                `last_polled` after which a READY work queue is marked
                NOT_READY.
            **kwargs: Forwarded unchanged to `LoopService.__init__`.
        """
        # NOTE(review): truthiness (not `is None`) is kept here deliberately;
        # how the base class treats a falsy loop_seconds is not visible from
        # this module, so changing it could alter the effective interval.
        super().__init__(
            loop_seconds=loop_seconds
            or PREFECT_API_SERVICES_FOREMAN_LOOP_SECONDS.value(),
            **kwargs,
        )
        # The remaining overrides use `is None` so an explicit 0 is honoured.
        self._inactivity_heartbeat_multiple = (
            PREFECT_API_SERVICES_FOREMAN_INACTIVITY_HEARTBEAT_MULTIPLE.value()
            if inactivity_heartbeat_multiple is None
            else inactivity_heartbeat_multiple
        )
        self._fallback_heartbeat_interval_seconds = (
            PREFECT_API_SERVICES_FOREMAN_FALLBACK_HEARTBEAT_INTERVAL_SECONDS.value()
            if fallback_heartbeat_interval_seconds is None
            else fallback_heartbeat_interval_seconds
        )
        self._deployment_last_polled_timeout_seconds = (
            PREFECT_API_SERVICES_FOREMAN_DEPLOYMENT_LAST_POLLED_TIMEOUT_SECONDS.value()
            if deployment_last_polled_timeout_seconds is None
            else deployment_last_polled_timeout_seconds
        )
        self._work_queue_last_polled_timeout_seconds = (
            PREFECT_API_SERVICES_FOREMAN_WORK_QUEUE_LAST_POLLED_TIMEOUT_SECONDS.value()
            if work_queue_last_polled_timeout_seconds is None
            else work_queue_last_polled_timeout_seconds
        )

    @db_injector
    async def run_once(self, db: PrefectDBInterface) -> None:
        """
        Run one monitoring pass.

        In order, this:

        1. marks workers currently ONLINE as OFFLINE if their last heartbeat
           is too old;
        2. marks READY work pools as NOT_READY if they have no ONLINE workers;
        3. marks READY deployments as NOT_READY if their `last_polled` time is
           older than the configured deployment last-polled timeout;
        4. marks READY work queues as NOT_READY if their `last_polled` time is
           older than the configured work-queue last-polled timeout.

        Args:
            db: Database interface supplied by `db_injector`. It is not used
                directly here; each step opens its own session.
        """
        # Order matters: pools are evaluated after stale workers have been
        # demoted, so a pool whose only workers just went OFFLINE is caught
        # in the same pass.
        await self._mark_online_workers_without_a_recent_heartbeat_as_offline()
        await self._mark_work_pools_as_not_ready()
        await self._mark_deployments_as_not_ready()
        await self._mark_work_queues_as_not_ready()

    @db_injector
    async def _mark_online_workers_without_a_recent_heartbeat_as_offline(
        self, db: PrefectDBInterface
    ) -> None:
        """
        Set the status of ONLINE workers with a stale heartbeat to OFFLINE.

        A heartbeat is stale when the number of seconds since
        `last_heartbeat_time`, as computed by the `date_diff_seconds` SQL
        function, exceeds the worker's `heartbeat_interval_seconds`
        multiplied by the inactivity heartbeat multiple. When a worker has
        no `heartbeat_interval_seconds` recorded, the fallback heartbeat
        interval is used instead.

        Args:
            db: Database interface supplied by `db_injector`.
        """
        async with db.session_context(begin_transaction=True) as session:
            worker_update_stmt = (
                sa.update(db.Worker)
                .values(status=WorkerStatus.OFFLINE)
                .where(
                    sa.func.date_diff_seconds(db.Worker.last_heartbeat_time)
                    > (
                        sa.func.coalesce(
                            db.Worker.heartbeat_interval_seconds,
                            sa.bindparam("default_interval", sa.Integer),
                        )
                        * sa.bindparam("multiplier", sa.Integer)
                    ),
                    db.Worker.status == WorkerStatus.ONLINE,
                )
            )

            result = await session.execute(
                worker_update_stmt,
                {
                    "multiplier": self._inactivity_heartbeat_multiple,
                    "default_interval": self._fallback_heartbeat_interval_seconds,
                },
            )

        if result.rowcount:
            self.logger.info(f"Marked {result.rowcount} workers as offline.")

    @db_injector
    async def _mark_work_pools_as_not_ready(self, db: PrefectDBInterface) -> None:
        """
        Mark READY work pools that have no ONLINE workers as NOT_READY.

        Each matching pool is updated through `models.workers.update_work_pool`
        with `emit_work_pool_status_event` as the status-change callback, so
        the usual bookkeeping and status event happen for every pool.

        Args:
            db: Database interface supplied by `db_injector`.
        """
        async with db.session_context(begin_transaction=True) as session:
            # LEFT JOIN only ONLINE workers, so a pool with zero matching
            # worker rows has no ONLINE workers at all.
            work_pools_select_stmt = (
                sa.select(db.WorkPool)
                .filter(db.WorkPool.status == WorkPoolStatus.READY)
                .outerjoin(
                    db.Worker,
                    sa.and_(
                        db.Worker.work_pool_id == db.WorkPool.id,
                        db.Worker.status == WorkerStatus.ONLINE,
                    ),
                )
                .group_by(db.WorkPool.id)
                .having(sa.func.count(db.Worker.id) == 0)
            )

            result = await session.execute(work_pools_select_stmt)
            work_pools = result.scalars().all()

            for work_pool in work_pools:
                await models.workers.update_work_pool(
                    session=session,
                    work_pool_id=work_pool.id,
                    work_pool=InternalWorkPoolUpdate(status=WorkPoolStatus.NOT_READY),
                    emit_status_change=emit_work_pool_status_event,
                )

                self.logger.info(f"Marked work pool {work_pool.id} as NOT_READY.")

    @db_injector
    async def _mark_deployments_as_not_ready(self, db: PrefectDBInterface) -> None:
        """
        Mark READY deployments that have not been polled recently as NOT_READY.

        A deployment qualifies when its `last_polled` is set and older than
        the deployment last-polled timeout. If its work queue also has a
        `last_polled` value, that must be older than the timeout too.
        The status update itself is delegated to `mark_deployments_not_ready`.

        Args:
            db: Database interface supplied by `db_injector`.
        """
        async with db.session_context(begin_transaction=True) as session:
            status_timeout_threshold = now("UTC") - timedelta(
                seconds=self._deployment_last_polled_timeout_seconds
            )
            deployment_id_select_stmt = (
                sa.select(db.Deployment.id)
                .outerjoin(db.WorkQueue, db.WorkQueue.id == db.Deployment.work_queue_id)
                .filter(db.Deployment.status == DeploymentStatus.READY)
                .filter(db.Deployment.last_polled.isnot(None))
                # The deployment itself must be stale in every case...
                .filter(db.Deployment.last_polled < status_timeout_threshold)
                # ...and if its work queue has ever been polled (or the
                # deployment has no work queue, making the joined value NULL),
                # the queue must be stale as well.
                .filter(
                    sa.or_(
                        db.WorkQueue.last_polled.is_(None),
                        db.WorkQueue.last_polled < status_timeout_threshold,
                    )
                )
            )
            result = await session.execute(deployment_id_select_stmt)

            deployment_ids_to_mark_unready = result.scalars().all()

        await mark_deployments_not_ready(
            deployment_ids=deployment_ids_to_mark_unready,
        )

    @db_injector
    async def _mark_work_queues_as_not_ready(self, db: PrefectDBInterface) -> None:
        """
        Mark READY work queues that have not been polled recently as NOT_READY.

        A work queue qualifies when its `last_polled` is set and older than
        the work-queue last-polled timeout. Matching IDs are collected
        oldest-first and passed to `mark_work_queues_not_ready`.

        Args:
            db: Database interface supplied by `db_injector`.
        """
        async with db.session_context(begin_transaction=True) as session:
            status_timeout_threshold = now("UTC") - timedelta(
                seconds=self._work_queue_last_polled_timeout_seconds
            )
            # No join to WorkPool is needed: nothing here filters on pool
            # columns, and a join on the pool's primary key cannot change
            # the set of queue rows returned.
            id_select_stmt = (
                sa.select(db.WorkQueue.id)
                .filter(db.WorkQueue.status == "READY")
                .filter(db.WorkQueue.last_polled.isnot(None))
                .filter(db.WorkQueue.last_polled < status_timeout_threshold)
                .order_by(db.WorkQueue.last_polled.asc())
            )
            result = await session.execute(id_select_stmt)
            unready_work_queue_ids = result.scalars().all()

        await mark_work_queues_not_ready(
            work_queue_ids=unready_work_queue_ids,
        )
241 )