Coverage for /usr/local/lib/python3.12/site-packages/prefect/settings/models/server/services.py: 100%

73 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from datetime import timedelta 1a

2from typing import ClassVar 1a

3 

4from pydantic import AliasChoices, AliasPath, Field 1a

5from pydantic_settings import SettingsConfigDict 1a

6 

7from prefect.settings.base import PrefectBaseSettings, build_settings_config 1a

8from prefect.types import SecondsTimeDelta 1a

9 

10 

11class ServicesBaseSetting(PrefectBaseSettings): 1a

12 enabled: bool = Field( 1a

13 default=True, 

14 description="Whether or not to start the service in the server application.", 

15 ) 

16 

17 

18class ServerServicesCancellationCleanupSettings(ServicesBaseSetting): 1a

19 """ 

20 Settings for controlling the cancellation cleanup service 

21 """ 

22 

23 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a

24 ("server", "services", "cancellation_cleanup") 

25 ) 

26 

27 enabled: bool = Field( 1a

28 default=True, 

29 description="Whether or not to start the cancellation cleanup service in the server application.", 

30 validation_alias=AliasChoices( 

31 AliasPath("enabled"), 

32 "prefect_server_services_cancellation_cleanup_enabled", 

33 "prefect_api_services_cancellation_cleanup_enabled", 

34 ), 

35 ) 

36 

37 loop_seconds: float = Field( 1a

38 default=20, 

39 description="The cancellation cleanup service will look for non-terminal tasks and subflows this often. Defaults to `20`.", 

40 validation_alias=AliasChoices( 

41 AliasPath("loop_seconds"), 

42 "prefect_server_services_cancellation_cleanup_loop_seconds", 

43 "prefect_api_services_cancellation_cleanup_loop_seconds", 

44 ), 

45 ) 

46 

47 

48class ServerServicesEventPersisterSettings(ServicesBaseSetting): 1a

49 """ 

50 Settings for controlling the event persister service 

51 """ 

52 

53 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a

54 ("server", "services", "event_persister") 

55 ) 

56 

57 enabled: bool = Field( 1a

58 default=True, 

59 description="Whether or not to start the event persister service in the server application.", 

60 validation_alias=AliasChoices( 

61 AliasPath("enabled"), 

62 "prefect_server_services_event_persister_enabled", 

63 "prefect_api_services_event_persister_enabled", 

64 ), 

65 ) 

66 

67 batch_size: int = Field( 1a

68 default=20, 

69 gt=0, 

70 description="The number of events the event persister will attempt to insert in one batch.", 

71 validation_alias=AliasChoices( 

72 AliasPath("batch_size"), 

73 "prefect_server_services_event_persister_batch_size", 

74 "prefect_api_services_event_persister_batch_size", 

75 ), 

76 ) 

77 

78 flush_interval: float = Field( 1a

79 default=5, 

80 gt=0.0, 

81 description="The maximum number of seconds between flushes of the event persister.", 

82 validation_alias=AliasChoices( 

83 AliasPath("flush_interval"), 

84 "prefect_server_services_event_persister_flush_interval", 

85 "prefect_api_services_event_persister_flush_interval", 

86 ), 

87 ) 

88 

89 batch_size_delete: int = Field( 1a

90 default=10_000, 

91 gt=0, 

92 description="The number of expired events and event resources the event persister will attempt to delete in one batch.", 

93 validation_alias=AliasChoices( 

94 AliasPath("batch_size_delete"), 

95 "prefect_server_services_event_persister_batch_size_delete", 

96 ), 

97 ) 

98 

99 

100class ServerServicesEventLoggerSettings(ServicesBaseSetting): 1a

101 """ 

102 Settings for controlling the event logger service 

103 """ 

104 

105 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a

106 ("server", "services", "event_logger") 

107 ) 

108 

109 enabled: bool = Field( 1a

110 default=False, 

111 description="Whether or not to start the event logger service in the server application.", 

112 validation_alias=AliasChoices( 

113 AliasPath("enabled"), 

114 "prefect_server_services_event_logger_enabled", 

115 "prefect_api_services_event_logger_enabled", 

116 ), 

117 ) 

118 

119 

120class ServerServicesForemanSettings(ServicesBaseSetting): 1a

121 """ 

122 Settings for controlling the foreman service 

123 """ 

124 

125 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a

126 ("server", "services", "foreman") 

127 ) 

128 

129 enabled: bool = Field( 1a

130 default=True, 

131 description="Whether or not to start the foreman service in the server application.", 

132 validation_alias=AliasChoices( 

133 AliasPath("enabled"), 

134 "prefect_server_services_foreman_enabled", 

135 "prefect_api_services_foreman_enabled", 

136 ), 

137 ) 

138 

139 loop_seconds: float = Field( 1a

140 default=15, 

141 description="The foreman service will check for offline workers this often. Defaults to `15`.", 

142 validation_alias=AliasChoices( 

143 AliasPath("loop_seconds"), 

144 "prefect_server_services_foreman_loop_seconds", 

145 "prefect_api_services_foreman_loop_seconds", 

146 ), 

147 ) 

148 

149 inactivity_heartbeat_multiple: int = Field( 1a

150 default=3, 

151 description=""" 

152 The number of heartbeats that must be missed before a worker is marked as offline. Defaults to `3`. 

153 """, 

154 validation_alias=AliasChoices( 

155 AliasPath("inactivity_heartbeat_multiple"), 

156 "prefect_server_services_foreman_inactivity_heartbeat_multiple", 

157 "prefect_api_services_foreman_inactivity_heartbeat_multiple", 

158 ), 

159 ) 

160 

161 fallback_heartbeat_interval_seconds: int = Field( 1a

162 default=30, 

163 description=""" 

164 The number of seconds to use for online/offline evaluation if a worker's heartbeat 

165 interval is not set. Defaults to `30`. 

166 """, 

167 validation_alias=AliasChoices( 

168 AliasPath("fallback_heartbeat_interval_seconds"), 

169 "prefect_server_services_foreman_fallback_heartbeat_interval_seconds", 

170 "prefect_api_services_foreman_fallback_heartbeat_interval_seconds", 

171 ), 

172 ) 

173 

174 deployment_last_polled_timeout_seconds: int = Field( 1a

175 default=60, 

176 description=""" 

177 The number of seconds before a deployment is marked as not ready if it has not been 

178 polled. Defaults to `60`. 

179 """, 

180 validation_alias=AliasChoices( 

181 AliasPath("deployment_last_polled_timeout_seconds"), 

182 "prefect_server_services_foreman_deployment_last_polled_timeout_seconds", 

183 "prefect_api_services_foreman_deployment_last_polled_timeout_seconds", 

184 ), 

185 ) 

186 

187 work_queue_last_polled_timeout_seconds: int = Field( 1a

188 default=60, 

189 description=""" 

190 The number of seconds before a work queue is marked as not ready if it has not been 

191 polled. Defaults to `60`. 

192 """, 

193 validation_alias=AliasChoices( 

194 AliasPath("work_queue_last_polled_timeout_seconds"), 

195 "prefect_server_services_foreman_work_queue_last_polled_timeout_seconds", 

196 "prefect_api_services_foreman_work_queue_last_polled_timeout_seconds", 

197 ), 

198 ) 

199 

200 

201class ServerServicesLateRunsSettings(ServicesBaseSetting): 1a

202 """ 

203 Settings for controlling the late runs service 

204 """ 

205 

206 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a

207 ("server", "services", "late_runs") 

208 ) 

209 

210 enabled: bool = Field( 1a

211 default=True, 

212 description="Whether or not to start the late runs service in the server application.", 

213 validation_alias=AliasChoices( 

214 AliasPath("enabled"), 

215 "prefect_server_services_late_runs_enabled", 

216 "prefect_api_services_late_runs_enabled", 

217 ), 

218 ) 

219 

220 loop_seconds: float = Field( 1a

221 default=5, 

222 description=""" 

223 The late runs service will look for runs to mark as late this often. Defaults to `5`. 

224 """, 

225 validation_alias=AliasChoices( 

226 AliasPath("loop_seconds"), 

227 "prefect_server_services_late_runs_loop_seconds", 

228 "prefect_api_services_late_runs_loop_seconds", 

229 ), 

230 ) 

231 

232 after_seconds: SecondsTimeDelta = Field( 1a

233 default=timedelta(seconds=15), 

234 description=""" 

235 The late runs service will mark runs as late after they have exceeded their scheduled start time by this many seconds. Defaults to `5` seconds. 

236 """, 

237 validation_alias=AliasChoices( 

238 AliasPath("after_seconds"), 

239 "prefect_server_services_late_runs_after_seconds", 

240 "prefect_api_services_late_runs_after_seconds", 

241 ), 

242 ) 

243 

244 

245class ServerServicesSchedulerSettings(ServicesBaseSetting): 1a

246 """ 

247 Settings for controlling the scheduler service 

248 """ 

249 

250 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a

251 ("server", "services", "scheduler") 

252 ) 

253 

254 enabled: bool = Field( 1a

255 default=True, 

256 description="Whether or not to start the scheduler service in the server application.", 

257 validation_alias=AliasChoices( 

258 AliasPath("enabled"), 

259 "prefect_server_services_scheduler_enabled", 

260 "prefect_api_services_scheduler_enabled", 

261 ), 

262 ) 

263 

264 loop_seconds: float = Field( 1a

265 default=60, 

266 description=""" 

267 The scheduler loop interval, in seconds. This determines 

268 how often the scheduler will attempt to schedule new flow runs, but has no 

269 impact on how quickly either flow runs or task runs are actually executed. 

270 Defaults to `60`. 

271 """, 

272 validation_alias=AliasChoices( 

273 AliasPath("loop_seconds"), 

274 "prefect_server_services_scheduler_loop_seconds", 

275 "prefect_api_services_scheduler_loop_seconds", 

276 ), 

277 ) 

278 

279 deployment_batch_size: int = Field( 1a

280 default=100, 

281 description=""" 

282 The number of deployments the scheduler will attempt to 

283 schedule in a single batch. If there are more deployments than the batch 

284 size, the scheduler immediately attempts to schedule the next batch; it 

285 does not sleep for `scheduler_loop_seconds` until it has visited every 

286 deployment once. Defaults to `100`. 

287 """, 

288 validation_alias=AliasChoices( 

289 AliasPath("deployment_batch_size"), 

290 "prefect_server_services_scheduler_deployment_batch_size", 

291 "prefect_api_services_scheduler_deployment_batch_size", 

292 ), 

293 ) 

294 

295 max_runs: int = Field( 1a

296 default=100, 

297 description=""" 

298 The scheduler will attempt to schedule up to this many 

299 auto-scheduled runs in the future. Note that runs may have fewer than 

300 this many scheduled runs, depending on the value of 

301 `scheduler_max_scheduled_time`. Defaults to `100`. 

302 """, 

303 validation_alias=AliasChoices( 

304 AliasPath("max_runs"), 

305 "prefect_server_services_scheduler_max_runs", 

306 "prefect_api_services_scheduler_max_runs", 

307 ), 

308 ) 

309 

310 min_runs: int = Field( 1a

311 default=3, 

312 description=""" 

313 The scheduler will attempt to schedule at least this many 

314 auto-scheduled runs in the future. Note that runs may have more than 

315 this many scheduled runs, depending on the value of 

316 `scheduler_min_scheduled_time`. Defaults to `3`. 

317 """, 

318 validation_alias=AliasChoices( 

319 AliasPath("min_runs"), 

320 "prefect_server_services_scheduler_min_runs", 

321 "prefect_api_services_scheduler_min_runs", 

322 ), 

323 ) 

324 

325 max_scheduled_time: timedelta = Field( 1a

326 default=timedelta(days=100), 

327 description=""" 

328 The scheduler will create new runs up to this far in the 

329 future. Note that this setting will take precedence over 

330 `scheduler_max_runs`: if a flow runs once a month and 

331 `scheduler_max_scheduled_time` is three months, then only three runs will be 

332 scheduled. Defaults to 100 days (`8640000` seconds). 

333 """, 

334 validation_alias=AliasChoices( 

335 AliasPath("max_scheduled_time"), 

336 "prefect_server_services_scheduler_max_scheduled_time", 

337 "prefect_api_services_scheduler_max_scheduled_time", 

338 ), 

339 ) 

340 

341 min_scheduled_time: timedelta = Field( 1a

342 default=timedelta(hours=1), 

343 description=""" 

344 The scheduler will create new runs at least this far in the 

345 future. Note that this setting will take precedence over `scheduler_min_runs`: 

346 if a flow runs every hour and `scheduler_min_scheduled_time` is three hours, 

347 then three runs will be scheduled even if `scheduler_min_runs` is 1. Defaults to 

348 """, 

349 validation_alias=AliasChoices( 

350 AliasPath("min_scheduled_time"), 

351 "prefect_server_services_scheduler_min_scheduled_time", 

352 "prefect_api_services_scheduler_min_scheduled_time", 

353 ), 

354 ) 

355 

356 insert_batch_size: int = Field( 1a

357 default=500, 

358 description=""" 

359 The number of runs the scheduler will attempt to insert in a single batch. 

360 Defaults to `500`. 

361 """, 

362 validation_alias=AliasChoices( 

363 AliasPath("insert_batch_size"), 

364 "prefect_server_services_scheduler_insert_batch_size", 

365 "prefect_api_services_scheduler_insert_batch_size", 

366 ), 

367 ) 

368 

369 recent_deployments_loop_seconds: float = Field( 1a

370 default=5, 

371 description=""" 

372 The number of seconds the recent deployments scheduler will wait between checking for recently updated deployments. Defaults to `5`. 

373 """, 

374 ) 

375 

376 

377class ServerServicesPauseExpirationsSettings(ServicesBaseSetting): 1a

378 """ 

379 Settings for controlling the pause expiration service 

380 """ 

381 

382 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a

383 ("server", "services", "pause_expirations") 

384 ) 

385 

386 enabled: bool = Field( 1a

387 default=True, 

388 description=""" 

389 Whether or not to start the paused flow run expiration service in the server 

390 application. If disabled, paused flows that have timed out will remain in a Paused state 

391 until a resume attempt. 

392 """, 

393 validation_alias=AliasChoices( 

394 AliasPath("enabled"), 

395 "prefect_server_services_pause_expirations_enabled", 

396 "prefect_api_services_pause_expirations_enabled", 

397 ), 

398 ) 

399 

400 loop_seconds: float = Field( 1a

401 default=5, 

402 description=""" 

403 The pause expiration service will look for runs to mark as failed this often. Defaults to `5`. 

404 """, 

405 validation_alias=AliasChoices( 

406 AliasPath("loop_seconds"), 

407 "prefect_server_services_pause_expirations_loop_seconds", 

408 "prefect_api_services_pause_expirations_loop_seconds", 

409 ), 

410 ) 

411 

412 

413class ServerServicesRepossessorSettings(ServicesBaseSetting): 1a

414 """ 

415 Settings for controlling the repossessor service 

416 """ 

417 

418 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a

419 ("server", "services", "repossessor") 

420 ) 

421 

422 enabled: bool = Field( 1a

423 default=True, 

424 description="Whether or not to start the repossessor service in the server application.", 

425 ) 

426 

427 loop_seconds: float = Field( 1a

428 default=15, 

429 description="The repossessor service will look for expired leases this often. Defaults to `15`.", 

430 ) 

431 

432 

433class ServerServicesTaskRunRecorderSettings(ServicesBaseSetting): 1a

434 """ 

435 Settings for controlling the task run recorder service 

436 """ 

437 

438 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a

439 ("server", "services", "task_run_recorder") 

440 ) 

441 

442 enabled: bool = Field( 1a

443 default=True, 

444 description="Whether or not to start the task run recorder service in the server application.", 

445 validation_alias=AliasChoices( 

446 AliasPath("enabled"), 

447 "prefect_server_services_task_run_recorder_enabled", 

448 "prefect_api_services_task_run_recorder_enabled", 

449 ), 

450 ) 

451 

452 

453class ServerServicesTriggersSettings(ServicesBaseSetting): 1a

454 """ 

455 Settings for controlling the triggers service 

456 """ 

457 

458 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a

459 ("server", "services", "triggers") 

460 ) 

461 

462 enabled: bool = Field( 1a

463 default=True, 

464 description="Whether or not to start the triggers service in the server application.", 

465 validation_alias=AliasChoices( 

466 AliasPath("enabled"), 

467 "prefect_server_services_triggers_enabled", 

468 "prefect_api_services_triggers_enabled", 

469 ), 

470 ) 

471 

472 pg_notify_reconnect_interval_seconds: int = Field( 1a

473 default=10, 

474 description=""" 

475 The number of seconds to wait before reconnecting to the PostgreSQL NOTIFY/LISTEN  

476 connection after an error. Only used when using PostgreSQL as the database. 

477 Defaults to `10`. 

478 """, 

479 validation_alias=AliasChoices( 

480 AliasPath("pg_notify_reconnect_interval_seconds"), 

481 "prefect_server_services_triggers_pg_notify_reconnect_interval_seconds", 

482 ), 

483 ) 

484 

485 pg_notify_heartbeat_interval_seconds: int = Field( 1a

486 default=5, 

487 description=""" 

488 The number of seconds between heartbeat checks for the PostgreSQL NOTIFY/LISTEN  

489 connection to ensure it's still alive. Only used when using PostgreSQL as the database. 

490 Defaults to `5`. 

491 """, 

492 validation_alias=AliasChoices( 

493 AliasPath("pg_notify_heartbeat_interval_seconds"), 

494 "prefect_server_services_triggers_pg_notify_heartbeat_interval_seconds", 

495 ), 

496 ) 

497 

498 

499class ServerServicesSettings(PrefectBaseSettings): 1a

500 """ 

501 Settings for controlling server services 

502 """ 

503 

504 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a

505 ("server", "services") 

506 ) 

507 

508 cancellation_cleanup: ServerServicesCancellationCleanupSettings = Field( 1a

509 default_factory=ServerServicesCancellationCleanupSettings, 

510 description="Settings for controlling the cancellation cleanup service", 

511 ) 

512 event_persister: ServerServicesEventPersisterSettings = Field( 1a

513 default_factory=ServerServicesEventPersisterSettings, 

514 description="Settings for controlling the event persister service", 

515 ) 

516 event_logger: ServerServicesEventLoggerSettings = Field( 1a

517 default_factory=ServerServicesEventLoggerSettings, 

518 description="Settings for controlling the event logger service", 

519 ) 

520 foreman: ServerServicesForemanSettings = Field( 1a

521 default_factory=ServerServicesForemanSettings, 

522 description="Settings for controlling the foreman service", 

523 ) 

524 late_runs: ServerServicesLateRunsSettings = Field( 1a

525 default_factory=ServerServicesLateRunsSettings, 

526 description="Settings for controlling the late runs service", 

527 ) 

528 scheduler: ServerServicesSchedulerSettings = Field( 1a

529 default_factory=ServerServicesSchedulerSettings, 

530 description="Settings for controlling the scheduler service", 

531 ) 

532 pause_expirations: ServerServicesPauseExpirationsSettings = Field( 1a

533 default_factory=ServerServicesPauseExpirationsSettings, 

534 description="Settings for controlling the pause expiration service", 

535 ) 

536 repossessor: ServerServicesRepossessorSettings = Field( 1a

537 default_factory=ServerServicesRepossessorSettings, 

538 description="Settings for controlling the repossessor service", 

539 ) 

540 task_run_recorder: ServerServicesTaskRunRecorderSettings = Field( 1a

541 default_factory=ServerServicesTaskRunRecorderSettings, 

542 description="Settings for controlling the task run recorder service", 

543 ) 

544 triggers: ServerServicesTriggersSettings = Field( 1a

545 default_factory=ServerServicesTriggersSettings, 

546 description="Settings for controlling the triggers service", 

547 )