Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/foreman.py: 74%

62 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Foreman is a loop service designed to monitor workers. 

3""" 

4 

5from datetime import timedelta 1a

6from typing import Any, Optional 1a

7 

8import sqlalchemy as sa 1a

9 

10from prefect.server import models 1a

11from prefect.server.database import PrefectDBInterface, db_injector 1a

12from prefect.server.models.deployments import mark_deployments_not_ready 1a

13from prefect.server.models.work_queues import mark_work_queues_not_ready 1a

14from prefect.server.models.workers import emit_work_pool_status_event 1a

15from prefect.server.schemas.internal import InternalWorkPoolUpdate 1a

16from prefect.server.schemas.statuses import ( 1a

17 DeploymentStatus, 

18 WorkerStatus, 

19 WorkPoolStatus, 

20) 

21from prefect.server.services.base import LoopService 1a

22from prefect.settings import ( 1a

23 PREFECT_API_SERVICES_FOREMAN_DEPLOYMENT_LAST_POLLED_TIMEOUT_SECONDS, 

24 PREFECT_API_SERVICES_FOREMAN_FALLBACK_HEARTBEAT_INTERVAL_SECONDS, 

25 PREFECT_API_SERVICES_FOREMAN_INACTIVITY_HEARTBEAT_MULTIPLE, 

26 PREFECT_API_SERVICES_FOREMAN_LOOP_SECONDS, 

27 PREFECT_API_SERVICES_FOREMAN_WORK_QUEUE_LAST_POLLED_TIMEOUT_SECONDS, 

28 get_current_settings, 

29) 

30from prefect.settings.models.server.services import ServicesBaseSetting 1a

31from prefect.types._datetime import now 1a

32 

33 

class Foreman(LoopService):
    """
    Monitors the status of workers and their associated work pools.

    On each loop iteration it marks stale workers as OFFLINE, and marks work
    pools, deployments, and work queues as NOT_READY when they have not shown
    recent activity.
    """

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        return get_current_settings().server.services.foreman

    def __init__(
        self,
        loop_seconds: Optional[float] = None,
        inactivity_heartbeat_multiple: Optional[int] = None,
        fallback_heartbeat_interval_seconds: Optional[int] = None,
        deployment_last_polled_timeout_seconds: Optional[int] = None,
        work_queue_last_polled_timeout_seconds: Optional[int] = None,
        **kwargs: Any,
    ):
        """
        Args:
            loop_seconds: Seconds between loop iterations. Any falsy value
                (None or 0) falls back to
                PREFECT_API_SERVICES_FOREMAN_LOOP_SECONDS.
            inactivity_heartbeat_multiple: Number of missed heartbeat
                intervals after which an ONLINE worker is marked OFFLINE.
                None falls back to the corresponding setting.
            fallback_heartbeat_interval_seconds: Heartbeat interval used for
                workers that have no heartbeat_interval_seconds of their own.
                None falls back to the corresponding setting.
            deployment_last_polled_timeout_seconds: Age of last_polled after
                which a READY deployment is marked NOT_READY. None falls back
                to the corresponding setting.
            work_queue_last_polled_timeout_seconds: Age of last_polled after
                which a READY work queue is marked NOT_READY. None falls back
                to the corresponding setting.
            **kwargs: Forwarded unchanged to LoopService.__init__.
        """
        # `or` (rather than an `is None` check) is intentional here: it keeps
        # the historical behavior where any falsy loop_seconds uses the setting.
        super().__init__(
            loop_seconds=loop_seconds
            or PREFECT_API_SERVICES_FOREMAN_LOOP_SECONDS.value(),
            **kwargs,
        )
        # The remaining options use explicit `is None` checks so that an
        # explicit 0 passed by the caller is respected.
        self._inactivity_heartbeat_multiple = (
            PREFECT_API_SERVICES_FOREMAN_INACTIVITY_HEARTBEAT_MULTIPLE.value()
            if inactivity_heartbeat_multiple is None
            else inactivity_heartbeat_multiple
        )
        self._fallback_heartbeat_interval_seconds = (
            PREFECT_API_SERVICES_FOREMAN_FALLBACK_HEARTBEAT_INTERVAL_SECONDS.value()
            if fallback_heartbeat_interval_seconds is None
            else fallback_heartbeat_interval_seconds
        )
        self._deployment_last_polled_timeout_seconds = (
            PREFECT_API_SERVICES_FOREMAN_DEPLOYMENT_LAST_POLLED_TIMEOUT_SECONDS.value()
            if deployment_last_polled_timeout_seconds is None
            else deployment_last_polled_timeout_seconds
        )
        self._work_queue_last_polled_timeout_seconds = (
            PREFECT_API_SERVICES_FOREMAN_WORK_QUEUE_LAST_POLLED_TIMEOUT_SECONDS.value()
            if work_queue_last_polled_timeout_seconds is None
            else work_queue_last_polled_timeout_seconds
        )

    @db_injector
    async def run_once(self, db: PrefectDBInterface) -> None:
        """
        Run one monitoring pass.

        The steps, in order:

        1. Mark workers currently ONLINE as OFFLINE if their last heartbeat is
           too old.
        2. Mark READY work pools that have no ONLINE workers as NOT_READY.
        3. Mark READY deployments as NOT_READY if their last_polled time (and
           their work queue's, when present) is older than the configured
           deployment timeout.
        4. Mark READY work queues as NOT_READY if their last_polled time is
           older than the configured work queue timeout.

        The worker step runs first so that workers it just marked OFFLINE no
        longer count as ONLINE when work pools are evaluated.

        Args:
            db: Database interface supplied by ``db_injector``.
        """
        await self._mark_online_workers_without_a_recent_heartbeat_as_offline()
        await self._mark_work_pools_as_not_ready()
        await self._mark_deployments_as_not_ready()
        await self._mark_work_queues_as_not_ready()

    @db_injector
    async def _mark_online_workers_without_a_recent_heartbeat_as_offline(
        self, db: PrefectDBInterface
    ) -> None:
        """
        Set ONLINE workers whose last heartbeat is too old to OFFLINE.

        A heartbeat is considered too old when the seconds elapsed since
        ``last_heartbeat_time`` exceed the worker's ``heartbeat_interval_seconds``
        multiplied by the inactivity heartbeat multiple. Workers without their
        own interval use the fallback heartbeat interval instead.

        Args:
            db: Database interface supplied by ``db_injector``.
        """
        async with db.session_context(begin_transaction=True) as session:
            # The interval and multiplier are bound parameters so the same
            # statement shape is used on every iteration.
            # NOTE(review): date_diff_seconds is a Prefect SQL function;
            # with one argument it is presumably "seconds since that time".
            effective_interval = sa.func.coalesce(
                db.Worker.heartbeat_interval_seconds,
                sa.bindparam("default_interval", sa.Integer),
            )
            worker_update_stmt = (
                sa.update(db.Worker)
                .values(status=WorkerStatus.OFFLINE)
                .where(
                    sa.func.date_diff_seconds(db.Worker.last_heartbeat_time)
                    > effective_interval * sa.bindparam("multiplier", sa.Integer),
                    db.Worker.status == WorkerStatus.ONLINE,
                )
            )

            result = await session.execute(
                worker_update_stmt,
                {
                    "multiplier": self._inactivity_heartbeat_multiple,
                    "default_interval": self._fallback_heartbeat_interval_seconds,
                },
            )

            if result.rowcount:
                self.logger.info(f"Marked {result.rowcount} workers as offline.")

    @db_injector
    async def _mark_work_pools_as_not_ready(self, db: PrefectDBInterface) -> None:
        """
        Mark READY work pools that have no ONLINE workers as NOT_READY.

        Each matching pool is updated through ``models.workers.update_work_pool``
        with ``emit_work_pool_status_event`` as the status-change callback, and
        the change is logged.

        Args:
            db: Database interface supplied by ``db_injector``.
        """
        async with db.session_context(begin_transaction=True) as session:
            # The outer join keeps pools with zero ONLINE workers in the
            # result. HAVING count(...) == 0 then selects exactly those pools.
            work_pools_select_stmt = (
                sa.select(db.WorkPool)
                .filter(db.WorkPool.status == WorkPoolStatus.READY)
                .outerjoin(
                    db.Worker,
                    sa.and_(
                        db.Worker.work_pool_id == db.WorkPool.id,
                        db.Worker.status == WorkerStatus.ONLINE,
                    ),
                )
                .group_by(db.WorkPool.id)
                .having(sa.func.count(db.Worker.id) == 0)
            )

            result = await session.execute(work_pools_select_stmt)
            work_pools = result.scalars().all()

            for work_pool in work_pools:
                await models.workers.update_work_pool(
                    session=session,
                    work_pool_id=work_pool.id,
                    work_pool=InternalWorkPoolUpdate(status=WorkPoolStatus.NOT_READY),
                    emit_status_change=emit_work_pool_status_event,
                )

                self.logger.info(f"Marked work pool {work_pool.id} as NOT_READY.")

    @db_injector
    async def _mark_deployments_as_not_ready(self, db: PrefectDBInterface) -> None:
        """
        Mark stale READY deployments as NOT_READY.

        A READY deployment with a non-null ``last_polled`` is stale when that
        time is older than the deployment timeout. If its work queue also has
        a ``last_polled`` value, that value must be older than the timeout too.

        The status change itself (and any events it emits) is delegated to
        ``mark_deployments_not_ready``.

        Args:
            db: Database interface supplied by ``db_injector``.
        """
        async with db.session_context(begin_transaction=True) as session:
            status_timeout_threshold = now("UTC") - timedelta(
                seconds=self._deployment_last_polled_timeout_seconds
            )
            deployment_id_select_stmt = (
                sa.select(db.Deployment.id)
                .outerjoin(db.WorkQueue, db.WorkQueue.id == db.Deployment.work_queue_id)
                .filter(db.Deployment.status == DeploymentStatus.READY)
                .filter(db.Deployment.last_polled.isnot(None))
                .filter(
                    sa.or_(
                        # If work_queue.last_polled doesn't exist, use only the
                        # deployment's last_polled.
                        sa.and_(
                            db.WorkQueue.last_polled.is_(None),
                            db.Deployment.last_polled < status_timeout_threshold,
                        ),
                        # If work_queue.last_polled exists, both times must be
                        # older than the threshold.
                        sa.and_(
                            db.WorkQueue.last_polled.isnot(None),
                            db.Deployment.last_polled < status_timeout_threshold,
                            db.WorkQueue.last_polled < status_timeout_threshold,
                        ),
                    )
                )
            )
            result = await session.execute(deployment_id_select_stmt)

            # Materialize the IDs before the read transaction closes.
            deployment_ids_to_mark_unready = result.scalars().all()

        # mark_deployments_not_ready does not receive this session, so it is
        # called only after the read transaction above has closed.
        await mark_deployments_not_ready(
            deployment_ids=deployment_ids_to_mark_unready,
        )

    @db_injector
    async def _mark_work_queues_as_not_ready(self, db: PrefectDBInterface) -> None:
        """
        Mark READY work queues whose ``last_polled`` is too old as NOT_READY.

        Matching queues (non-null ``last_polled`` older than the work queue
        timeout) are collected oldest-first and passed to
        ``mark_work_queues_not_ready``.

        Args:
            db: Database interface supplied by ``db_injector``.
        """
        async with db.session_context(begin_transaction=True) as session:
            status_timeout_threshold = now("UTC") - timedelta(
                seconds=self._work_queue_last_polled_timeout_seconds
            )
            # No join to WorkPool is needed: nothing from it is selected or
            # filtered on, and a many-to-one outer join cannot change the rows.
            id_select_stmt = (
                sa.select(db.WorkQueue.id)
                .filter(db.WorkQueue.status == "READY")
                .filter(db.WorkQueue.last_polled.isnot(None))
                .filter(db.WorkQueue.last_polled < status_timeout_threshold)
                .order_by(db.WorkQueue.last_polled.asc())
            )
            result = await session.execute(id_select_stmt)

            # Materialize the IDs before the read transaction closes.
            unready_work_queue_ids = result.scalars().all()

        # mark_work_queues_not_ready does not receive this session, so it is
        # called only after the read transaction above has closed.
        await mark_work_queues_not_ready(
            work_queue_ids=unready_work_queue_ids,
        )