Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/dev.py: 20%
161 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Command line interface for working with Prefect Server
3"""
5import json 1a
6import os 1a
7import platform 1a
8import shutil 1a
9import subprocess 1a
10import sys 1a
11import textwrap 1a
12import time 1a
13from functools import partial 1a
14from typing import Optional 1a
16import anyio 1a
17import typer 1a
19import prefect 1a
20from prefect.cli._types import PrefectTyper, SettingsOption 1a
21from prefect.cli._utilities import exit_with_error, exit_with_success 1a
22from prefect.cli.root import app 1a
23from prefect.settings import ( 1a
24 PREFECT_SERVER_API_HOST,
25 PREFECT_SERVER_API_PORT,
26)
27from prefect.utilities.dockerutils import get_prefect_image_name, python_version_minor 1a
28from prefect.utilities.filesystem import tmpchdir 1a
29from prefect.utilities.processutils import run_process 1a
# Long-form help text passed to the `dev` command group below.
DEV_HELP = """
Internal Prefect development.

Note that many of these commands require extra dependencies (such as npm and MkDocs)
to function properly.
"""
# Typer sub-application that the commands in this module are registered on
# via `@dev_app.command(...)`.
dev_app: PrefectTyper = PrefectTyper(
    name="dev", short_help="Internal Prefect development.", help=DEV_HELP
)
# Attach the `dev` group to the CLI application imported from `prefect.cli.root`.
app.add_typer(dev_app)
def exit_with_error_if_not_editable_install() -> None:
    """
    Exit with an error unless Prefect appears to be an editable (source) install.

    The install is rejected when the directory containing the `prefect` package
    is named `site-packages`, or when no `pyproject.toml` exists under
    `prefect.__development_base_path__`.
    """
    # `__module_path__.parent` is a path object; comparing it to a plain string
    # with `==` is always False, so the directory *name* is compared instead.
    if (
        prefect.__module_path__.parent.name == "site-packages"
        or not (prefect.__development_base_path__ / "pyproject.toml").exists()
    ):
        exit_with_error(
            "Development commands require an editable Prefect installation. "
            "Development commands require content outside of the 'prefect' module "
            "which is not available when installed into your site-packages. "
            f"Detected module path: {prefect.__module_path__}."
        )
@dev_app.command()
def build_docs(schema_path: Optional[str] = None):
    """
    Builds REST API reference documentation for static display.
    """
    # Dumps the server application's OpenAPI schema as JSON. The output goes to
    # `schema_path` when given, otherwise to docs/api-ref/schema.json under the
    # development base path.
    exit_with_error_if_not_editable_install()

    from prefect.server.api.server import create_app

    openapi_schema = create_app(ephemeral=True).openapi()

    if schema_path:
        destination = os.path.abspath(schema_path)
    else:
        default_location = (
            prefect.__development_base_path__ / "docs" / "api-ref" / "schema.json"
        )
        destination = default_location.absolute()

    # the info section is blanked out for display purposes
    openapi_schema["info"] = {}
    with open(destination, "w") as schema_file:
        json.dump(openapi_schema, schema_file)

    app.console.print(f"OpenAPI schema written to {destination}")
# Help text passed to the `build_ui` command via `help=`.
BUILD_UI_HELP = """
Installs dependencies and builds UI locally. Requires npm.
"""
@dev_app.command(help=BUILD_UI_HELP)
def build_ui(
    no_install: bool = False,
):
    # Builds the UI with npm inside `<development base>/ui`, then replaces the
    # contents of `prefect.__ui_static_path__` with the fresh `ui/dist` output.
    # Pass `no_install` to skip the `npm ci` step.
    exit_with_error_if_not_editable_install()

    # npm is invoked through the shell only on Windows
    use_shell = sys.platform == "win32"
    base_path = prefect.__development_base_path__

    with tmpchdir(base_path / "ui"):
        if not no_install:
            app.console.print("Installing npm packages...")
            try:
                subprocess.check_output(["npm", "ci"], shell=use_shell)
            except Exception:
                # surface a hint, then let the original failure propagate
                app.console.print(
                    "npm call failed - try running `nvm use` first.", style="red"
                )
                raise

        app.console.print("Building for distribution...")
        build_env = os.environ.copy()
        subprocess.check_output(["npm", "run", "build"], env=build_env, shell=use_shell)

    with tmpchdir(base_path):
        static_path = prefect.__ui_static_path__
        if os.path.exists(static_path):
            app.console.print("Removing existing build files...")
            shutil.rmtree(static_path)

        app.console.print("Copying build into src...")
        shutil.copytree("ui/dist", static_path)

    app.console.print("Complete!")
@dev_app.command()
async def ui():
    """
    Starts a hot-reloading development UI.
    """
    exit_with_error_if_not_editable_install()

    # Each step announces itself on the console, then runs an npm command with
    # its output streamed; the steps run one after the other inside the ui dir.
    npm_steps = (
        ("Installing npm packages...", ["npm", "install"]),
        ("Starting UI development server...", ["npm", "run", "serve"]),
    )
    ui_directory = prefect.__development_base_path__ / "ui"
    with tmpchdir(ui_directory):
        for announcement, npm_command in npm_steps:
            app.console.print(announcement)
            await run_process(command=npm_command, stream_output=True)
@dev_app.command()
async def api(
    host: str = SettingsOption(PREFECT_SERVER_API_HOST),
    port: int = SettingsOption(PREFECT_SERVER_API_PORT),
    log_level: str = "DEBUG",
    services: bool = True,
):
    """
    Starts a hot-reloading development API.
    """
    # Runs uvicorn as a subprocess and watches `prefect.__module_path__`; on any
    # file change the running server is sent SIGTERM and a new one is started.
    import signal

    import watchfiles

    def _terminate(pid: Optional[int]) -> None:
        # Send SIGTERM to the server, tolerating "never started" (pid is None)
        # and "already exited" (e.g. the server crashed on a bad edit).
        if pid is None:
            return
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            # process already exited
            pass

    server_env = os.environ.copy()
    server_env["PREFECT_API_SERVICES_RUN_IN_APP"] = str(services)
    server_env["PREFECT_API_SERVICES_UI"] = "False"
    server_env["PREFECT_UI_API_URL"] = f"http://{host}:{port}/api"

    command = [
        sys.executable,
        "-m",
        "uvicorn",
        "--factory",
        "prefect.server.api.server:create_app",
        "--host",
        str(host),
        "--port",
        str(port),
        "--log-level",
        log_level.lower(),
    ]

    app.console.print(f"Running: {' '.join(command)}")

    stop_event = anyio.Event()
    start_command = partial(
        run_process, command=command, env=server_env, stream_output=True
    )

    # pid of the current server process; stays None until the first start
    # completes so an early ctrl-c does not hit an unbound variable
    server_pid: Optional[int] = None

    async with anyio.create_task_group() as tg:
        try:
            server_pid = await tg.start(start_command)
            async for _ in watchfiles.awatch(
                prefect.__module_path__,
                stop_event=stop_event,  # type: ignore
            ):
                # when any watched files change, restart the server
                app.console.print("Restarting Prefect Server...")
                _terminate(server_pid)
                # start a new server
                server_pid = await tg.start(start_command)
        except RuntimeError as err:
            # a bug in watchfiles causes an 'Already borrowed' error from Rust when
            # exiting: https://github.com/samuelcolvin/watchfiles/issues/200
            if str(err).strip() != "Already borrowed":
                raise
        except KeyboardInterrupt:
            # exit cleanly on ctrl-c by killing the server process if it's
            # still running
            _terminate(server_pid)

            stop_event.set()
@dev_app.command()
async def start(
    exclude_api: bool = typer.Option(False, "--no-api"),
    exclude_ui: bool = typer.Option(False, "--no-ui"),
):
    """
    Starts a hot-reloading development server with API, UI, and agent processes.

    Each service has an individual command if you wish to start them separately.
    Each service can be excluded here as well.
    """
    async with anyio.create_task_group() as tg:
        if not exclude_api:
            # The CLI commands are sync_compatible wrappers; inside this async
            # task group the underlying coroutine function (`.aio`) must be
            # scheduled instead of the wrapper.
            run_api = partial(
                api.aio,
                host=PREFECT_SERVER_API_HOST.value(),
                port=PREFECT_SERVER_API_PORT.value(),
            )
            tg.start_soon(run_api)

        if not exclude_ui:
            run_ui = ui.aio
            tg.start_soon(run_ui)
@dev_app.command()
def build_image(
    arch: str = typer.Option(
        None,
        help=(
            "The architecture to build the container for. "
            "Defaults to the architecture of the host Python. "
            f"[default: {platform.machine()}]"
        ),
    ),
    python_version: str = typer.Option(
        None,
        help=(
            "The Python version to build the container for. "
            "Defaults to the version of the host Python. "
            f"[default: {python_version_minor()}]"
        ),
    ),
    flavor: str = typer.Option(
        None,
        help=(
            "An alternative flavor to build, for example 'conda'. "
            "Defaults to the standard Python base image"
        ),
    ),
    build_arg: list[str] = typer.Option(
        [],
        help=(
            "This will directly pass a --build-arg into the docker build process. "
            "Can be added to the command line multiple times."
        ),
    ),
    dry_run: bool = False,
):
    """
    Build a docker image for development.
    """
    exit_with_error_if_not_editable_install()

    # TODO: Once https://github.com/tiangolo/typer/issues/354 is addressed, the
    #       default can be set in the function signature
    if not arch:
        arch = platform.machine()
    if not python_version:
        python_version = python_version_minor()

    tag = get_prefect_image_name(python_version=python_version, flavor=flavor)

    # Collect the build args in the order they appear on the command line:
    # fixed ones first, then the optional flavor base image, then user-supplied.
    docker_build_args = ["PREFECT_EXTRAS=[dev]", f"PYTHON_VERSION={python_version}"]
    if flavor:
        docker_build_args.append(f"BASE_IMAGE=prefect-{flavor}")
    docker_build_args.extend(build_arg)

    # A subprocess is used rather than the docker-py client so that build output
    # is streamed as it is produced.
    command = [
        "docker",
        "build",
        str(prefect.__development_base_path__),
        "--tag",
        tag,
        "--platform",
        f"linux/{arch}",
    ]
    for value in docker_build_args:
        command.extend(("--build-arg", value))

    if dry_run:
        # only show the command that would be run
        print(" ".join(command))
        return

    run_in_shell = sys.platform == "win32"
    try:
        subprocess.check_call(command, shell=run_in_shell)
    except subprocess.CalledProcessError:
        exit_with_error("Failed to build image!")
    else:
        exit_with_success(f"Built image {tag!r} for linux/{arch}")
@dev_app.command()
def container(
    bg: bool = False, name="prefect-dev", api: bool = True, tag: Optional[str] = None
):
    """
    Run a docker container with local code mounted and installed.
    """
    exit_with_error_if_not_editable_install()
    import docker
    from docker.models.containers import Container

    client = docker.from_env()

    # refuse to reuse a name that is already present in the client's listing
    listed_names = {listed.name for listed in client.containers.list()}
    if name in listed_names:
        exit_with_error(
            f"Container {name!r} already exists. Specify a different name or stop "
            "the existing container."
        )

    # the command that keeps the container alive once installation is done
    if api:
        blocking_cmd = "prefect dev api"
    else:
        blocking_cmd = "sleep infinity"
    if not tag:
        tag = get_prefect_image_name()

    # install the mounted repo, drop a /READY marker file, then block
    startup_script = (  # noqa
        "pip install -e /opt/prefect/repo\\[dev\\] && touch /READY &&"
        f" {blocking_cmd}"
    )
    dev_container: Container = client.containers.create(
        image=tag,
        command=["/bin/bash", "-c", startup_script],
        name=name,
        auto_remove=True,
        working_dir="/opt/prefect/repo",
        volumes=[f"{prefect.__development_base_path__}:/opt/prefect/repo"],
        shm_size="4G",
    )

    print(f"Starting container for image {tag!r}...")
    dev_container.start()

    print("Waiting for installation to complete", end="", flush=True)
    try:
        # poll for the /READY marker every three seconds, one dot per attempt
        while True:
            print(".", end="", flush=True)
            probe = dev_container.exec_run("test -f /READY")
            if probe.exit_code == 0:
                break
            time.sleep(3)
    except BaseException:
        # BaseException so that a keyboard interrupt also stops the container
        print("\nInterrupted. Stopping container...")
        dev_container.stop()
        raise

    ready_message = textwrap.dedent(
        f"""
        Container {dev_container.name!r} is ready! To connect to the container, run:

            docker exec -it {dev_container.name} /bin/bash
        """
    )
    print(ready_message)

    if bg:
        background_message = textwrap.dedent(
            f"""
            The container will run forever. Stop the container with:

                docker stop {dev_container.name}
            """
        )
        print(background_message)
        # Exit without stopping
        return

    try:
        print("Send a keyboard interrupt to exit...")
        dev_container.wait()
    except KeyboardInterrupt:
        pass  # Avoid showing "Abort"
    finally:
        print("\nStopping container...")
        dev_container.stop()