Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/foreman.py: 100%

62 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Foreman is a loop service designed to monitor workers. 

3""" 

4 

from datetime import timedelta
from typing import Any, Optional

import sqlalchemy as sa

from prefect.server import models
from prefect.server.database import PrefectDBInterface, db_injector
from prefect.server.models.deployments import mark_deployments_not_ready
from prefect.server.models.work_queues import mark_work_queues_not_ready
from prefect.server.models.workers import emit_work_pool_status_event
from prefect.server.schemas.internal import InternalWorkPoolUpdate
from prefect.server.schemas.statuses import (
    DeploymentStatus,
    WorkerStatus,
    WorkPoolStatus,
    WorkQueueStatus,
)
from prefect.server.services.base import LoopService
from prefect.settings import (
    PREFECT_API_SERVICES_FOREMAN_DEPLOYMENT_LAST_POLLED_TIMEOUT_SECONDS,
    PREFECT_API_SERVICES_FOREMAN_FALLBACK_HEARTBEAT_INTERVAL_SECONDS,
    PREFECT_API_SERVICES_FOREMAN_INACTIVITY_HEARTBEAT_MULTIPLE,
    PREFECT_API_SERVICES_FOREMAN_LOOP_SECONDS,
    PREFECT_API_SERVICES_FOREMAN_WORK_QUEUE_LAST_POLLED_TIMEOUT_SECONDS,
    get_current_settings,
)
from prefect.settings.models.server.services import ServicesBaseSetting
from prefect.types._datetime import now

32 

33 

class Foreman(LoopService):
    """
    Monitors the status of workers and their associated work pools.

    On every loop iteration the foreman:

    - marks ONLINE workers whose last heartbeat is too old as OFFLINE,
    - marks READY work pools that have no ONLINE workers as NOT_READY,
    - marks READY deployments that have not been polled recently as NOT_READY,
    - marks READY work queues that have not been polled recently as NOT_READY.
    """

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        """Return the `foreman` section of the current server services settings."""
        return get_current_settings().server.services.foreman

    def __init__(
        self,
        loop_seconds: Optional[float] = None,
        inactivity_heartbeat_multiple: Optional[int] = None,
        fallback_heartbeat_interval_seconds: Optional[int] = None,
        deployment_last_polled_timeout_seconds: Optional[int] = None,
        work_queue_last_polled_timeout_seconds: Optional[int] = None,
        **kwargs: Any,
    ):
        """
        Configure the foreman. Options left unset are read from settings.

        Args:
            loop_seconds: Seconds between loop iterations. Unlike the other
                options this uses a truthiness check, so an explicit `0` also
                falls back to `PREFECT_API_SERVICES_FOREMAN_LOOP_SECONDS`.
            inactivity_heartbeat_multiple: Number of heartbeat intervals that
                may pass without a heartbeat before a worker is marked
                OFFLINE. Defaults to
                `PREFECT_API_SERVICES_FOREMAN_INACTIVITY_HEARTBEAT_MULTIPLE`.
            fallback_heartbeat_interval_seconds: Heartbeat interval used for
                workers whose `heartbeat_interval_seconds` is NULL. Defaults to
                `PREFECT_API_SERVICES_FOREMAN_FALLBACK_HEARTBEAT_INTERVAL_SECONDS`.
            deployment_last_polled_timeout_seconds: Age of `last_polled` after
                which a READY deployment is marked NOT_READY. Defaults to
                `PREFECT_API_SERVICES_FOREMAN_DEPLOYMENT_LAST_POLLED_TIMEOUT_SECONDS`.
            work_queue_last_polled_timeout_seconds: Age of `last_polled` after
                which a READY work queue is marked NOT_READY. Defaults to
                `PREFECT_API_SERVICES_FOREMAN_WORK_QUEUE_LAST_POLLED_TIMEOUT_SECONDS`.
            **kwargs: Passed through to `LoopService.__init__`.
        """
        super().__init__(
            loop_seconds=loop_seconds
            or PREFECT_API_SERVICES_FOREMAN_LOOP_SECONDS.value(),
            **kwargs,
        )
        # The remaining options use `is None` so that an explicit 0 is honored.
        self._inactivity_heartbeat_multiple = (
            PREFECT_API_SERVICES_FOREMAN_INACTIVITY_HEARTBEAT_MULTIPLE.value()
            if inactivity_heartbeat_multiple is None
            else inactivity_heartbeat_multiple
        )
        self._fallback_heartbeat_interval_seconds = (
            PREFECT_API_SERVICES_FOREMAN_FALLBACK_HEARTBEAT_INTERVAL_SECONDS.value()
            if fallback_heartbeat_interval_seconds is None
            else fallback_heartbeat_interval_seconds
        )
        self._deployment_last_polled_timeout_seconds = (
            PREFECT_API_SERVICES_FOREMAN_DEPLOYMENT_LAST_POLLED_TIMEOUT_SECONDS.value()
            if deployment_last_polled_timeout_seconds is None
            else deployment_last_polled_timeout_seconds
        )
        self._work_queue_last_polled_timeout_seconds = (
            PREFECT_API_SERVICES_FOREMAN_WORK_QUEUE_LAST_POLLED_TIMEOUT_SECONDS.value()
            if work_queue_last_polled_timeout_seconds is None
            else work_queue_last_polled_timeout_seconds
        )

    @db_injector
    async def run_once(self, db: PrefectDBInterface) -> None:
        """
        Run one monitoring pass.

        Workers currently marked ONLINE with a stale heartbeat are marked
        OFFLINE. READY work pools with no ONLINE workers are marked NOT_READY.
        READY deployments and work queues whose `last_polled` is older than
        their configured timeouts are marked NOT_READY.
        """
        # The work-pool step counts ONLINE workers, so it runs after the
        # worker step. Each step opens its own transaction.
        await self._mark_online_workers_without_a_recent_heartbeat_as_offline()
        await self._mark_work_pools_as_not_ready()
        await self._mark_deployments_as_not_ready()
        await self._mark_work_queues_as_not_ready()

    @db_injector
    async def _mark_online_workers_without_a_recent_heartbeat_as_offline(
        self, db: PrefectDBInterface
    ) -> None:
        """
        Set ONLINE workers whose heartbeat is stale to OFFLINE.

        A heartbeat is stale when `date_diff_seconds(last_heartbeat_time)`
        (presumably the seconds elapsed since that timestamp) exceeds the
        worker's `heartbeat_interval_seconds` multiplied by the inactivity
        heartbeat multiple. When `heartbeat_interval_seconds` is NULL, the
        fallback interval is used instead.

        Args:
            db: The database interface, injected by `db_injector`.
        """
        async with db.session_context(begin_transaction=True) as session:
            # The interval fallback and multiplier are bound at execution time.
            worker_update_stmt = (
                sa.update(db.Worker)
                .values(status=WorkerStatus.OFFLINE)
                .where(
                    sa.func.date_diff_seconds(db.Worker.last_heartbeat_time)
                    > (
                        sa.func.coalesce(
                            db.Worker.heartbeat_interval_seconds,
                            sa.bindparam("default_interval", sa.Integer),
                        )
                        * sa.bindparam("multiplier", sa.Integer)
                    ),
                    db.Worker.status == WorkerStatus.ONLINE,
                )
            )

            result = await session.execute(
                worker_update_stmt,
                {
                    "multiplier": self._inactivity_heartbeat_multiple,
                    "default_interval": self._fallback_heartbeat_interval_seconds,
                },
            )

        if result.rowcount:
            self.logger.info(f"Marked {result.rowcount} workers as offline.")

    @db_injector
    async def _mark_work_pools_as_not_ready(self, db: PrefectDBInterface) -> None:
        """
        Mark every READY work pool that has no ONLINE workers as NOT_READY.

        Each pool is updated through `models.workers.update_work_pool`, which
        receives `emit_work_pool_status_event` as its status-change callback.

        Args:
            db: The database interface, injected by `db_injector`.
        """
        async with db.session_context(begin_transaction=True) as session:
            # LEFT JOIN only ONLINE workers, so a pool without any still
            # appears in the result with a worker count of zero.
            work_pools_select_stmt = (
                sa.select(db.WorkPool)
                .filter(db.WorkPool.status == WorkPoolStatus.READY)
                .outerjoin(
                    db.Worker,
                    sa.and_(
                        db.Worker.work_pool_id == db.WorkPool.id,
                        db.Worker.status == WorkerStatus.ONLINE,
                    ),
                )
                .group_by(db.WorkPool.id)
                .having(sa.func.count(db.Worker.id) == 0)
            )

            result = await session.execute(work_pools_select_stmt)
            work_pools = result.scalars().all()

            for work_pool in work_pools:
                await models.workers.update_work_pool(
                    session=session,
                    work_pool_id=work_pool.id,
                    work_pool=InternalWorkPoolUpdate(status=WorkPoolStatus.NOT_READY),
                    emit_status_change=emit_work_pool_status_event,
                )

                self.logger.info(f"Marked work pool {work_pool.id} as NOT_READY.")

    @db_injector
    async def _mark_deployments_as_not_ready(self, db: PrefectDBInterface) -> None:
        """
        Mark READY deployments that have not been polled recently as NOT_READY.

        A deployment qualifies when its `last_polled` is set and older than the
        deployment timeout. If its work queue also has a `last_polled`, that
        value must be older than the timeout as well. The ids are passed to
        `mark_deployments_not_ready`.

        Args:
            db: The database interface, injected by `db_injector`.
        """
        async with db.session_context(begin_transaction=True) as session:
            status_timeout_threshold = now("UTC") - timedelta(
                seconds=self._deployment_last_polled_timeout_seconds
            )
            deployment_id_select_stmt = (
                sa.select(db.Deployment.id)
                .outerjoin(db.WorkQueue, db.WorkQueue.id == db.Deployment.work_queue_id)
                .filter(db.Deployment.status == DeploymentStatus.READY)
                .filter(db.Deployment.last_polled.isnot(None))
                .filter(db.Deployment.last_polled < status_timeout_threshold)
                # If the work queue has no last_polled (or there is no work
                # queue), only the deployment's own last_polled matters.
                # Otherwise both times must be older than the threshold.
                .filter(
                    sa.or_(
                        db.WorkQueue.last_polled.is_(None),
                        db.WorkQueue.last_polled < status_timeout_threshold,
                    )
                )
            )
            result = await session.execute(deployment_id_select_stmt)

            deployment_ids_to_mark_unready = result.scalars().all()

        await mark_deployments_not_ready(
            deployment_ids=deployment_ids_to_mark_unready,
        )

    @db_injector
    async def _mark_work_queues_as_not_ready(self, db: PrefectDBInterface) -> None:
        """
        Mark READY work queues that have not been polled recently as NOT_READY.

        A work queue qualifies when its `last_polled` is set and older than the
        work-queue timeout. The ids, oldest poll first, are passed to
        `mark_work_queues_not_ready`.

        Args:
            db: The database interface, injected by `db_injector`.
        """
        async with db.session_context(begin_transaction=True) as session:
            status_timeout_threshold = now("UTC") - timedelta(
                seconds=self._work_queue_last_polled_timeout_seconds
            )
            id_select_stmt = (
                sa.select(db.WorkQueue.id)
                .filter(db.WorkQueue.status == WorkQueueStatus.READY)
                .filter(db.WorkQueue.last_polled.isnot(None))
                .filter(db.WorkQueue.last_polled < status_timeout_threshold)
                .order_by(db.WorkQueue.last_polled.asc())
            )
            result = await session.execute(id_select_stmt)
            unready_work_queue_ids = result.scalars().all()

        await mark_work_queues_not_ready(
            work_queue_ids=unready_work_queue_ids,
        )