Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/dev.py: 20%

161 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Command line interface for working with Prefect Server 

3""" 

4 

5import json 1a

6import os 1a

7import platform 1a

8import shutil 1a

9import subprocess 1a

10import sys 1a

11import textwrap 1a

12import time 1a

13from functools import partial 1a

14from typing import Optional 1a

15 

16import anyio 1a

17import typer 1a

18 

19import prefect 1a

20from prefect.cli._types import PrefectTyper, SettingsOption 1a

21from prefect.cli._utilities import exit_with_error, exit_with_success 1a

22from prefect.cli.root import app 1a

23from prefect.settings import ( 1a

24 PREFECT_SERVER_API_HOST, 

25 PREFECT_SERVER_API_PORT, 

26) 

27from prefect.utilities.dockerutils import get_prefect_image_name, python_version_minor 1a

28from prefect.utilities.filesystem import tmpchdir 1a

29from prefect.utilities.processutils import run_process 1a

30 

# Long-form help text passed as `help=` to the `dev` command group below.
DEV_HELP = """
Internal Prefect development.

Note that many of these commands require extra dependencies (such as npm and MkDocs)
to function properly.
"""
# Typer sub-application that every `@dev_app.command()` in this module attaches to.
dev_app: PrefectTyper = PrefectTyper(
    name="dev", short_help="Internal Prefect development.", help=DEV_HELP
)
# Register the group on the root CLI application under the name "dev".
app.add_typer(dev_app)

41 

42 

def exit_with_error_if_not_editable_install() -> None:
    """
    Exit with an error unless Prefect looks like an editable installation.

    The installation is rejected when either of the following holds:

    - the directory containing the `prefect` package is named `site-packages`
    - there is no `pyproject.toml` at `prefect.__development_base_path__`

    In both cases `exit_with_error` is called with a message that includes the
    detected module path.
    """
    # `__module_path__` exposes `.parent`, i.e. it is a path object rather than a
    # string. Comparing a path object to a str is never equal, so the directory
    # *name* must be compared instead.
    installed_in_site_packages = (
        prefect.__module_path__.parent.name == "site-packages"
    )
    has_project_file = (prefect.__development_base_path__ / "pyproject.toml").exists()

    if installed_in_site_packages or not has_project_file:
        exit_with_error(
            "Development commands require an editable Prefect installation. "
            "Development commands require content outside of the 'prefect' module "
            "which is not available when installed into your site-packages. "
            f"Detected module path: {prefect.__module_path__}."
        )

54 

55 

@dev_app.command()
def build_docs(schema_path: Optional[str] = None):
    """
    Builds REST API reference documentation for static display.
    """
    # NOTE: the docstring above doubles as the CLI help text, so it is kept as-is.
    exit_with_error_if_not_editable_install()

    # Imported lazily so the server package is only loaded when this command runs.
    from prefect.server.api.server import create_app

    openapi_schema = create_app(ephemeral=True).openapi()

    # Resolve where the schema is written: an explicit path wins, otherwise the
    # default location inside the development checkout is used.
    if schema_path:
        destination = os.path.abspath(schema_path)
    else:
        default_location = (
            prefect.__development_base_path__ / "docs" / "api-ref" / "schema.json"
        )
        destination = default_location.absolute()

    # Blank out the schema's "info" section (done for display purposes).
    openapi_schema["info"] = {}
    with open(destination, "w") as schema_file:
        json.dump(openapi_schema, schema_file)
    app.console.print(f"OpenAPI schema written to {destination}")

78 

79 

BUILD_UI_HELP = """
Installs dependencies and builds UI locally. Requires npm.
"""


@dev_app.command(help=BUILD_UI_HELP)
def build_ui(
    no_install: bool = False,
):
    # Builds the UI with npm inside `<development base>/ui`, then replaces the
    # directory at `prefect.__ui_static_path__` with the fresh `ui/dist` output.
    # Passing `no_install=True` skips the `npm ci` step.
    exit_with_error_if_not_editable_install()

    # npm is run through the shell only when the platform is Windows.
    use_shell = sys.platform == "win32"
    repo_root = prefect.__development_base_path__

    with tmpchdir(repo_root / "ui"):
        if not no_install:
            app.console.print("Installing npm packages...")
            try:
                subprocess.check_output(["npm", "ci"], shell=use_shell)
            except Exception:
                # Print a hint, then let the original failure propagate.
                app.console.print(
                    "npm call failed - try running `nvm use` first.", style="red"
                )
                raise

        app.console.print("Building for distribution...")
        subprocess.check_output(
            ["npm", "run", "build"], env=os.environ.copy(), shell=use_shell
        )

    with tmpchdir(repo_root):
        static_dir = prefect.__ui_static_path__

        # Clear out any previous build so the copy below starts from nothing.
        if os.path.exists(static_dir):
            app.console.print("Removing existing build files...")
            shutil.rmtree(static_dir)

        app.console.print("Copying build into src...")
        shutil.copytree("ui/dist", static_dir)

    app.console.print("Complete!")

115 

116 

@dev_app.command()
async def ui():
    """
    Starts a hot-reloading development UI.
    """
    # NOTE: the docstring above doubles as the CLI help text, so it is kept as-is.
    exit_with_error_if_not_editable_install()

    ui_directory = prefect.__development_base_path__ / "ui"
    install_command = ["npm", "install"]
    serve_command = ["npm", "run", "serve"]

    # Both npm invocations run with the working directory set to the UI folder
    # and stream their output to the console.
    with tmpchdir(ui_directory):
        app.console.print("Installing npm packages...")
        await run_process(install_command, stream_output=True)

        app.console.print("Starting UI development server...")
        await run_process(command=serve_command, stream_output=True)

129 

130 

@dev_app.command()
async def api(
    host: str = SettingsOption(PREFECT_SERVER_API_HOST),
    port: int = SettingsOption(PREFECT_SERVER_API_PORT),
    log_level: str = "DEBUG",
    services: bool = True,
):
    """
    Starts a hot-reloading development API.
    """
    # NOTE: the docstring above doubles as the CLI help text, so it is kept as-is.
    #
    # The API is served by a uvicorn child process. Whenever a file under
    # `prefect.__module_path__` changes, the child is sent SIGTERM and a new one
    # is started.
    import signal

    import watchfiles

    # The child inherits the current environment plus a few overrides.
    server_env = os.environ.copy()
    server_env["PREFECT_API_SERVICES_RUN_IN_APP"] = str(services)
    server_env["PREFECT_API_SERVICES_UI"] = "False"
    server_env["PREFECT_UI_API_URL"] = f"http://{host}:{port}/api"

    command = [
        sys.executable,
        "-m",
        "uvicorn",
        "--factory",
        "prefect.server.api.server:create_app",
        "--host",
        str(host),
        "--port",
        str(port),
        "--log-level",
        log_level.lower(),
    ]

    app.console.print(f"Running: {' '.join(command)}")

    stop_event = anyio.Event()
    start_command = partial(
        run_process, command=command, env=server_env, stream_output=True
    )

    # Stays None until the first server process has been started, so the
    # handlers below never touch an unbound name.
    server_pid: Optional[int] = None

    def terminate_server() -> None:
        # Send SIGTERM to the current server process, if there is one. A process
        # that has already exited (e.g. it crashed on a broken source file) is
        # not an error: there is simply nothing left to terminate.
        if server_pid is None:
            return
        try:
            os.kill(server_pid, signal.SIGTERM)
        except ProcessLookupError:
            pass

    async with anyio.create_task_group() as tg:
        try:
            server_pid = await tg.start(start_command)
            async for _ in watchfiles.awatch(
                prefect.__module_path__,
                stop_event=stop_event,  # type: ignore
            ):
                # when any watched files change, restart the server
                app.console.print("Restarting Prefect Server...")
                terminate_server()
                # start a new server
                server_pid = await tg.start(start_command)
        except RuntimeError as err:
            # a bug in watchfiles causes an 'Already borrowed' error from Rust when
            # exiting: https://github.com/samuelcolvin/watchfiles/issues/200
            if str(err).strip() != "Already borrowed":
                raise
        except KeyboardInterrupt:
            # exit cleanly on ctrl-c by killing the server process if it's
            # still running
            terminate_server()

        stop_event.set()

197 

198 

@dev_app.command()
async def start(
    exclude_api: bool = typer.Option(False, "--no-api"),
    exclude_ui: bool = typer.Option(False, "--no-ui"),
):
    """
    Starts a hot-reloading development server with API and UI processes.

    Each service has an individual command if you wish to start them separately.
    Each service can be excluded here as well.
    """
    # The docstring above is shown as CLI help; it lists only the services this
    # command actually launches (the API and the UI).
    async with anyio.create_task_group() as tg:
        if not exclude_api:
            tg.start_soon(
                partial(
                    # CLI commands are wrapped in sync_compatible, but this
                    # task group is async, so we need use the wrapped function
                    # directly
                    api.aio,
                    host=PREFECT_SERVER_API_HOST.value(),
                    port=PREFECT_SERVER_API_PORT.value(),
                )
            )
        if not exclude_ui:
            tg.start_soon(ui.aio)

224 

225 

@dev_app.command()
def build_image(
    arch: str = typer.Option(
        None,
        help=(
            "The architecture to build the container for. "
            "Defaults to the architecture of the host Python. "
            f"[default: {platform.machine()}]"
        ),
    ),
    python_version: str = typer.Option(
        None,
        help=(
            "The Python version to build the container for. "
            "Defaults to the version of the host Python. "
            f"[default: {python_version_minor()}]"
        ),
    ),
    flavor: str = typer.Option(
        None,
        help=(
            "An alternative flavor to build, for example 'conda'. "
            "Defaults to the standard Python base image"
        ),
    ),
    build_arg: list[str] = typer.Option(
        [],
        help=(
            "This will directly pass a --build-arg into the docker build process. "
            "Can be added to the command line multiple times."
        ),
    ),
    dry_run: bool = False,
):
    """
    Build a docker image for development.
    """
    # NOTE: the docstring above doubles as the CLI help text, so it is kept as-is.
    exit_with_error_if_not_editable_install()

    # TODO: Once https://github.com/tiangolo/typer/issues/354 is addressed, the
    #       default can be set in the function signature
    if not arch:
        arch = platform.machine()
    if not python_version:
        python_version = python_version_minor()

    image_tag = get_prefect_image_name(python_version=python_version, flavor=flavor)

    # Collect every `--build-arg` value in the order it appears on the command
    # line: the fixed ones first, then the optional base image, then user extras.
    build_arg_values = ["PREFECT_EXTRAS=[dev]", f"PYTHON_VERSION={python_version}"]
    if flavor:
        build_arg_values.append(f"BASE_IMAGE=prefect-{flavor}")
    build_arg_values.extend(build_arg)

    # Here we use a subprocess instead of the docker-py client to easily stream
    # output as it comes
    command = [
        "docker",
        "build",
        str(prefect.__development_base_path__),
        "--tag",
        image_tag,
        "--platform",
        f"linux/{arch}",
    ]
    for value in build_arg_values:
        command.extend(["--build-arg", value])

    if dry_run:
        # Only show the command that would have been executed.
        print(" ".join(command))
        return

    try:
        subprocess.check_call(command, shell=sys.platform == "win32")
    except subprocess.CalledProcessError:
        exit_with_error("Failed to build image!")
    else:
        exit_with_success(f"Built image {image_tag!r} for linux/{arch}")

303 

304 

305@dev_app.command() 1a

306def container( 1a

307 bg: bool = False, name="prefect-dev", api: bool = True, tag: Optional[str] = None 

308): 

309 """ 

310 Run a docker container with local code mounted and installed. 

311 """ 

312 exit_with_error_if_not_editable_install() 

313 import docker 

314 from docker.models.containers import Container 

315 

316 client = docker.from_env() 

317 

318 containers = client.containers.list() 

319 container_names = {container.name for container in containers} 

320 if name in container_names: 

321 exit_with_error( 

322 f"Container {name!r} already exists. Specify a different name or stop " 

323 "the existing container." 

324 ) 

325 

326 blocking_cmd = "prefect dev api" if api else "sleep infinity" 

327 tag = tag or get_prefect_image_name() 

328 

329 container: Container = client.containers.create( 

330 image=tag, 

331 command=[ 

332 "/bin/bash", 

333 "-c", 

334 ( # noqa 

335 "pip install -e /opt/prefect/repo\\[dev\\] && touch /READY &&" 

336 f" {blocking_cmd}" 

337 ), 

338 ], 

339 name=name, 

340 auto_remove=True, 

341 working_dir="/opt/prefect/repo", 

342 volumes=[f"{prefect.__development_base_path__}:/opt/prefect/repo"], 

343 shm_size="4G", 

344 ) 

345 

346 print(f"Starting container for image {tag!r}...") 

347 container.start() 

348 

349 print("Waiting for installation to complete", end="", flush=True) 

350 try: 

351 ready = False 

352 while not ready: 

353 print(".", end="", flush=True) 

354 result = container.exec_run("test -f /READY") 

355 ready = result.exit_code == 0 

356 if not ready: 

357 time.sleep(3) 

358 except BaseException: 

359 print("\nInterrupted. Stopping container...") 

360 container.stop() 

361 raise 

362 

363 print( 

364 textwrap.dedent( 

365 f""" 

366 Container {container.name!r} is ready! To connect to the container, run: 

367 

368 docker exec -it {container.name} /bin/bash 

369 """ 

370 ) 

371 ) 

372 

373 if bg: 

374 print( 

375 textwrap.dedent( 

376 f""" 

377 The container will run forever. Stop the container with: 

378 

379 docker stop {container.name} 

380 """ 

381 ) 

382 ) 

383 # Exit without stopping 

384 return 

385 

386 try: 

387 print("Send a keyboard interrupt to exit...") 

388 container.wait() 

389 except KeyboardInterrupt: 

390 pass # Avoid showing "Abort" 

391 finally: 

392 print("\nStopping container...") 

393 container.stop()