Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/server.py: 27%
334 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Command line interface for working with the Prefect API and server.
3"""
5from __future__ import annotations 1a
7import asyncio 1a
8import inspect 1a
9import os 1a
10import shlex 1a
11import signal 1a
12import socket 1a
13import subprocess 1a
14import sys 1a
15import textwrap 1a
16from pathlib import Path 1a
17from typing import TYPE_CHECKING 1a
19import typer 1a
20import uvicorn 1a
21from rich.table import Table 1a
22from rich.text import Text 1a
24import prefect 1a
25import prefect.settings 1a
26from prefect.cli._prompts import prompt 1a
27from prefect.cli._types import PrefectTyper, SettingsOption 1a
28from prefect.cli._utilities import exit_with_error, exit_with_success 1a
29from prefect.cli.cloud import prompt_select_from_list 1a
30from prefect.cli.root import app, is_interactive 1a
31from prefect.logging import get_logger 1a
32from prefect.settings import ( 1a
33 PREFECT_API_SERVICES_LATE_RUNS_ENABLED,
34 PREFECT_API_SERVICES_SCHEDULER_ENABLED,
35 PREFECT_API_URL,
36 PREFECT_HOME,
37 PREFECT_SERVER_ANALYTICS_ENABLED,
38 PREFECT_SERVER_API_BASE_PATH,
39 PREFECT_SERVER_API_HOST,
40 PREFECT_SERVER_API_KEEPALIVE_TIMEOUT,
41 PREFECT_SERVER_API_PORT,
42 PREFECT_SERVER_LOGGING_LEVEL,
43 PREFECT_UI_ENABLED,
44 Profile,
45 get_current_settings,
46 load_current_profile,
47 load_profiles,
48 save_profiles,
49 update_current_profile,
50)
51from prefect.settings.context import temporary_settings 1a
52from prefect.utilities.asyncutils import run_sync_in_worker_thread 1a
54if TYPE_CHECKING: 54 ↛ 55line 54 didn't jump to line 55 because the condition on line 54 was never true1a
55 import logging
# Typer group that holds the `server` commands defined in this module.
server_app: PrefectTyper = PrefectTyper(
    name="server",
    help="Start a Prefect server instance and interact with the database",
)
# Sub-group for the `database` commands (reset, upgrade, downgrade, ...).
database_app: PrefectTyper = PrefectTyper(
    name="database", help="Interact with the database."
)
# Sub-group for the `services` commands (ls, start, stop, ...).
services_app: PrefectTyper = PrefectTyper(
    name="services", help="Interact with server loop services."
)
# Nest both sub-groups under `server`, then attach `server` to the CLI app
# imported from `prefect.cli.root`.
server_app.add_typer(database_app)
server_app.add_typer(services_app)
app.add_typer(server_app)

# The annotation is a string because `logging` is imported only under
# `TYPE_CHECKING`.
logger: "logging.Logger" = get_logger(__name__)

# File name of the background server's PID file; `start` and `stop` join it
# with `PREFECT_HOME` at call time.
SERVER_PID_FILE_NAME = "server.pid"
# Full path of the background services PID file, resolved once at import time.
SERVICES_PID_FILE = Path(PREFECT_HOME.value()) / "services.pid"
def generate_welcome_blurb(base_url: str, ui_enabled: bool) -> str:
    """
    Build the banner text shown when the server starts.

    The banner always contains the Prefect logo, the `prefect config set`
    command for pointing a client at this server, and the API docs URL. One
    of three dashboard notes is appended depending on whether the UI static
    files exist on disk and whether the UI is enabled.

    Args:
        base_url: The `http://host:port` address the server is reachable at
        ui_enabled: Whether the dashboard is enabled for this server

    Returns:
        The complete welcome text
    """
    # Use the configured API base path when one is set, otherwise `/api`.
    api_suffix = (
        PREFECT_SERVER_API_BASE_PATH.value() if PREFECT_SERVER_API_BASE_PATH else "/api"
    )

    banner = textwrap.dedent(
        r"""
         ___ ___ ___ ___ ___ ___ _____
        | _ \ _ \ __| __| __/ __|_   _|
        |  _/   / _|| _|| _| (__  | |
        |_| |_|_\___|_| |___\___| |_|

        Configure Prefect to communicate with the server with:

            prefect config set PREFECT_API_URL={api_url}

        View the API reference documentation at {docs_url}
        """
    ).format(api_url=base_url + api_suffix, docs_url=base_url + "/docs")

    # A missing static bundle takes precedence over the enabled/disabled flag.
    if not os.path.exists(prefect.__ui_static_path__):
        return banner + textwrap.dedent(
            """
            The dashboard is not built. It looks like you're on a development version.
            See `prefect dev` for development commands.
            """
        )

    if not ui_enabled:
        return banner + textwrap.dedent(
            """
            The dashboard is disabled. Set `PREFECT_UI_ENABLED=1` to re-enable it.
            """
        )

    return banner + textwrap.dedent(
        f"""
        Check out the dashboard at {base_url}
        """
    )
def prestart_check(base_url: str) -> None:
    """
    Make sure the active profile can reach the server that is about to start.

    Nothing happens when there is no current profile or when the current
    profile already has `PREFECT_API_URL` set. Otherwise:

    - if exactly one saved profile points at this server, switch to it;
    - if several do, ask the user which one to switch to;
    - if none do, ask whether to create a new profile or to set
      `PREFECT_API_URL` in the current one.

    Args:
        base_url: The base URL the server will be running on
    """
    api_url = f"{base_url}/api"
    current_profile = load_current_profile()
    profiles = load_profiles()
    if not current_profile or PREFECT_API_URL in current_profile.settings:
        return

    matching_profiles = sorted(
        name
        for name, profile in profiles.items()
        if profile.settings.get(PREFECT_API_URL) == api_url
    )

    if len(matching_profiles) == 1:
        profiles.set_active(matching_profiles[0])
        save_profiles(profiles)
        app.console.print(
            f"Switched to profile {matching_profiles[0]!r}",
            style="green",
        )
        return

    if len(matching_profiles) > 1:
        app.console.print(
            "Your current profile doesn't have `PREFECT_API_URL` set to the address"
            " of the server that's running. Some of your other profiles do."
        )
        selected_profile = prompt_select_from_list(
            app.console,
            "Which profile would you like to switch to?",
            matching_profiles,
        )
        profiles.set_active(selected_profile)
        save_profiles(profiles)
        app.console.print(f"Switched to profile {selected_profile!r}", style="green")
        return

    # No saved profile targets this server: let the user decide how to fix it.
    app.console.print(
        "The `PREFECT_API_URL` setting for your current profile doesn't match the"
        " address of the server that's running. You need to set it to communicate"
        " with the server.",
        style="yellow",
    )

    choice = prompt_select_from_list(
        app.console,
        "How would you like to proceed?",
        [
            (
                "create",
                "Create a new profile with `PREFECT_API_URL` set and switch to it",
            ),
            (
                "set",
                f"Set `PREFECT_API_URL` in the current profile: {current_profile.name!r}",
            ),
        ],
    )

    if choice == "create":
        # Keep asking until the user picks a name that is not taken yet.
        while True:
            profile_name = prompt("Enter a new profile name")
            if profile_name not in profiles:
                break
            app.console.print(
                f"Profile {profile_name!r} already exists. Please choose a different name.",
                style="red",
            )

        profiles.add_profile(
            Profile(name=profile_name, settings={PREFECT_API_URL: api_url})
        )
        profiles.set_active(profile_name)
        save_profiles(profiles)

        app.console.print(f"Switched to new profile {profile_name!r}", style="green")
    elif choice == "set":
        # Suggest the URL of the server being started, not a fixed
        # localhost:4200 address that is wrong for any other host or port.
        api_url = prompt("Enter the `PREFECT_API_URL` value", default=api_url)
        update_current_profile({PREFECT_API_URL: api_url})
        app.console.print(
            f"Set `PREFECT_API_URL` to {api_url!r} in the current profile {current_profile.name!r}",
            style="green",
        )
def _validate_multi_worker(workers: int) -> None:
    """
    Check that the current settings can support more than one server worker.

    A single worker is always accepted. For several workers the database
    dialect must be PostgreSQL, and none of the messaging cache, messaging
    broker, causal ordering or lease storage settings may point at their
    in-memory implementation.

    Args:
        workers: The number of worker processes to run. Must be >= 1.

    Every failure is reported through `exit_with_error`.
    """
    from prefect.server.utilities.database import get_dialect

    if workers == 1:
        return

    if workers < 1:
        exit_with_error("Number of workers must be >= 1")

    settings = get_current_settings()

    try:
        connection_url = settings.server.database.connection_url.get_secret_value()
        dialect = get_dialect(connection_url)
    except Exception as e:
        exit_with_error(f"Unable to validate database configuration: {e}")

    if dialect.name != "postgresql":
        exit_with_error(
            "Multi-worker mode (--workers > 1) is not supported with SQLite database."
        )

    try:
        # Each configured backend is paired with the in-memory module it must
        # not be equal to.
        backend_checks = (
            (
                settings.server.events.messaging_cache,
                "prefect.server.utilities.messaging.memory",
            ),
            (
                settings.server.events.messaging_broker,
                "prefect.server.utilities.messaging.memory",
            ),
            (
                settings.server.events.causal_ordering,
                "prefect.server.events.ordering.memory",
            ),
            (
                settings.server.concurrency.lease_storage,
                "prefect.server.concurrency.lease_storage.memory",
            ),
        )
    except Exception as e:
        exit_with_error(f"Unable to validate messaging configuration: {e}")

    if not any(configured == in_memory for configured, in_memory in backend_checks):
        return

    error_message = textwrap.dedent(
        """
        Multi-worker mode (--workers > 1) requires Redis for messaging and lease storage.

        Please configure the following settings to use Redis:

            prefect config set PREFECT_MESSAGING_BROKER="prefect_redis.messaging"
            prefect config set PREFECT_MESSAGING_CACHE="prefect_redis.messaging"
            prefect config set PREFECT_SERVER_EVENTS_CAUSAL_ORDERING="prefect_redis.ordering"
            prefect config set PREFECT_SERVER_CONCURRENCY_LEASE_STORAGE="prefect_redis.lease_storage"

        You'll also need to configure your Redis connection:

            export PREFECT_REDIS_MESSAGING_HOST="your-redis-host"
            export PREFECT_REDIS_MESSAGING_PORT="6379"
            export PREFECT_REDIS_MESSAGING_DB="0"

        For complete setup instructions, see:
        https://docs.prefect.io/v3/how-to-guides/self-hosted/server-cli#multi-worker-api-server
        https://docs.prefect.io/v3/advanced/self-hosted#redis-setup
        """
    ).strip()
    exit_with_error(error_message)
@server_app.command()
def start(
    host: str = SettingsOption(PREFECT_SERVER_API_HOST),
    port: int = SettingsOption(PREFECT_SERVER_API_PORT),
    keep_alive_timeout: int = SettingsOption(PREFECT_SERVER_API_KEEPALIVE_TIMEOUT),
    log_level: str = SettingsOption(PREFECT_SERVER_LOGGING_LEVEL),
    scheduler: bool = SettingsOption(PREFECT_API_SERVICES_SCHEDULER_ENABLED),
    analytics: bool = SettingsOption(
        PREFECT_SERVER_ANALYTICS_ENABLED, "--analytics-on/--analytics-off"
    ),
    late_runs: bool = SettingsOption(PREFECT_API_SERVICES_LATE_RUNS_ENABLED),
    ui: bool = SettingsOption(PREFECT_UI_ENABLED),
    no_services: bool = typer.Option(
        False, "--no-services", help="Only run the webserver API and UI"
    ),
    background: bool = typer.Option(
        False, "--background", "-b", help="Run the server in the background"
    ),
    workers: int = typer.Option(
        1,
        "--workers",
        help="Number of worker processes to run. Only runs the webserver API and UI",
    ),
):
    """
    Start a Prefect server instance
    """
    base_url = f"http://{host}:{port}"

    # The profile check is a convenience for interactive sessions only; any
    # failure in it is deliberately ignored so the server still starts.
    if is_interactive():
        try:
            prestart_check(base_url)
        except Exception:
            pass

    # Running several workers implies webserver-only mode.
    no_services = no_services or workers > 1
    _validate_multi_worker(workers)

    # Analytics is forced off whenever services are not run.
    server_settings = {
        "PREFECT_API_SERVICES_SCHEDULER_ENABLED": str(scheduler),
        "PREFECT_SERVER_ANALYTICS_ENABLED": "False" if no_services else str(analytics),
        "PREFECT_API_SERVICES_LATE_RUNS_ENABLED": str(late_runs),
        "PREFECT_UI_ENABLED": str(ui),
        "PREFECT_SERVER_LOGGING_LEVEL": log_level,
    }

    pid_file = Path(PREFECT_HOME.value()) / SERVER_PID_FILE_NAME

    # Probe the address by binding to it once; the socket is closed right away.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            probe.bind((host, port))
    except socket.gaierror:
        exit_with_error(
            f"Invalid host '{host}'. Please specify a valid hostname or IP address."
        )
    except OSError:
        # An existing PID file suggests our own background server holds the port.
        if pid_file.exists():
            port_error = (
                f"A background server process is already running on port {port}. "
                "Run `prefect server stop` to stop it or specify a different port "
                "with the `--port` flag."
            )
        else:
            port_error = (
                f"Port {port} is already in use. Please specify a different port with the "
                "`--port` flag."
            )
        exit_with_error(port_error)

    # Creating the PID file exclusively doubles as the "already running" check.
    if background:
        try:
            pid_file.touch(mode=0o600, exist_ok=False)
        except FileExistsError:
            exit_with_error(
                "A server is already running in the background. To stop it,"
                " run `prefect server stop`."
            )

    app.console.print(generate_welcome_blurb(base_url, ui_enabled=ui))
    app.console.print("\n")

    if workers > 1:
        app.console.print(
            f"Starting server with {workers} worker processes.\n", style="blue"
        )

    run_args = (server_settings, host, port, keep_alive_timeout, no_services, workers)
    if background:
        _run_in_background(pid_file, *run_args)
    else:
        _run_in_foreground(*run_args)
def _run_in_background(
    pid_file: Path,
    server_settings: dict[str, str],
    host: str,
    port: int,
    keep_alive_timeout: int,
    no_services: bool,
    workers: int,
) -> None:
    """
    Launch the API server as a detached `uvicorn` subprocess and record its PID.

    Args:
        pid_file: File that receives the PID of the spawned process
        server_settings: Setting overrides exported to the child as environment
            variables
        host: Address uvicorn binds to
        port: Port uvicorn binds to
        keep_alive_timeout: Value passed to `--timeout-keep-alive`
        no_services: When true, `PREFECT__SERVER_WEBSERVER_ONLY=1` is exported
            to the child
        workers: Value passed to `--workers`

    Raises:
        OSError: If the subprocess cannot be started. The PID file is removed
            first so a later `prefect server start --background` is not
            blocked by a stale file.
    """
    command = [
        sys.executable,
        "-m",
        "uvicorn",
        "--app-dir",
        str(prefect.__module_path__.parent),
        "--factory",
        "prefect.server.api.server:create_app",
        "--host",
        str(host),
        "--port",
        str(port),
        "--timeout-keep-alive",
        str(keep_alive_timeout),
        "--workers",
        str(workers),
    ]
    logger.debug("Opening server process with command: %s", shlex.join(command))

    # NOTE(review): the two PREFECT__SERVER_* variables presumably mirror the
    # `final` / `webserver_only` arguments that the foreground path passes to
    # `create_app` directly -- confirm against `create_app`.
    env = {**os.environ, **server_settings, "PREFECT__SERVER_FINAL": "1"}
    if no_services:
        env["PREFECT__SERVER_WEBSERVER_ONLY"] = "1"

    try:
        # Nothing ever reads the child's output, so discard it instead of
        # attaching pipes that would fill up or break once this CLI exits.
        process = subprocess.Popen(
            command,
            env=env,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except OSError:
        # The caller created the PID file before calling us; drop it so the
        # failed launch does not look like a running server.
        pid_file.unlink(missing_ok=True)
        raise

    pid_file.write_text(str(process.pid))

    app.console.print(
        "The Prefect server is running in the background. Run `prefect"
        " server stop` to stop it."
    )
def _run_in_foreground(
    server_settings: dict[str, str],
    host: str,
    port: int,
    keep_alive_timeout: int,
    no_services: bool,
    workers: int,
) -> None:
    """
    Run the API server with uvicorn in the current process until it exits.

    With a single worker the application object is built here and handed to
    uvicorn. With several workers uvicorn receives the import string of the
    app factory instead, and the factory flags are passed through environment
    variables. "Server stopped!" is printed on the way out in every case.

    Args:
        server_settings: Setting names mapped to the values applied while the
            server runs
        host: Address uvicorn binds to
        port: Port uvicorn binds to
        keep_alive_timeout: Keep-alive timeout handed to uvicorn
        no_services: Passed to `create_app` as `webserver_only` (single worker)
        workers: Number of uvicorn worker processes
    """
    from prefect.server.api.server import create_app

    try:
        with temporary_settings(
            {
                getattr(prefect.settings, setting_name): setting_value
                for setting_name, setting_value in server_settings.items()
            }
        ):
            if workers != 1:
                # Worker processes build the app themselves through the
                # factory, so the flags travel via the environment.
                os.environ["PREFECT__SERVER_FINAL"] = "1"
                os.environ["PREFECT__SERVER_WEBSERVER_ONLY"] = "1"
                app_kwargs = {
                    "app": "prefect.server.api.server:create_app",
                    "factory": True,
                    "workers": workers,
                }
            else:
                app_kwargs = {
                    "app": create_app(final=True, webserver_only=no_services),
                    "app_dir": str(prefect.__module_path__.parent),
                }

            uvicorn.run(
                **app_kwargs,
                host=host,
                port=port,
                timeout_keep_alive=keep_alive_timeout,
                log_level=server_settings.get(
                    "PREFECT_SERVER_LOGGING_LEVEL", "info"
                ).lower(),
            )
    finally:
        app.console.print("Server stopped!")
@server_app.command()
async def stop():
    """Stop a Prefect server instance running in the background"""
    pid_file = Path(PREFECT_HOME.value()) / SERVER_PID_FILE_NAME
    if not pid_file.exists():
        exit_with_success("No server running in the background.")

    # The PID file is created empty before the server process is spawned, so
    # it can be empty (or otherwise unparsable) if that launch never finished.
    try:
        pid = int(pid_file.read_text())
    except ValueError:
        pid = None

    # PIDs <= 0 address process groups in `os.kill`, never a single server
    # process, so they are treated like an unparsable file.
    if pid is None or pid <= 0:
        pid_file.unlink(missing_ok=True)
        exit_with_success(
            "The server PID file is empty or invalid. Cleaning up stale PID file."
        )

    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        exit_with_success(
            "The server process is not running. Cleaning up stale PID file."
        )
    finally:
        # The file probably exists, but use `missing_ok` to avoid an
        # error if the file was deleted by another actor
        pid_file.unlink(missing_ok=True)
    app.console.print("Server stopped!")
@database_app.command()
async def reset(yes: bool = typer.Option(False, "--yes", "-y")):
    """Drop and recreate all Prefect database tables"""
    from prefect.server.database import provide_database_interface

    database = provide_database_interface()
    engine = await database.engine()

    # `--yes` skips the prompt; declining the prompt aborts the command.
    if not yes and not typer.confirm(
        "Are you sure you want to reset the Prefect database located "
        f'at "{engine.url!r}"? This will drop and recreate all tables.'
    ):
        exit_with_error("Database reset aborted")

    app.console.print("Downgrading database...")
    await database.drop_db()

    app.console.print("Upgrading database...")
    await database.create_db()

    exit_with_success(f'Prefect database "{engine.url!r}" reset!')
@database_app.command()
async def upgrade(
    yes: bool = typer.Option(False, "--yes", "-y"),
    revision: str = typer.Option(
        "head",
        "-r",
        help=(
            "The revision to pass to `alembic upgrade`. If not provided, runs all"
            " migrations."
        ),
    ),
    dry_run: bool = typer.Option(
        False,
        help=(
            "Flag to show what migrations would be made without applying them. Will"
            " emit sql statements to stdout."
        ),
    ),
):
    """Upgrade the Prefect database"""
    from prefect.server.database import provide_database_interface
    from prefect.server.database.alembic_commands import alembic_upgrade

    database = provide_database_interface()
    engine = await database.engine()

    # `--yes` skips the prompt; declining the prompt aborts the command.
    if not yes and not typer.confirm(
        f"Are you sure you want to upgrade the Prefect database at {engine.url!r}?"
    ):
        exit_with_error("Database upgrade aborted!")

    app.console.print("Running upgrade migrations ...")
    # The alembic command is synchronous, so it runs in a worker thread.
    migration_options = {"revision": revision, "dry_run": dry_run}
    await run_sync_in_worker_thread(alembic_upgrade, **migration_options)
    app.console.print("Migrations succeeded!")
    exit_with_success(f"Prefect database at {engine.url!r} upgraded!")
@database_app.command()
async def downgrade(
    yes: bool = typer.Option(False, "--yes", "-y"),
    revision: str = typer.Option(
        "-1",
        "-r",
        help=(
            "The revision to pass to `alembic downgrade`. If not provided, "
            "downgrades to the most recent revision. Use 'base' to run all "
            "migrations."
        ),
    ),
    dry_run: bool = typer.Option(
        False,
        help=(
            "Flag to show what migrations would be made without applying them. Will"
            " emit sql statements to stdout."
        ),
    ),
):
    """Downgrade the Prefect database"""
    from prefect.server.database import provide_database_interface
    from prefect.server.database.alembic_commands import alembic_downgrade

    database = provide_database_interface()
    engine = await database.engine()

    # `--yes` skips the prompt; declining the prompt aborts the command.
    if not yes and not typer.confirm(
        "Are you sure you want to downgrade the Prefect "
        f"database at {engine.url!r}?"
    ):
        exit_with_error("Database downgrade aborted!")

    app.console.print("Running downgrade migrations ...")
    # The alembic command is synchronous, so it runs in a worker thread.
    migration_options = {"revision": revision, "dry_run": dry_run}
    await run_sync_in_worker_thread(alembic_downgrade, **migration_options)
    app.console.print("Migrations succeeded!")
    exit_with_success(f"Prefect database at {engine.url!r} downgraded!")
@database_app.command()
async def revision(
    message: str = typer.Option(
        None,
        "--message",
        "-m",
        help="A message to describe the migration.",
    ),
    autogenerate: bool = False,
):
    """Create a new migration for the Prefect database"""
    from prefect.server.database.alembic_commands import alembic_revision

    app.console.print("Running migration file creation ...")
    # The alembic command is synchronous, so it runs in a worker thread.
    revision_options = {"message": message, "autogenerate": autogenerate}
    await run_sync_in_worker_thread(alembic_revision, **revision_options)
    exit_with_success("Creating new migration file succeeded!")
@database_app.command()
async def stamp(revision: str):
    """Stamp the revision table with the given revision; don't run any migrations"""
    from prefect.server.database.alembic_commands import alembic_stamp

    app.console.print("Stamping database with revision ...")
    # The alembic command is synchronous, so it runs in a worker thread.
    stamp_options = {"revision": revision}
    await run_sync_in_worker_thread(alembic_stamp, **stamp_options)
    exit_with_success("Stamping database with revision succeeded!")
def _is_process_running(pid: int) -> bool:
    """
    Check if a process is running by attempting to send signal 0.

    Args:
        pid: The process ID to probe

    Returns:
        True if a process with this PID exists, including one this user is
        not allowed to signal. False if no such process exists or if `pid`
        is not a positive integer.
    """
    # PIDs <= 0 address process groups rather than a single process, so a
    # successful probe would say nothing about the process we care about.
    if pid <= 0:
        return False

    # NOTE(review): the Python docs state that on Windows `os.kill` with any
    # signal other than CTRL_C_EVENT / CTRL_BREAK_EVENT terminates the target
    # process, so this probe may not be side-effect free there -- confirm.
    try:
        os.kill(pid, 0)
    except PermissionError:
        # The process exists; we are just not permitted to signal it.
        return True
    except OSError:
        # Includes ProcessLookupError: there is no such process.
        return False
    return True
def _read_pid_file(path: Path) -> int | None:
    """
    Read and validate a PID from a file.

    Args:
        path: Location of the PID file

    Returns:
        The PID as a positive integer, or None when the file cannot be read,
        does not contain an integer, or contains a value <= 0.
    """
    try:
        pid = int(path.read_text())
    except (ValueError, OSError):
        # OSError covers a missing or unreadable file; ValueError covers
        # empty or non-numeric content.
        return None

    # Zero and negative values are never real process IDs; passed on to
    # `os.kill` they would address whole process groups instead.
    return pid if pid > 0 else None
def _write_pid_file(path: Path, pid: int) -> None:
    """
    Store `pid` as text in `path`.

    Missing parent directories are created first; an existing file is
    overwritten.
    """
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    path.write_text(f"{pid}")
def _cleanup_pid_file(path: Path) -> None:
    """
    Delete the PID file and, when possible, its parent directory.

    A missing file is not an error. The parent directory is only removed if
    that succeeds (for example, when it is empty); any failure to remove it
    is ignored.
    """
    path.unlink(missing_ok=True)

    parent_dir = path.parent
    try:
        parent_dir.rmdir()
    except OSError:
        # Best effort only: the directory is still in use or not removable.
        return
# Hidden command: `prefect server services start --background` spawns it as
# the long-running child process.
@services_app.command(hidden=True, name="manager")
def run_manager_process():
    """
    This is an internal entrypoint used by `prefect server services start --background`.
    Users do not call this directly.

    We do everything in sync so that the child won't exit until the user kills it.
    """
    from prefect.server.services.base import Service

    enabled = Service.enabled_services()
    if not enabled:
        logger.error("No services are enabled! Exiting manager.")
        raise SystemExit(1)

    logger.debug("Manager process started. Starting services...")
    try:
        asyncio.run(Service.run_services())
    except KeyboardInterrupt:
        # An interrupt is the normal way to stop the manager.
        pass
    finally:
        logger.debug("Manager process has exited.")
# User-facing `prefect server services` commands start here.
@services_app.command(aliases=["ls"])
def list_services():
    """List all available services and their status."""
    from prefect.server.services.base import Service

    table = Table(title="Available Services", expand=True)
    table.add_column("Name", no_wrap=True)
    table.add_column("Enabled?", no_wrap=True)
    table.add_column("Description", style="cyan", no_wrap=False)

    for service_cls in Service.all_services():
        env_var = service_cls.environment_variable_name()
        if service_cls.enabled():
            status = Text(f"✓ {env_var}", style="green")
        else:
            status = Text(f"x {env_var}", style="gray50")

        # Only the first line of the service's docstring is shown.
        docstring = inspect.getdoc(service_cls) or ""
        summary = docstring.split("\n", 1)[0].strip()

        table.add_row(service_cls.__name__, status, summary)

    app.console.print(table)
@services_app.command(aliases=["start"])
def start_services(
    background: bool = typer.Option(
        False, "--background", "-b", help="Run the services in the background"
    ),
):
    """Start all enabled Prefect services in one process."""
    from prefect.server.services.base import Service

    SERVICES_PID_FILE.parent.mkdir(parents=True, exist_ok=True)

    if SERVICES_PID_FILE.exists():
        existing_pid = _read_pid_file(SERVICES_PID_FILE)
        if existing_pid is None or not _is_process_running(existing_pid):
            # Stale or invalid file
            _cleanup_pid_file(SERVICES_PID_FILE)
        else:
            app.console.print(
                "\n[yellow]Services are already running in the background.[/]"
                "\n[blue]Use[/] [yellow]`prefect server services stop`[/] [blue]to stop them.[/]"
            )
            raise typer.Exit(code=1)

    if not Service.enabled_services():
        app.console.print("[red]No services are enabled![/]")
        raise typer.Exit(code=1)

    if not background:
        # Foreground mode: run in this process until interrupted.
        app.console.print("\n[blue]Starting services... Press CTRL+C to stop[/]\n")
        try:
            asyncio.run(Service.run_services())
        except KeyboardInterrupt:
            pass
        app.console.print("\n[green]All services stopped.[/]")
        return

    # Background mode: spawn the hidden `manager` command, detached from this
    # process (new session on POSIX, new process group on Windows).
    on_windows = os.name == "nt"
    manager_command = ["prefect", "server", "services", "manager"]
    process = subprocess.Popen(
        manager_command,
        env=os.environ.copy(),
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=not on_windows,  # POSIX-only
        creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if on_windows else 0,
    )

    # A return code this early means the child exited immediately.
    if process.poll() is not None:
        app.console.print("[red]Failed to start services in the background![/]")
        raise typer.Exit(code=1)

    _write_pid_file(SERVICES_PID_FILE, process.pid)
    app.console.print(
        "\n[green]Services are running in the background.[/]"
        "\n[blue]Use[/] [yellow]`prefect server services stop`[/] [blue]to stop them.[/]"
    )
@services_app.command(aliases=["stop"])
async def stop_services():
    """Stop any background Prefect services that were started."""

    if not SERVICES_PID_FILE.exists():
        app.console.print("No services are running in the background.")
        raise typer.Exit()

    if (pid := _read_pid_file(SERVICES_PID_FILE)) is None:
        _cleanup_pid_file(SERVICES_PID_FILE)
        app.console.print("No valid PID file found.")
        raise typer.Exit()

    if not _is_process_running(pid):
        app.console.print("[yellow]Services were not running[/]")
        _cleanup_pid_file(SERVICES_PID_FILE)
        return

    app.console.print("\n[yellow]Shutting down...[/]")
    try:
        if os.name == "nt":
            # On Windows, send Ctrl+C to the process group
            os.kill(pid, signal.CTRL_C_EVENT)
        else:
            # On Unix, send SIGTERM
            os.kill(pid, signal.SIGTERM)
    except (ProcessLookupError, OSError):
        pass

    # Poll once per second for up to five seconds.
    for _ in range(5):
        if not _is_process_running(pid):
            app.console.print("[dim]✓ Services stopped[/]")
            break
        await asyncio.sleep(1)
    else:
        # The loop ran out without seeing the process exit. Check one last
        # time; if it is still alive, keep the PID file so that a later
        # `stop` can retry, and report the failure instead of claiming success.
        if _is_process_running(pid):
            app.console.print(
                f"\n[red]Services (PID {pid}) did not stop within 5 seconds.[/]"
            )
            raise typer.Exit(code=1)

    _cleanup_pid_file(SERVICES_PID_FILE)
    app.console.print("\n[green]All services stopped.[/]")