Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/shell.py: 31%

73 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Provides a set of tools for executing shell commands as Prefect flows. 

3Includes functionalities for running shell commands ad-hoc or serving them as Prefect flows, 

4with options for logging output, scheduling, and deployment customization. 

5""" 

6 

7import logging 1a

8import subprocess 1a

9import sys 1a

10import threading 1a

11from typing import IO, Any, Callable, Dict, List, Optional 1a

12 

13import typer 1a

14from typing_extensions import Annotated 1a

15 

16from prefect import flow 1a

17from prefect.cli._types import PrefectTyper 1a

18from prefect.cli.root import app 1a

19from prefect.client.schemas.actions import DeploymentScheduleCreate 1a

20from prefect.client.schemas.schedules import CronSchedule 1a

21from prefect.context import tags 1a

22from prefect.exceptions import FailedRun 1a

23from prefect.logging.loggers import get_run_logger 1a

24from prefect.runner import Runner 1a

25from prefect.settings import get_current_settings 1a

26from prefect.types.entrypoint import EntrypointType 1a

27 

# Typer sub-application that groups the `shell` commands defined in this
# module; it is mounted on the root CLI application right after creation.
shell_app: PrefectTyper = PrefectTyper(
    help="Serve and watch shell commands as Prefect flows.",
    name="shell",
)
app.add_typer(shell_app)

32 

33 

def output_stream(pipe: IO[str], logger_function: Callable[[str], None]) -> None:
    """
    Forward every line read from a pipe to a logging callable.

    Lines are read one at a time until the pipe reports end-of-file (an empty
    string). Each line has surrounding whitespace stripped before it is handed
    to `logger_function`. The pipe is closed when reading finishes.

    Args:
        pipe (IO): A file-like object for reading process output.
        logger_function (function): A callable that receives each stripped line,
            typically a bound logger method.
    """
    with pipe:
        # `readline()` returns "" only at end-of-file; blank lines arrive as "\n"
        while raw_line := pipe.readline():
            logger_function(raw_line.strip())

45 

46 

def output_collect(pipe: IO[str], container: list[str]) -> None:
    """
    Drain a subprocess pipe into a list, one entry per line.

    Lines are appended exactly as read (trailing newlines are kept). Reading
    stops at end-of-file, i.e. when `readline()` returns an empty string. The
    pipe is not closed by this function.

    Args:
        pipe: The output pipe of the subprocess, either stdout or stderr.
        container: The list that receives the collected lines, in order.
    """
    container.extend(iter(pipe.readline, ""))

57 

58 

@flow
def run_shell_process(
    command: str,
    log_output: bool = True,
    stream_stdout: bool = False,
    log_stderr: bool = False,
    popen_kwargs: Optional[Dict[str, Any]] = None,
):
    """
    Executes the specified shell command in a subprocess and logs its output.

    The call blocks until the command has exited. stdout and stderr are drained
    on separate threads so that neither pipe can fill up and stall the child
    process while the other is being read.

    Args:
        command: The shell command to execute.
        log_output: If True, output is logged through the Prefect run logger;
            otherwise the standard library logger named "prefect" is used.
        stream_stdout: If True, each stdout line is logged as soon as it is read.
            If False, stdout is collected and logged as a single message once
            the command has finished.
        log_stderr: If True, any stderr output is logged at error level even
            when the command succeeds.
        popen_kwargs: Additional keyword arguments to pass to the
            `subprocess.Popen` call. They override the defaults used here.

    Raises:
        FailedRun: If the command exits with a non-zero return code. Collected
            stderr output, if any, is logged at error level before raising.
    """

    logger = get_run_logger() if log_output else logging.getLogger("prefect")

    # Default Popen kwargs that can be overridden
    kwargs: Dict[str, Any] = {
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
        "shell": True,
        "text": True,
        "bufsize": 1,
        "universal_newlines": True,
    }

    if popen_kwargs:
        kwargs |= popen_kwargs

    # Containers for log batching
    stdout_container: List[str] = []
    stderr_container: List[str] = []
    with subprocess.Popen(command, **kwargs) as proc:
        # One reader thread per pipe. A pipe is None when `popen_kwargs`
        # redirected that stream away from `subprocess.PIPE`; there is nothing
        # to read in that case, so no thread is created for it.
        readers: List[threading.Thread] = []
        if proc.stdout is not None:
            if stream_stdout:
                readers.append(
                    threading.Thread(
                        target=output_stream, args=(proc.stdout, logger.info)
                    )
                )
            else:
                readers.append(
                    threading.Thread(
                        target=output_collect, args=(proc.stdout, stdout_container)
                    )
                )
        if proc.stderr is not None:
            readers.append(
                threading.Thread(
                    target=output_collect, args=(proc.stderr, stderr_container)
                )
            )

        for reader in readers:
            reader.start()
        for reader in readers:
            reader.join()

        proc.wait()
        failed = proc.returncode != 0

        if stdout_container:
            logger.info("".join(stdout_container).strip())

        # Log stderr exactly once: when explicitly requested, or when the
        # command failed. Nothing is logged if stderr produced no output.
        if stderr_container and (log_stderr or failed):
            logger.error("".join(stderr_container).strip())

        if failed:
            # Suppress traceback. NOTE: this is a process-wide setting and
            # stays in effect after this flow returns.
            sys.tracebacklimit = 0
            raise FailedRun(f"Command failed with exit code {proc.returncode}")

133 

134 

@shell_app.command("watch")
async def watch(
    command: str,
    log_output: bool = typer.Option(
        True, help="Log the output of the command to Prefect logs."
    ),
    flow_run_name: Optional[str] = typer.Option(None, help="Name of the flow run."),
    flow_name: str = typer.Option("Shell Command", help="Name of the flow."),
    stream_stdout: bool = typer.Option(True, help="Stream the output of the command."),
    tag: Annotated[
        Optional[List[str]], typer.Option(help="Optional tags for the flow run.")
    ] = None,
):
    """
    Executes a shell command and observes it as a Prefect flow.

    Args:
        command (str): The shell command to be executed.
        log_output (bool, optional): If True, logs the command's output. Defaults to True.
        flow_run_name (str, optional): An optional name for the flow run.
        flow_name (str, optional): An optional name for the flow. Useful for identification in the Prefect UI.
        stream_stdout (bool, optional): If True, stdout is logged line by line while the command runs
            instead of once after it finishes. Defaults to True.
        tag (List[str], optional): An optional list of tags for categorizing and filtering flows in the Prefect UI.
    """
    # Every run started through this command carries the "shell" tag in
    # addition to any user-supplied tags.
    tag = (tag or []) + ["shell"]

    # Derive a variant of the `run_shell_process` flow with the requested names
    defined_flow = run_shell_process.with_options(
        name=flow_name, flow_run_name=flow_run_name
    )
    with tags(*tag):
        defined_flow(
            command=command, log_output=log_output, stream_stdout=stream_stdout
        )

168 

169 

@shell_app.command("serve")
async def serve(
    command: str,
    flow_name: str = typer.Option(..., help="Name of the flow"),
    deployment_name: str = typer.Option(
        "CLI Runner Deployment", help="Name of the deployment"
    ),
    deployment_tags: Optional[List[str]] = typer.Option(
        None, "--tag", help="Tag for the deployment (can be provided multiple times)"
    ),
    log_output: bool = typer.Option(
        True, help="Stream the output of the command", hidden=True
    ),
    stream_stdout: bool = typer.Option(True, help="Stream the output of the command"),
    cron_schedule: Optional[str] = typer.Option(
        None, help="Cron schedule for the flow"
    ),
    timezone: Optional[str] = typer.Option(None, help="Timezone for the schedule"),
    concurrency_limit: Optional[int] = typer.Option(
        None,
        min=1,
        help="The maximum number of flow runs that can execute at the same time",
    ),
    run_once: bool = typer.Option(
        False, help="Run the agent loop once, instead of forever."
    ),
):
    """
    Creates and serves a Prefect deployment that runs a specified shell command according to a cron schedule or ad hoc.

    This function allows users to integrate shell command execution into Prefect workflows seamlessly. It provides options for
    scheduled execution via cron expressions, flow and deployment naming for better management, and the application of tags for
    easier categorization and filtering within the Prefect UI. Additionally, it supports streaming command output to Prefect logs,
    setting concurrency limits to control flow execution, and optionally running the deployment once for ad-hoc tasks.

    Args:
        command (str): The shell command the flow will execute.
        flow_name (str): The name assigned to the flow. This is required.
        deployment_name (str, optional): The name of the deployment. This helps distinguish deployments within the Prefect platform.
        deployment_tags (List[str], optional): Optional tags for the deployment to facilitate filtering and organization.
        log_output (bool, optional): If True, streams the output of the shell command to the Prefect logs. Defaults to True.
        stream_stdout (bool, optional): If True, stdout is logged line by line while the command runs. Defaults to True.
        cron_schedule (str, optional): A cron expression that defines when the flow will run. If not provided, the flow can be triggered manually.
        timezone (str, optional): The timezone for the cron schedule. This is important if the schedule should align with local time.
        concurrency_limit (int, optional): The maximum number of instances of the flow that can run simultaneously.
        run_once (bool, optional): When True, the flow will only run once upon deployment initiation, rather than continuously.
    """
    # A schedule is only attached when a cron expression was supplied
    schedule = (
        CronSchedule(cron=cron_schedule, timezone=timezone) if cron_schedule else None
    )
    defined_flow = run_shell_process.with_options(name=flow_name)

    runner_deployment = await defined_flow.to_deployment(
        name=deployment_name,
        parameters={
            "command": command,
            "log_output": log_output,
            "stream_stdout": stream_stdout,
        },
        entrypoint_type=EntrypointType.MODULE_PATH,
        schedules=[DeploymentScheduleCreate(schedule=schedule)] if schedule else [],
        tags=(deployment_tags or []) + ["shell"],
    )

    # Forward the concurrency limit only when the user supplied one, so the
    # runner keeps its own default otherwise.
    runner_kwargs: Dict[str, Any] = {"name": flow_name}
    if concurrency_limit is not None:
        runner_kwargs["limit"] = concurrency_limit
    runner = Runner(**runner_kwargs)

    deployment_id = await runner.add_deployment(runner_deployment)
    help_message = (
        f"[green]Your flow {runner_deployment.flow_name!r} is being served and polling"
        " for scheduled runs!\n[/]\nTo trigger a run for this flow, use the following"
        " command:\n[blue]\n\t$ prefect deployment run"
        f" '{runner_deployment.flow_name}/{deployment_name}'\n[/]"
    )
    # Only advertise the UI link when a UI URL is configured
    if ui_url := get_current_settings().ui_url:
        help_message += (
            "\nYou can also run your flow via the Prefect UI:"
            f" [blue]{ui_url}/deployments/deployment/{deployment_id}[/]\n"
        )

    app.console.print(help_message, soft_wrap=True)
    await runner.start(run_once=run_once)