Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/server.py: 27%

334 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Command line interface for working with the Prefect API and server. 

3""" 

4 

5from __future__ import annotations 1a

6 

7import asyncio 1a

8import inspect 1a

9import os 1a

10import shlex 1a

11import signal 1a

12import socket 1a

13import subprocess 1a

14import sys 1a

15import textwrap 1a

16from pathlib import Path 1a

17from typing import TYPE_CHECKING 1a

18 

19import typer 1a

20import uvicorn 1a

21from rich.table import Table 1a

22from rich.text import Text 1a

23 

24import prefect 1a

25import prefect.settings 1a

26from prefect.cli._prompts import prompt 1a

27from prefect.cli._types import PrefectTyper, SettingsOption 1a

28from prefect.cli._utilities import exit_with_error, exit_with_success 1a

29from prefect.cli.cloud import prompt_select_from_list 1a

30from prefect.cli.root import app, is_interactive 1a

31from prefect.logging import get_logger 1a

32from prefect.settings import ( 1a

33 PREFECT_API_SERVICES_LATE_RUNS_ENABLED, 

34 PREFECT_API_SERVICES_SCHEDULER_ENABLED, 

35 PREFECT_API_URL, 

36 PREFECT_HOME, 

37 PREFECT_SERVER_ANALYTICS_ENABLED, 

38 PREFECT_SERVER_API_BASE_PATH, 

39 PREFECT_SERVER_API_HOST, 

40 PREFECT_SERVER_API_KEEPALIVE_TIMEOUT, 

41 PREFECT_SERVER_API_PORT, 

42 PREFECT_SERVER_LOGGING_LEVEL, 

43 PREFECT_UI_ENABLED, 

44 Profile, 

45 get_current_settings, 

46 load_current_profile, 

47 load_profiles, 

48 save_profiles, 

49 update_current_profile, 

50) 

51from prefect.settings.context import temporary_settings 1a

52from prefect.utilities.asyncutils import run_sync_in_worker_thread 1a

53 

54if TYPE_CHECKING: 54 ↛ 55line 54 didn't jump to line 55 because the condition on line 54 was never true1a

55 import logging 

56 

57server_app: PrefectTyper = PrefectTyper( 1a

58 name="server", 

59 help="Start a Prefect server instance and interact with the database", 

60) 

61database_app: PrefectTyper = PrefectTyper( 1a

62 name="database", help="Interact with the database." 

63) 

64services_app: PrefectTyper = PrefectTyper( 1a

65 name="services", help="Interact with server loop services." 

66) 

67server_app.add_typer(database_app) 1a

68server_app.add_typer(services_app) 1a

69app.add_typer(server_app) 1a

70 

71logger: "logging.Logger" = get_logger(__name__) 1a

72 

73SERVER_PID_FILE_NAME = "server.pid" 1a

74SERVICES_PID_FILE = Path(PREFECT_HOME.value()) / "services.pid" 1a

75 

76 

77def generate_welcome_blurb(base_url: str, ui_enabled: bool) -> str: 1a

78 if PREFECT_SERVER_API_BASE_PATH: 78 ↛ 79line 78 didn't jump to line 79 because the condition on line 78 was never true1a

79 suffix = PREFECT_SERVER_API_BASE_PATH.value() 

80 else: 

81 suffix = "/api" 1a

82 

83 blurb = textwrap.dedent( 1a

84 r""" 

85 ___ ___ ___ ___ ___ ___ _____ 

86 | _ \ _ \ __| __| __/ __|_ _| 

87 | _/ / _|| _|| _| (__ | | 

88 |_| |_|_\___|_| |___\___| |_| 

89 

90 Configure Prefect to communicate with the server with: 

91 

92 prefect config set PREFECT_API_URL={api_url} 

93 

94 View the API reference documentation at {docs_url} 

95 """ 

96 ).format(api_url=base_url + suffix, docs_url=base_url + "/docs") 

97 

98 visit_dashboard = textwrap.dedent( 1a

99 f""" 

100 Check out the dashboard at {base_url} 

101 """ 

102 ) 

103 

104 dashboard_not_built = textwrap.dedent( 1a

105 """ 

106 The dashboard is not built. It looks like you're on a development version. 

107 See `prefect dev` for development commands. 

108 """ 

109 ) 

110 

111 dashboard_disabled = textwrap.dedent( 1a

112 """ 

113 The dashboard is disabled. Set `PREFECT_UI_ENABLED=1` to re-enable it. 

114 """ 

115 ) 

116 

117 if not os.path.exists(prefect.__ui_static_path__): 117 ↛ 118line 117 didn't jump to line 118 because the condition on line 117 was never true1a

118 blurb += dashboard_not_built 

119 elif not ui_enabled: 119 ↛ 120line 119 didn't jump to line 120 because the condition on line 119 was never true1a

120 blurb += dashboard_disabled 

121 else: 

122 blurb += visit_dashboard 1a

123 

124 return blurb 1a

125 

126 

127def prestart_check(base_url: str) -> None: 1a

128 """ 

129 Check if `PREFECT_API_URL` is set in the current profile. If not, prompt the user to set it. 

130 

131 Args: 

132 base_url: The base URL the server will be running on 

133 """ 

134 api_url = f"{base_url}/api" 

135 current_profile = load_current_profile() 

136 profiles = load_profiles() 

137 if current_profile and PREFECT_API_URL not in current_profile.settings: 

138 profiles_with_matching_url = [ 

139 name 

140 for name, profile in profiles.items() 

141 if profile.settings.get(PREFECT_API_URL) == api_url 

142 ] 

143 if len(profiles_with_matching_url) == 1: 

144 profiles.set_active(profiles_with_matching_url[0]) 

145 save_profiles(profiles) 

146 app.console.print( 

147 f"Switched to profile {profiles_with_matching_url[0]!r}", 

148 style="green", 

149 ) 

150 return 

151 elif len(profiles_with_matching_url) > 1: 

152 app.console.print( 

153 "Your current profile doesn't have `PREFECT_API_URL` set to the address" 

154 " of the server that's running. Some of your other profiles do." 

155 ) 

156 selected_profile = prompt_select_from_list( 

157 app.console, 

158 "Which profile would you like to switch to?", 

159 sorted( 

160 [profile for profile in profiles_with_matching_url], 

161 ), 

162 ) 

163 profiles.set_active(selected_profile) 

164 save_profiles(profiles) 

165 app.console.print( 

166 f"Switched to profile {selected_profile!r}", style="green" 

167 ) 

168 return 

169 

170 app.console.print( 

171 "The `PREFECT_API_URL` setting for your current profile doesn't match the" 

172 " address of the server that's running. You need to set it to communicate" 

173 " with the server.", 

174 style="yellow", 

175 ) 

176 

177 choice = prompt_select_from_list( 

178 app.console, 

179 "How would you like to proceed?", 

180 [ 

181 ( 

182 "create", 

183 "Create a new profile with `PREFECT_API_URL` set and switch to it", 

184 ), 

185 ( 

186 "set", 

187 f"Set `PREFECT_API_URL` in the current profile: {current_profile.name!r}", 

188 ), 

189 ], 

190 ) 

191 

192 if choice == "create": 

193 while True: 

194 profile_name = prompt("Enter a new profile name") 

195 if profile_name in profiles: 

196 app.console.print( 

197 f"Profile {profile_name!r} already exists. Please choose a different name.", 

198 style="red", 

199 ) 

200 else: 

201 break 

202 

203 profiles.add_profile( 

204 Profile( 

205 name=profile_name, settings={PREFECT_API_URL: f"{base_url}/api"} 

206 ) 

207 ) 

208 profiles.set_active(profile_name) 

209 save_profiles(profiles) 

210 

211 app.console.print( 

212 f"Switched to new profile {profile_name!r}", style="green" 

213 ) 

214 elif choice == "set": 

215 api_url = prompt( 

216 "Enter the `PREFECT_API_URL` value", default="http://127.0.0.1:4200/api" 

217 ) 

218 update_current_profile({PREFECT_API_URL: api_url}) 

219 app.console.print( 

220 f"Set `PREFECT_API_URL` to {api_url!r} in the current profile {current_profile.name!r}", 

221 style="green", 

222 ) 

223 

224 

225def _validate_multi_worker(workers: int) -> None: 1a

226 """ 

227 Validates the configuration for running multiple Prefect server workers. 

228 

229 Multi-worker mode requires specific infrastructure components to ensure proper 

230 coordination and data consistency across worker processes: 

231 

232 - **Database**: PostgreSQL is required (SQLite causes database locking issues) 

233 - **Messaging**: Redis is required for event messaging (in-memory messaging 

234 doesn't work across processes) 

235 

236 Args: 

237 workers: The number of worker processes to run. Must be >= 1. 

238 

239 Raises: 

240 exit_with_error: If the configuration is invalid. 

241 """ 

242 from prefect.server.utilities.database import get_dialect 1a

243 

244 if workers == 1: 244 ↛ 247line 244 didn't jump to line 247 because the condition on line 244 was always true1a

245 return 1a

246 

247 if workers < 1: 

248 exit_with_error("Number of workers must be >= 1") 

249 

250 settings = get_current_settings() 

251 

252 try: 

253 dialect = get_dialect( 

254 settings.server.database.connection_url.get_secret_value() 

255 ) 

256 except Exception as e: 

257 exit_with_error(f"Unable to validate database configuration: {e}") 

258 

259 if dialect.name != "postgresql": 

260 exit_with_error( 

261 "Multi-worker mode (--workers > 1) is not supported with SQLite database." 

262 ) 

263 

264 try: 

265 messaging_cache = settings.server.events.messaging_cache 

266 messaging_broker = settings.server.events.messaging_broker 

267 causal_ordering = settings.server.events.causal_ordering 

268 lease_storage = settings.server.concurrency.lease_storage 

269 except Exception as e: 

270 exit_with_error(f"Unable to validate messaging configuration: {e}") 

271 

272 if ( 

273 messaging_cache == "prefect.server.utilities.messaging.memory" 

274 or messaging_broker == "prefect.server.utilities.messaging.memory" 

275 or causal_ordering == "prefect.server.events.ordering.memory" 

276 or lease_storage == "prefect.server.concurrency.lease_storage.memory" 

277 ): 

278 error_message = textwrap.dedent( 

279 """ 

280 Multi-worker mode (--workers > 1) requires Redis for messaging and lease storage. 

281  

282 Please configure the following settings to use Redis: 

283  

284 prefect config set PREFECT_MESSAGING_BROKER="prefect_redis.messaging" 

285 prefect config set PREFECT_MESSAGING_CACHE="prefect_redis.messaging" 

286 prefect config set PREFECT_SERVER_EVENTS_CAUSAL_ORDERING="prefect_redis.ordering" 

287 prefect config set PREFECT_SERVER_CONCURRENCY_LEASE_STORAGE="prefect_redis.lease_storage" 

288  

289 You'll also need to configure your Redis connection: 

290  

291 export PREFECT_REDIS_MESSAGING_HOST="your-redis-host" 

292 export PREFECT_REDIS_MESSAGING_PORT="6379" 

293 export PREFECT_REDIS_MESSAGING_DB="0" 

294  

295 For complete setup instructions, see: 

296 https://docs.prefect.io/v3/how-to-guides/self-hosted/server-cli#multi-worker-api-server 

297 https://docs.prefect.io/v3/advanced/self-hosted#redis-setup 

298 """ 

299 ).strip() 

300 exit_with_error(error_message) 

301 

302 

303@server_app.command() 1a

304def start( 1a

305 host: str = SettingsOption(PREFECT_SERVER_API_HOST), 

306 port: int = SettingsOption(PREFECT_SERVER_API_PORT), 

307 keep_alive_timeout: int = SettingsOption(PREFECT_SERVER_API_KEEPALIVE_TIMEOUT), 

308 log_level: str = SettingsOption(PREFECT_SERVER_LOGGING_LEVEL), 

309 scheduler: bool = SettingsOption(PREFECT_API_SERVICES_SCHEDULER_ENABLED), 

310 analytics: bool = SettingsOption( 

311 PREFECT_SERVER_ANALYTICS_ENABLED, "--analytics-on/--analytics-off" 

312 ), 

313 late_runs: bool = SettingsOption(PREFECT_API_SERVICES_LATE_RUNS_ENABLED), 

314 ui: bool = SettingsOption(PREFECT_UI_ENABLED), 

315 no_services: bool = typer.Option( 

316 False, "--no-services", help="Only run the webserver API and UI" 

317 ), 

318 background: bool = typer.Option( 

319 False, "--background", "-b", help="Run the server in the background" 

320 ), 

321 workers: int = typer.Option( 

322 1, 

323 "--workers", 

324 help="Number of worker processes to run. Only runs the webserver API and UI", 

325 ), 

326): 

327 """ 

328 Start a Prefect server instance 

329 """ 

330 base_url = f"http://{host}:{port}" 1a

331 if is_interactive(): 331 ↛ 332line 331 didn't jump to line 332 because the condition on line 331 was never true1a

332 try: 

333 prestart_check(base_url) 

334 except Exception: 

335 pass 

336 

337 if workers > 1: 337 ↛ 338line 337 didn't jump to line 338 because the condition on line 337 was never true1a

338 no_services = True 

339 _validate_multi_worker(workers) 1a

340 

341 server_settings = { 1a

342 "PREFECT_API_SERVICES_SCHEDULER_ENABLED": str(scheduler), 

343 "PREFECT_SERVER_ANALYTICS_ENABLED": str(analytics), 

344 "PREFECT_API_SERVICES_LATE_RUNS_ENABLED": str(late_runs), 

345 "PREFECT_UI_ENABLED": str(ui), 

346 "PREFECT_SERVER_LOGGING_LEVEL": log_level, 

347 } 

348 

349 if no_services: 349 ↛ 350line 349 didn't jump to line 350 because the condition on line 349 was never true1a

350 server_settings["PREFECT_SERVER_ANALYTICS_ENABLED"] = "False" 

351 

352 pid_file = Path(PREFECT_HOME.value()) / SERVER_PID_FILE_NAME 1a

353 # check if port is already in use 

354 try: 1a

355 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 1a

356 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 1a

357 s.bind((host, port)) 1a

358 except socket.gaierror: 

359 exit_with_error( 

360 f"Invalid host '{host}'. Please specify a valid hostname or IP address." 

361 ) 

362 except socket.error: 

363 if pid_file.exists(): 

364 exit_with_error( 

365 f"A background server process is already running on port {port}. " 

366 "Run `prefect server stop` to stop it or specify a different port " 

367 "with the `--port` flag." 

368 ) 

369 exit_with_error( 

370 f"Port {port} is already in use. Please specify a different port with the " 

371 "`--port` flag." 

372 ) 

373 

374 # check if server is already running in the background 

375 if background: 375 ↛ 376line 375 didn't jump to line 376 because the condition on line 375 was never true1a

376 try: 

377 pid_file.touch(mode=0o600, exist_ok=False) 

378 except FileExistsError: 

379 exit_with_error( 

380 "A server is already running in the background. To stop it," 

381 " run `prefect server stop`." 

382 ) 

383 

384 app.console.print(generate_welcome_blurb(base_url, ui_enabled=ui)) 1a

385 app.console.print("\n") 1a

386 

387 if workers > 1: 387 ↛ 388line 387 didn't jump to line 388 because the condition on line 387 was never true1a

388 app.console.print( 

389 f"Starting server with {workers} worker processes.\n", style="blue" 

390 ) 

391 

392 if background: 392 ↛ 393line 392 didn't jump to line 393 because the condition on line 392 was never true1a

393 _run_in_background( 

394 pid_file, 

395 server_settings, 

396 host, 

397 port, 

398 keep_alive_timeout, 

399 no_services, 

400 workers, 

401 ) 

402 else: 

403 _run_in_foreground( 1a

404 server_settings, 

405 host, 

406 port, 

407 keep_alive_timeout, 

408 no_services, 

409 workers, 

410 ) 

411 

412 

413def _run_in_background( 1a

414 pid_file: Path, 

415 server_settings: dict[str, str], 

416 host: str, 

417 port: int, 

418 keep_alive_timeout: int, 

419 no_services: bool, 

420 workers: int, 

421) -> None: 

422 command = [ 

423 sys.executable, 

424 "-m", 

425 "uvicorn", 

426 "--app-dir", 

427 str(prefect.__module_path__.parent), 

428 "--factory", 

429 "prefect.server.api.server:create_app", 

430 "--host", 

431 str(host), 

432 "--port", 

433 str(port), 

434 "--timeout-keep-alive", 

435 str(keep_alive_timeout), 

436 "--workers", 

437 str(workers), 

438 ] 

439 logger.debug("Opening server process with command: %s", shlex.join(command)) 

440 

441 env = {**os.environ, **server_settings, "PREFECT__SERVER_FINAL": "1"} 

442 if no_services: 

443 env["PREFECT__SERVER_WEBSERVER_ONLY"] = "1" 

444 

445 process = subprocess.Popen( 

446 command, 

447 env=env, 

448 stdout=subprocess.PIPE, 

449 stderr=subprocess.PIPE, 

450 ) 

451 

452 process_id = process.pid 

453 pid_file.write_text(str(process_id)) 

454 

455 app.console.print( 

456 "The Prefect server is running in the background. Run `prefect" 

457 " server stop` to stop it." 

458 ) 

459 

460 

461def _run_in_foreground( 1a

462 server_settings: dict[str, str], 

463 host: str, 

464 port: int, 

465 keep_alive_timeout: int, 

466 no_services: bool, 

467 workers: int, 

468) -> None: 

469 from prefect.server.api.server import create_app 1a

470 

471 try: 1a

472 with temporary_settings( 1a

473 {getattr(prefect.settings, k): v for k, v in server_settings.items()} 

474 ): 

475 if workers == 1: 475 ↛ 488line 475 didn't jump to line 488 because the condition on line 475 was always true1a

476 uvicorn.run( 1a

477 app=create_app(final=True, webserver_only=no_services), 

478 app_dir=str(prefect.__module_path__.parent), 

479 host=host, 

480 port=port, 

481 timeout_keep_alive=keep_alive_timeout, 

482 log_level=server_settings.get( 

483 "PREFECT_SERVER_LOGGING_LEVEL", "info" 

484 ).lower(), 

485 ) 

486 

487 else: 

488 os.environ["PREFECT__SERVER_FINAL"] = "1" 

489 os.environ["PREFECT__SERVER_WEBSERVER_ONLY"] = "1" 

490 

491 uvicorn.run( 

492 app="prefect.server.api.server:create_app", 

493 factory=True, 

494 host=host, 

495 port=port, 

496 timeout_keep_alive=keep_alive_timeout, 

497 log_level=server_settings.get( 

498 "PREFECT_SERVER_LOGGING_LEVEL", "info" 

499 ).lower(), 

500 workers=workers, 

501 ) 

502 

503 finally: 

504 app.console.print("Server stopped!") 

505 

506 

507@server_app.command() 1a

508async def stop(): 1a

509 """Stop a Prefect server instance running in the background""" 

510 pid_file = Path(PREFECT_HOME.value()) / SERVER_PID_FILE_NAME 

511 if not pid_file.exists(): 

512 exit_with_success("No server running in the background.") 

513 pid = int(pid_file.read_text()) 

514 try: 

515 os.kill(pid, signal.SIGTERM) 

516 except ProcessLookupError: 

517 exit_with_success( 

518 "The server process is not running. Cleaning up stale PID file." 

519 ) 

520 finally: 

521 # The file probably exists, but use `missing_ok` to avoid an 

522 # error if the file was deleted by another actor 

523 pid_file.unlink(missing_ok=True) 

524 app.console.print("Server stopped!") 

525 

526 

527@database_app.command() 1a

528async def reset(yes: bool = typer.Option(False, "--yes", "-y")): 1a

529 """Drop and recreate all Prefect database tables""" 

530 from prefect.server.database import provide_database_interface 

531 

532 db = provide_database_interface() 

533 engine = await db.engine() 

534 if not yes: 

535 confirm = typer.confirm( 

536 "Are you sure you want to reset the Prefect database located " 

537 f'at "{engine.url!r}"? This will drop and recreate all tables.' 

538 ) 

539 if not confirm: 

540 exit_with_error("Database reset aborted") 

541 app.console.print("Downgrading database...") 

542 await db.drop_db() 

543 app.console.print("Upgrading database...") 

544 await db.create_db() 

545 exit_with_success(f'Prefect database "{engine.url!r}" reset!') 

546 

547 

548@database_app.command() 1a

549async def upgrade( 1a

550 yes: bool = typer.Option(False, "--yes", "-y"), 

551 revision: str = typer.Option( 

552 "head", 

553 "-r", 

554 help=( 

555 "The revision to pass to `alembic upgrade`. If not provided, runs all" 

556 " migrations." 

557 ), 

558 ), 

559 dry_run: bool = typer.Option( 

560 False, 

561 help=( 

562 "Flag to show what migrations would be made without applying them. Will" 

563 " emit sql statements to stdout." 

564 ), 

565 ), 

566): 

567 """Upgrade the Prefect database""" 

568 from prefect.server.database import provide_database_interface 

569 from prefect.server.database.alembic_commands import alembic_upgrade 

570 

571 db = provide_database_interface() 

572 engine = await db.engine() 

573 

574 if not yes: 

575 confirm = typer.confirm( 

576 f"Are you sure you want to upgrade the Prefect database at {engine.url!r}?" 

577 ) 

578 if not confirm: 

579 exit_with_error("Database upgrade aborted!") 

580 

581 app.console.print("Running upgrade migrations ...") 

582 await run_sync_in_worker_thread(alembic_upgrade, revision=revision, dry_run=dry_run) 

583 app.console.print("Migrations succeeded!") 

584 exit_with_success(f"Prefect database at {engine.url!r} upgraded!") 

585 

586 

587@database_app.command() 1a

588async def downgrade( 1a

589 yes: bool = typer.Option(False, "--yes", "-y"), 

590 revision: str = typer.Option( 

591 "-1", 

592 "-r", 

593 help=( 

594 "The revision to pass to `alembic downgrade`. If not provided, " 

595 "downgrades to the most recent revision. Use 'base' to run all " 

596 "migrations." 

597 ), 

598 ), 

599 dry_run: bool = typer.Option( 

600 False, 

601 help=( 

602 "Flag to show what migrations would be made without applying them. Will" 

603 " emit sql statements to stdout." 

604 ), 

605 ), 

606): 

607 """Downgrade the Prefect database""" 

608 from prefect.server.database import provide_database_interface 

609 from prefect.server.database.alembic_commands import alembic_downgrade 

610 

611 db = provide_database_interface() 

612 

613 engine = await db.engine() 

614 

615 if not yes: 

616 confirm = typer.confirm( 

617 "Are you sure you want to downgrade the Prefect " 

618 f"database at {engine.url!r}?" 

619 ) 

620 if not confirm: 

621 exit_with_error("Database downgrade aborted!") 

622 

623 app.console.print("Running downgrade migrations ...") 

624 await run_sync_in_worker_thread( 

625 alembic_downgrade, revision=revision, dry_run=dry_run 

626 ) 

627 app.console.print("Migrations succeeded!") 

628 exit_with_success(f"Prefect database at {engine.url!r} downgraded!") 

629 

630 

631@database_app.command() 1a

632async def revision( 1a

633 message: str = typer.Option( 

634 None, 

635 "--message", 

636 "-m", 

637 help="A message to describe the migration.", 

638 ), 

639 autogenerate: bool = False, 

640): 

641 """Create a new migration for the Prefect database""" 

642 from prefect.server.database.alembic_commands import alembic_revision 

643 

644 app.console.print("Running migration file creation ...") 

645 await run_sync_in_worker_thread( 

646 alembic_revision, 

647 message=message, 

648 autogenerate=autogenerate, 

649 ) 

650 exit_with_success("Creating new migration file succeeded!") 

651 

652 

653@database_app.command() 1a

654async def stamp(revision: str): 1a

655 """Stamp the revision table with the given revision; don't run any migrations""" 

656 from prefect.server.database.alembic_commands import alembic_stamp 

657 

658 app.console.print("Stamping database with revision ...") 

659 await run_sync_in_worker_thread(alembic_stamp, revision=revision) 

660 exit_with_success("Stamping database with revision succeeded!") 

661 

662 

663def _is_process_running(pid: int) -> bool: 1a

664 """Check if a process is running by attempting to send signal 0.""" 

665 try: 

666 os.kill(pid, 0) 

667 return True 

668 except (ProcessLookupError, OSError): 

669 return False 

670 

671 

672def _read_pid_file(path: Path) -> int | None: 1a

673 """Read and validate a PID from a file.""" 

674 try: 

675 return int(path.read_text()) 

676 except (ValueError, OSError, FileNotFoundError): 

677 return None 

678 

679 

680def _write_pid_file(path: Path, pid: int) -> None: 1a

681 """Write a PID to a file, creating parent directories if needed.""" 

682 path.parent.mkdir(parents=True, exist_ok=True) 

683 path.write_text(str(pid)) 

684 

685 

686def _cleanup_pid_file(path: Path) -> None: 1a

687 """Remove PID file and try to cleanup empty parent directory.""" 

688 path.unlink(missing_ok=True) 

689 try: 

690 path.parent.rmdir() 

691 except OSError: 

692 pass 

693 

694 

695# this is a hidden command used by the `prefect server services start --background` command 

696@services_app.command(hidden=True, name="manager") 1a

697def run_manager_process(): 1a

698 """ 

699 This is an internal entrypoint used by `prefect server services start --background`. 

700 Users do not call this directly. 

701 

702 We do everything in sync so that the child won't exit until the user kills it. 

703 """ 

704 from prefect.server.services.base import Service 

705 

706 if not Service.enabled_services(): 

707 logger.error("No services are enabled! Exiting manager.") 

708 sys.exit(1) 

709 

710 logger.debug("Manager process started. Starting services...") 

711 try: 

712 asyncio.run(Service.run_services()) 

713 except KeyboardInterrupt: 

714 pass 

715 finally: 

716 logger.debug("Manager process has exited.") 

717 

718 

719# public, user-facing `prefect server services` commands 

720@services_app.command(aliases=["ls"]) 1a

721def list_services(): 1a

722 """List all available services and their status.""" 

723 from prefect.server.services.base import Service 

724 

725 table = Table(title="Available Services", expand=True) 

726 table.add_column("Name", no_wrap=True) 

727 table.add_column("Enabled?", no_wrap=True) 

728 table.add_column("Description", style="cyan", no_wrap=False) 

729 

730 for svc in Service.all_services(): 

731 name = svc.__name__ 

732 

733 setting_text = Text(f"{svc.environment_variable_name()}", style="green") 

734 if not svc.enabled(): 

735 setting_text = Text(f"x {svc.environment_variable_name()}", style="gray50") 

736 

737 doc = inspect.getdoc(svc) or "" 

738 description = doc.split("\n", 1)[0].strip() 

739 

740 table.add_row(name, setting_text, description) 

741 

742 app.console.print(table) 

743 

744 

745@services_app.command(aliases=["start"]) 1a

746def start_services( 1a

747 background: bool = typer.Option( 

748 False, "--background", "-b", help="Run the services in the background" 

749 ), 

750): 

751 """Start all enabled Prefect services in one process.""" 

752 from prefect.server.services.base import Service 

753 

754 SERVICES_PID_FILE.parent.mkdir(parents=True, exist_ok=True) 

755 

756 if SERVICES_PID_FILE.exists(): 

757 pid = _read_pid_file(SERVICES_PID_FILE) 

758 if pid is not None and _is_process_running(pid): 

759 app.console.print( 

760 "\n[yellow]Services are already running in the background.[/]" 

761 "\n[blue]Use[/] [yellow]`prefect server services stop`[/] [blue]to stop them.[/]" 

762 ) 

763 raise typer.Exit(code=1) 

764 else: 

765 # Stale or invalid file 

766 _cleanup_pid_file(SERVICES_PID_FILE) 

767 

768 if not Service.enabled_services(): 

769 app.console.print("[red]No services are enabled![/]") 

770 raise typer.Exit(code=1) 

771 

772 if not background: 

773 app.console.print("\n[blue]Starting services... Press CTRL+C to stop[/]\n") 

774 try: 

775 asyncio.run(Service.run_services()) 

776 except KeyboardInterrupt: 

777 pass 

778 app.console.print("\n[green]All services stopped.[/]") 

779 return 

780 

781 process = subprocess.Popen( 

782 [ 

783 "prefect", 

784 "server", 

785 "services", 

786 "manager", 

787 ], 

788 env=os.environ.copy(), 

789 stdout=subprocess.DEVNULL, 

790 stderr=subprocess.DEVNULL, 

791 start_new_session=(False if os.name == "nt" else True), # POSIX-only 

792 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == "nt" else 0, 

793 ) 

794 

795 if process.poll() is not None: 

796 app.console.print("[red]Failed to start services in the background![/]") 

797 raise typer.Exit(code=1) 

798 

799 _write_pid_file(SERVICES_PID_FILE, process.pid) 

800 app.console.print( 

801 "\n[green]Services are running in the background.[/]" 

802 "\n[blue]Use[/] [yellow]`prefect server services stop`[/] [blue]to stop them.[/]" 

803 ) 

804 

805 

806@services_app.command(aliases=["stop"]) 1a

807async def stop_services(): 1a

808 """Stop any background Prefect services that were started.""" 

809 

810 if not SERVICES_PID_FILE.exists(): 

811 app.console.print("No services are running in the background.") 

812 raise typer.Exit() 

813 

814 if (pid := _read_pid_file(SERVICES_PID_FILE)) is None: 

815 _cleanup_pid_file(SERVICES_PID_FILE) 

816 app.console.print("No valid PID file found.") 

817 raise typer.Exit() 

818 

819 if not _is_process_running(pid): 

820 app.console.print("[yellow]Services were not running[/]") 

821 _cleanup_pid_file(SERVICES_PID_FILE) 

822 return 

823 

824 app.console.print("\n[yellow]Shutting down...[/]") 

825 try: 

826 if os.name == "nt": 

827 # On Windows, send Ctrl+C to the process group 

828 os.kill(pid, signal.CTRL_C_EVENT) 

829 else: 

830 # On Unix, send SIGTERM 

831 os.kill(pid, signal.SIGTERM) 

832 except (ProcessLookupError, OSError): 

833 pass 

834 

835 for _ in range(5): 

836 if not _is_process_running(pid): 

837 app.console.print("[dim]✓ Services stopped[/]") 

838 break 

839 await asyncio.sleep(1) 

840 

841 _cleanup_pid_file(SERVICES_PID_FILE) 

842 app.console.print("\n[green]All services stopped.[/]")