Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/shell.py: 31%
73 statements
coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Provides a set of tools for executing shell commands as Prefect flows.
3Includes functionalities for running shell commands ad-hoc or serving them as Prefect flows,
4with options for logging output, scheduling, and deployment customization.
5"""
import logging
import subprocess
import sys
import threading
from typing import IO, Any, Callable, Dict, List, Optional

import typer
from typing_extensions import Annotated

from prefect import flow
from prefect.cli._types import PrefectTyper
from prefect.cli.root import app
from prefect.client.schemas.actions import DeploymentScheduleCreate
from prefect.client.schemas.schedules import CronSchedule
from prefect.context import tags
from prefect.exceptions import FailedRun
from prefect.logging.loggers import get_run_logger
from prefect.runner import Runner
from prefect.settings import get_current_settings
from prefect.types.entrypoint import EntrypointType

shell_app: PrefectTyper = PrefectTyper(
    name="shell", help="Serve and watch shell commands as Prefect flows."
)
app.add_typer(shell_app)


def output_stream(pipe: IO[str], logger_function: Callable[[str], None]) -> None:
    """
    Read from a pipe line by line and log using the provided logging function.

    Args:
        pipe (IO): A file-like object for reading process output.
        logger_function (function): A logging function from the logger.
    """
    with pipe:
        for line in iter(pipe.readline, ""):
            logger_function(line.strip())


def output_collect(pipe: IO[str], container: list[str]) -> None:
    """
    Collects output from a subprocess pipe and stores it in a container list.

    Args:
        pipe: The output pipe of the subprocess, either stdout or stderr.
        container: A list to store the collected output lines.
    """
    for line in iter(pipe.readline, ""):
        container.append(line)
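
# Illustrative sketch (not part of this module): the two helpers above feed a
# subprocess pipe either into a logger or into a list from a background thread.
# A minimal standalone use of `output_collect` might look like this:
#
#     import subprocess
#     import threading
#
#     lines: list[str] = []
#     with subprocess.Popen(
#         "echo hello", shell=True, text=True, stdout=subprocess.PIPE
#     ) as proc:
#         reader = threading.Thread(target=output_collect, args=(proc.stdout, lines))
#         reader.start()
#         reader.join()
#         proc.wait()
#     # lines == ["hello\n"]
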
@flow
def run_shell_process(
    command: str,
    log_output: bool = True,
    stream_stdout: bool = False,
    log_stderr: bool = False,
    popen_kwargs: Optional[Dict[str, Any]] = None,
):
    """
    Executes the specified shell command and logs its output.

    This function is designed to be used within Prefect flows to run shell commands as part of task execution.
    It handles both the execution of the command and the collection of its output for logging purposes.

    Args:
        command: The shell command to execute.
        log_output: If True, the output of the command (both stdout and stderr) is logged to Prefect.
        stream_stdout: If True, the stdout of the command is streamed to Prefect logs.
        log_stderr: If True, the stderr of the command is logged to Prefect logs.
        popen_kwargs: Additional keyword arguments to pass to the `subprocess.Popen` call.
    """
    logger = get_run_logger() if log_output else logging.getLogger("prefect")

    # Default Popen kwargs that can be overridden
    kwargs = {
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
        "shell": True,
        "text": True,
        "bufsize": 1,
        "universal_newlines": True,
    }

    if popen_kwargs:
        kwargs |= popen_kwargs

    # Containers for log batching
    stdout_container, stderr_container = [], []
    with subprocess.Popen(command, **kwargs) as proc:
        # Create threads for collecting stdout and stderr
        if stream_stdout:
            stdout_logger = logger.info
            output = output_stream
        else:
            stdout_logger = stdout_container
            output = output_collect

        stdout_thread = threading.Thread(
            target=output, args=(proc.stdout, stdout_logger)
        )

        stderr_thread = threading.Thread(
            target=output_collect, args=(proc.stderr, stderr_container)
        )

        stdout_thread.start()
        stderr_thread.start()

        stdout_thread.join()
        stderr_thread.join()

        proc.wait()
        if stdout_container:
            logger.info("".join(stdout_container).strip())

        if stderr_container and log_stderr:
            logger.error("".join(stderr_container).strip())
        # Suppress traceback
        if proc.returncode != 0:
            logger.error("".join(stderr_container).strip())
            sys.tracebacklimit = 0
            raise FailedRun(f"Command failed with exit code {proc.returncode}")
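
# Illustrative sketch (not part of this module): because `run_shell_process`
# is a regular Prefect flow, it can also be invoked directly from Python,
# which runs the command in a subprocess and logs its output.
#
#     if __name__ == "__main__":
#         run_shell_process(command="echo 'hello from a shell flow'", log_output=True)
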
135@shell_app.command("watch") 1a
136async def watch( 1a
137 command: str,
138 log_output: bool = typer.Option(
139 True, help="Log the output of the command to Prefect logs."
140 ),
141 flow_run_name: str = typer.Option(None, help="Name of the flow run."),
142 flow_name: str = typer.Option("Shell Command", help="Name of the flow."),
143 stream_stdout: bool = typer.Option(True, help="Stream the output of the command."),
144 tag: Annotated[
145 Optional[List[str]], typer.Option(help="Optional tags for the flow run.")
146 ] = None,
147):
148 """
149 Executes a shell command and observes it as Prefect flow.
151 Args:
152 command (str): The shell command to be executed.
153 log_output (bool, optional): If True, logs the command's output. Defaults to True.
154 flow_run_name (str, optional): An optional name for the flow run.
155 flow_name (str, optional): An optional name for the flow. Useful for identification in the Prefect UI.
156 tag (List[str], optional): An optional list of tags for categorizing and filtering flows in the Prefect UI.
157 """
158 tag = (tag or []) + ["shell"]
160 # Call the shell_run_command flow with provided arguments
161 defined_flow = run_shell_process.with_options(
162 name=flow_name, flow_run_name=flow_run_name
163 )
164 with tags(*tag):
165 defined_flow(
166 command=command, log_output=log_output, stream_stdout=stream_stdout
167 )
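
# Example CLI invocation (a sketch; option names follow Typer's default
# underscore-to-dash conversion):
#
#     $ prefect shell watch "ls -la" --flow-name "List Files" --tag ops
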
170@shell_app.command("serve") 1a
171async def serve( 1a
172 command: str,
173 flow_name: str = typer.Option(..., help="Name of the flow"),
174 deployment_name: str = typer.Option(
175 "CLI Runner Deployment", help="Name of the deployment"
176 ),
177 deployment_tags: List[str] = typer.Option(
178 None, "--tag", help="Tag for the deployment (can be provided multiple times)"
179 ),
180 log_output: bool = typer.Option(
181 True, help="Stream the output of the command", hidden=True
182 ),
183 stream_stdout: bool = typer.Option(True, help="Stream the output of the command"),
184 cron_schedule: str = typer.Option(None, help="Cron schedule for the flow"),
185 timezone: str = typer.Option(None, help="Timezone for the schedule"),
186 concurrency_limit: int = typer.Option(
187 None,
188 min=1,
189 help="The maximum number of flow runs that can execute at the same time",
190 ),
191 run_once: bool = typer.Option(
192 False, help="Run the agent loop once, instead of forever."
193 ),
194):
195 """
196 Creates and serves a Prefect deployment that runs a specified shell command according to a cron schedule or ad hoc.
198 This function allows users to integrate shell command execution into Prefect workflows seamlessly. It provides options for
199 scheduled execution via cron expressions, flow and deployment naming for better management, and the application of tags for
200 easier categorization and filtering within the Prefect UI. Additionally, it supports streaming command output to Prefect logs,
201 setting concurrency limits to control flow execution, and optionally running the deployment once for ad-hoc tasks.
203 Args:
204 command (str): The shell command the flow will execute.
205 name (str): The name assigned to the flow. This is required..
206 deployment_tags (List[str], optional): Optional tags for the deployment to facilitate filtering and organization.
207 log_output (bool, optional): If True, streams the output of the shell command to the Prefect logs. Defaults to True.
208 cron_schedule (str, optional): A cron expression that defines when the flow will run. If not provided, the flow can be triggered manually.
209 timezone (str, optional): The timezone for the cron schedule. This is important if the schedule should align with local time.
210 concurrency_limit (int, optional): The maximum number of instances of the flow that can run simultaneously.
211 deployment_name (str, optional): The name of the deployment. This helps distinguish deployments within the Prefect platform.
212 run_once (bool, optional): When True, the flow will only run once upon deployment initiation, rather than continuously.
213 """
214 schedule = (
215 CronSchedule(cron=cron_schedule, timezone=timezone) if cron_schedule else None
216 )
217 defined_flow = run_shell_process.with_options(name=flow_name)
219 runner_deployment = await defined_flow.to_deployment(
220 name=deployment_name,
221 parameters={
222 "command": command,
223 "log_output": log_output,
224 "stream_stdout": stream_stdout,
225 },
226 entrypoint_type=EntrypointType.MODULE_PATH,
227 schedules=[DeploymentScheduleCreate(schedule=schedule)] if schedule else [],
228 tags=(deployment_tags or []) + ["shell"],
229 )
231 runner = Runner(name=flow_name)
232 deployment_id = await runner.add_deployment(runner_deployment)
233 help_message = (
234 f"[green]Your flow {runner_deployment.flow_name!r} is being served and polling"
235 " for scheduled runs!\n[/]\nTo trigger a run for this flow, use the following"
236 " command:\n[blue]\n\t$ prefect deployment run"
237 f" '{runner_deployment.flow_name}/{deployment_name}'\n[/]"
238 )
239 if ui_url := get_current_settings().ui_url:
240 help_message += (
241 "\nYou can also run your flow via the Prefect UI:"
242 f" [blue]{ui_url}/deployments/deployment/{deployment_id}[/]\n"
243 )
245 app.console.print(help_message, soft_wrap=True)
246 await runner.start(run_once=run_once)
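
# Example CLI invocations (a sketch; option names follow Typer's default
# underscore-to-dash conversion):
#
#     # Serve a command on a daily cron schedule
#     $ prefect shell serve "date" --flow-name "Print Date" \
#           --cron-schedule "0 9 * * *" --timezone "America/Chicago"
#
#     # Serve and poll only once, useful for ad-hoc runs
#     $ prefect shell serve "echo hello" --flow-name "Hello" --run-once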