Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/processutils.py: 17%
191 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1import asyncio 1a
2import os 1a
3import signal 1a
4import subprocess 1a
5import sys 1a
6import threading 1a
7from collections.abc import AsyncGenerator, Mapping 1a
8from contextlib import asynccontextmanager 1a
9from dataclasses import dataclass 1a
10from functools import partial 1a
11from types import FrameType 1a
12from typing import ( 1a
13 IO,
14 TYPE_CHECKING,
15 Any,
16 AnyStr,
17 Callable,
18 Optional,
19 TextIO,
20 Union,
21 cast,
22 overload,
23)
25import anyio 1a
26import anyio.abc 1a
27from anyio.streams.text import TextReceiveStream, TextSendStream 1a
28from typing_extensions import TypeAlias, TypeVar 1a
30if TYPE_CHECKING: 30 ↛ 31line 30 didn't jump to line 31 because the condition on line 30 was never true1a
31 from _typeshed import StrOrBytesPath
# Destinations that subprocess text output can be written to: an anyio async
# file, a regular text file object, or an anyio text send stream.
TextSink: TypeAlias = Union[anyio.AsyncFile[AnyStr], TextIO, TextSendStream]
# Callback that is handed a human-readable message string; its return value
# is not constrained.
PrintFn: TypeAlias = Callable[[str], object]
# Generic placeholder for the value reported through a task status.
T = TypeVar("T", infer_variance=True)
37if sys.platform == "win32": 37 ↛ 38line 37 didn't jump to line 38 because the condition on line 37 was never true1a
38 from ctypes import WINFUNCTYPE, c_int, c_uint, windll
# PIDs of subprocesses that were started with CREATE_NEW_PROCESS_GROUP; the
# console control handler sends CTRL_BREAK_EVENT to every PID recorded here.
_windows_process_group_pids: set[int] = set()
@WINFUNCTYPE(c_int, c_uint)
def _win32_ctrl_handler(dwCtrlType: object) -> int:
    """
    A callback function for handling CTRL events cleanly on Windows. When called,
    this function will terminate all running win32 subprocesses the current
    process started in new process groups.

    Args:
        dwCtrlType: The control event type supplied by Windows (unused).

    Returns:
        Always 0, so the next handler in the chain also gets to handle the event.
    """
    # Iterate over a snapshot: Windows invokes console control handlers on a
    # separate thread, so the set may be mutated while we are signalling and
    # iterating the live set could raise "Set changed size during iteration".
    for pid in tuple(_windows_process_group_pids):
        try:
            os.kill(pid, signal.CTRL_BREAK_EVENT)
        except OSError:
            # process is already terminated
            pass

    # returning 0 lets the next handler in the chain handle the signal
    return 0
59 # anyio process wrapper classes
@dataclass(eq=False)
class StreamReaderWrapper(anyio.abc.ByteReceiveStream):
    """Expose an `asyncio.StreamReader` through anyio's byte receive stream API."""

    # The wrapped asyncio reader that all reads are delegated to.
    _stream: asyncio.StreamReader

    async def receive(self, max_bytes: int = 65536) -> bytes:
        """
        Read up to `max_bytes` bytes from the wrapped reader.

        Raises:
            anyio.EndOfStream: If the reader returned no data.
        """
        chunk = await self._stream.read(max_bytes)
        if not chunk:
            raise anyio.EndOfStream
        return chunk

    async def aclose(self) -> None:
        """Mark the wrapped reader as having reached end-of-file."""
        self._stream.feed_eof()
@dataclass(eq=False)
class StreamWriterWrapper(anyio.abc.ByteSendStream):
    """Expose an `asyncio.StreamWriter` through anyio's byte send stream API."""

    # The wrapped asyncio writer that all writes are delegated to.
    _stream: asyncio.StreamWriter

    async def send(self, item: bytes) -> None:
        """Write `item` to the wrapped writer and wait for it to drain."""
        writer = self._stream
        writer.write(item)
        await writer.drain()

    async def aclose(self) -> None:
        """Close the wrapped writer."""
        self._stream.close()
@dataclass(eq=False)
class Process(anyio.abc.Process):
    """Adapt an `asyncio.subprocess.Process` to the `anyio.abc.Process` interface."""

    # The underlying asyncio subprocess every operation is delegated to.
    _process: asyncio.subprocess.Process
    # Wrapped standard streams; each is None when no stream is available.
    _stdin: Optional[StreamWriterWrapper]
    _stdout: Optional[StreamReaderWrapper]
    _stderr: Optional[StreamReaderWrapper]

    async def aclose(self) -> None:
        """Close stdin, stdout and stderr (in that order), then wait for exit."""
        for stream in (self._stdin, self._stdout, self._stderr):
            if stream:
                await stream.aclose()

        await self.wait()

    async def wait(self) -> int:
        """Wait for the subprocess to finish and return its exit code."""
        exit_code = await self._process.wait()
        return exit_code

    def terminate(self) -> None:
        """Ask the underlying subprocess to terminate."""
        self._process.terminate()

    def kill(self) -> None:
        """Kill the underlying subprocess."""
        self._process.kill()

    def send_signal(self, signal: int) -> None:
        """Deliver `signal` to the underlying subprocess."""
        self._process.send_signal(signal)

    @property
    def pid(self) -> int:
        """Process ID of the underlying subprocess."""
        return self._process.pid

    @property
    def returncode(self) -> Optional[int]:
        """Return code as reported by the underlying subprocess."""
        return self._process.returncode

    @property
    def stdin(self) -> Optional[anyio.abc.ByteSendStream]:
        """Wrapped standard input stream, or None."""
        return self._stdin

    @property
    def stdout(self) -> Optional[anyio.abc.ByteReceiveStream]:
        """Wrapped standard output stream, or None."""
        return self._stdout

    @property
    def stderr(self) -> Optional[anyio.abc.ByteReceiveStream]:
        """Wrapped standard error stream, or None."""
        return self._stderr
async def _open_anyio_process(
    command: Union[str, bytes, list["StrOrBytesPath"]],
    *,
    stdin: Union[int, IO[Any], None] = None,
    stdout: Union[int, IO[Any], None] = None,
    stderr: Union[int, IO[Any], None] = None,
    cwd: Optional["StrOrBytesPath"] = None,
    env: Optional[Mapping[str, str]] = None,
    start_new_session: bool = False,
    **kwargs: Any,
) -> Process:
    """
    Start a subprocess with asyncio and wrap it in a `Process`.

    Args:
        command: The command to run. A list is executed directly via
            `asyncio.create_subprocess_exec`; anything else is handed to
            `asyncio.create_subprocess_shell`.
        stdin: Forwarded as the subprocess's `stdin` argument.
        stdout: Forwarded as the subprocess's `stdout` argument.
        stderr: Forwarded as the subprocess's `stderr` argument.
        cwd: Forwarded as the subprocess's `cwd` argument.
        env: Forwarded as the subprocess's `env` argument.
        start_new_session: Forwarded as `start_new_session`.
        kwargs: Additional arguments forwarded to the asyncio call.

    Returns:
        A `Process` object
    """
    # Options shared by both the exec and the shell code paths.
    spawn_options: dict[str, Any] = dict(
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        env=env,
        start_new_session=start_new_session,
        **kwargs,
    )

    if isinstance(command, list):
        process = await asyncio.create_subprocess_exec(*command, **spawn_options)
    else:
        process = await asyncio.create_subprocess_shell(command, **spawn_options)

    # Only wrap the standard streams that asyncio actually created.
    wrapped_stdin = StreamWriterWrapper(process.stdin) if process.stdin else None
    wrapped_stdout = StreamReaderWrapper(process.stdout) if process.stdout else None
    wrapped_stderr = StreamReaderWrapper(process.stderr) if process.stderr else None
    return Process(process, wrapped_stdin, wrapped_stdout, wrapped_stderr)
@asynccontextmanager
async def open_process(
    command: list[str], **kwargs: Any
) -> AsyncGenerator[anyio.abc.Process, Any]:
    """
    Like `anyio.open_process` but with:
    - Support for Windows command joining
    - Termination of the process on exception during yield
    - Forced cleanup of process resources during cancellation

    Args:
        command: The command to run, as a list of arguments.
        kwargs: Additional arguments forwarded to the process opener.

    Yields:
        The opened process.

    Raises:
        TypeError: If `command` is not a list.
    """
    # Passing a string to open_process is equivalent to shell=True which is
    # generally necessary for Unix-like commands on Windows but otherwise should
    # be avoided
    if not TYPE_CHECKING:
        if not isinstance(command, list):
            raise TypeError(
                "The command passed to open process must be a list. You passed the command "
                f"'{command}', which is type '{type(command)}'."
            )

    if sys.platform == "win32":
        command = " ".join(command)
        process = await _open_anyio_process(command, **kwargs)
    else:
        process = await anyio.open_process(command, **kwargs)

    # if there's a creationflags kwarg and it contains CREATE_NEW_PROCESS_GROUP,
    # use SetConsoleCtrlHandler to handle CTRL-C
    win32_process_group = False
    if (
        sys.platform == "win32"
        and "creationflags" in kwargs
        and kwargs["creationflags"] & subprocess.CREATE_NEW_PROCESS_GROUP
    ):
        win32_process_group = True
        _windows_process_group_pids.add(process.pid)
        # Add a handler for CTRL-C. Re-adding the handler is safe as Windows
        # will not add a duplicate handler if _win32_ctrl_handler is
        # already registered.
        windll.kernel32.SetConsoleCtrlHandler(_win32_ctrl_handler, 1)

    try:
        async with process:
            yield process
    finally:
        try:
            process.terminate()
        except OSError:
            # Occurs if the process is already terminated
            pass
        finally:
            # Always forget the PID, even when terminate() raised because the
            # process had already exited; otherwise a stale PID stays in the
            # set and would be signalled by the console control handler later.
            if sys.platform == "win32" and win32_process_group:
                _windows_process_group_pids.discard(process.pid)

        # Ensure the process resource is closed. If not shielded from cancellation,
        # this resource can be left open and the subprocess output can appear after
        # the parent process has exited.
        with anyio.CancelScope(shield=True):
            await process.aclose()
# Overload: a `task_status_handler` is supplied, so the value reported through
# `task_status` has whatever type the handler returns.
@overload
async def run_process(
    command: list[str],
    *,
    stream_output: Union[
        bool, tuple[Optional[TextSink[str]], Optional[TextSink[str]]]
    ] = ...,
    task_status: anyio.abc.TaskStatus[T] = ...,
    task_status_handler: Callable[[anyio.abc.Process], T] = ...,
    **kwargs: Any,
) -> anyio.abc.Process: ...


# Overload: no `task_status_handler`, so an integer (the process PID) is
# reported through `task_status` when one is given.
@overload
async def run_process(
    command: list[str],
    *,
    stream_output: Union[
        bool, tuple[Optional[TextSink[str]], Optional[TextSink[str]]]
    ] = ...,
    task_status: Optional[anyio.abc.TaskStatus[int]] = ...,
    task_status_handler: None = None,
    **kwargs: Any,
) -> anyio.abc.Process: ...


# Overload: the general form, with the same parameters and defaults as the
# implementation.
@overload
async def run_process(
    command: list[str],
    *,
    stream_output: Union[
        bool, tuple[Optional[TextSink[str]], Optional[TextSink[str]]]
    ] = False,
    task_status: Optional[anyio.abc.TaskStatus[T]] = None,
    task_status_handler: Optional[Callable[[anyio.abc.Process], T]] = None,
    **kwargs: Any,
) -> anyio.abc.Process: ...
async def run_process(
    command: list[str],
    *,
    stream_output: Union[
        bool, tuple[Optional[TextSink[str]], Optional[TextSink[str]]]
    ] = False,
    task_status: Optional[anyio.abc.TaskStatus[T]] = None,
    task_status_handler: Optional[Callable[[anyio.abc.Process], T]] = None,
    **kwargs: Any,
) -> anyio.abc.Process:
    """
    Run a command to completion, similar to `anyio.run_process`, but:

    - the process is opened through `open_process` so resources are cleaned up
    - output can optionally be streamed to the parent's stdout/stderr or to
      caller-provided sinks
    - it can be submitted with `TaskGroup.start`: the task is marked as started
      once the process exists, reporting the PID (or the result of
      `task_status_handler`, when given)

    Args:
        command: The command to run, as a list of arguments.
        stream_output: `True` to stream to `sys.stdout`/`sys.stderr`, or a
            `(stdout_sink, stderr_sink)` pair. When falsy, output is discarded.
        task_status: Optional task status to notify once the process exists.
        task_status_handler: Optional callable computing the started value
            from the process.
        kwargs: Additional arguments forwarded to `open_process`.

    Returns:
        The finished process.
    """
    if stream_output is True:
        stream_output = (sys.stdout, sys.stderr)

    # Capture output only when somebody is going to consume it.
    output_target = subprocess.PIPE if stream_output else subprocess.DEVNULL

    async with open_process(
        command, stdout=output_target, stderr=output_target, **kwargs
    ) as process:
        if task_status is not None:
            started_value: T = (
                task_status_handler(process)
                if task_status_handler
                else cast(T, process.pid)
            )
            task_status.started(started_value)

        if stream_output:
            await consume_process_output(
                process, stdout_sink=stream_output[0], stderr_sink=stream_output[1]
            )

        await process.wait()

    return process
async def consume_process_output(
    process: anyio.abc.Process,
    stdout_sink: Optional[TextSink[str]] = None,
    stderr_sink: Optional[TextSink[str]] = None,
) -> None:
    """
    Stream a process's stdout and stderr to the given sinks concurrently.

    A stream that the process does not expose (`None`) is skipped. Returns once
    every started streaming task has finished.

    Args:
        process: The process whose output should be consumed.
        stdout_sink: Destination for decoded stdout text, if any.
        stderr_sink: Destination for decoded stderr text, if any.
    """
    stream_sink_pairs = (
        (process.stdout, stdout_sink),
        (process.stderr, stderr_sink),
    )
    async with anyio.create_task_group() as tg:
        for byte_stream, sink in stream_sink_pairs:
            if byte_stream is None:
                continue
            tg.start_soon(stream_text, TextReceiveStream(byte_stream), sink)
async def stream_text(
    source: TextReceiveStream, *sinks: Optional[TextSink[str]]
) -> None:
    """
    Copy every text item received from `source` to each of the given sinks.

    Args:
        source: The text stream to read from until it is exhausted.
        sinks: Destinations for the text. `None` entries are skipped. A
            `TextSendStream` receives items via `send`; an `anyio.AsyncFile`
            is written to and flushed directly; any other object exposing
            `write` and `flush` is wrapped with `anyio.wrap_file` first.
            Objects matching none of these are ignored.
    """
    prepared_sinks: list[Union[TextSendStream, anyio.AsyncFile[str]]] = []
    for sink in sinks:
        if sink is None:
            continue
        if isinstance(sink, (TextSendStream, anyio.AsyncFile)):
            # Already asynchronous. In particular an AsyncFile must not be
            # wrapped again: the outer wrapper would call the inner coroutine
            # method in a worker thread without awaiting it, so nothing would
            # ever be written.
            prepared_sinks.append(sink)
        elif hasattr(sink, "write") and hasattr(sink, "flush"):
            prepared_sinks.append(anyio.wrap_file(cast(IO[str], sink)))

    async for item in source:
        for sink in prepared_sinks:
            if isinstance(sink, TextSendStream):
                await sink.send(item)
            else:
                await sink.write(item)
                await sink.flush()
def _register_signal(
    signum: int,
    handler: Optional[
        Union[Callable[[int, Optional[FrameType]], Any], int, signal.Handlers]
    ],
) -> None:
    """
    Install `handler` for `signum`, but only when called from the main thread.

    Calls made from any other thread are silently ignored.
    """
    if threading.current_thread() is not threading.main_thread():
        return
    signal.signal(signum, handler)
def forward_signal_handler(
    pid: int, signum: int, *signums: int, process_name: str, print_fn: PrintFn
) -> None:
    """
    Register a handler that turns each `signum` event into a signal sent to `pid`.

    The first time `signum` is received, the first entry of `signums` is sent
    to `pid`; a new handler is then registered so that the next `signum`
    sends the following entry, and so on.

    Args:
        pid: The process to send signals to.
        signum: The signal to listen for.
        signums: The signals to send, one per received `signum`, in order.
            At least one is required.
        process_name: Name of the target process, used in the printed message.
        print_fn: Called with a message each time a signal is forwarded.
    """
    outgoing_signal = signums[0]
    remaining_signals = signums[1:]

    # Forwarding a signal to ourselves as the same signal would re-enter this
    # handler forever; in that case remember the handler that was installed
    # beforehand so it can be put back right before the signal is sent.
    forwards_to_self = signum == outgoing_signal and pid == os.getpid()
    previous_handler = signal.getsignal(outgoing_signal) if forwards_to_self else None

    def _forward(*_: Any) -> None:
        received_name = getattr(signum, "name", signum)
        outgoing_name = getattr(outgoing_signal, "name", outgoing_signal)
        print_fn(
            f"Received {received_name}. "
            f"Sending {outgoing_name} to"
            f" {process_name} (PID {pid})..."
        )
        if forwards_to_self:
            signal.signal(outgoing_signal, previous_handler)
        os.kill(pid, outgoing_signal)
        if remaining_signals:
            # Re-arm for the next occurrence with the rest of the chain.
            forward_signal_handler(
                pid,
                signum,
                *remaining_signals,
                process_name=process_name,
                print_fn=print_fn,
            )

    # register current and future signal handlers
    _register_signal(signum, _forward)
def setup_signal_handlers_server(
    pid: int, process_name: str, print_fn: PrintFn
) -> None:
    """
    Install signal forwarding so interrupts of the server reach process `pid`.

    Args:
        pid: The process that received signals are forwarded to.
        process_name: Name of that process, used in printed messages.
        print_fn: Called with a message whenever a signal is forwarded.
    """

    def forward(trigger: int, *outgoing: int) -> None:
        forward_signal_handler(
            pid, trigger, *outgoing, process_name=process_name, print_fn=print_fn
        )

    if sys.platform == "win32":
        # on Windows, use CTRL_BREAK_EVENT as SIGTERM is useless:
        # https://bugs.python.org/issue26350
        forward(signal.SIGINT, signal.CTRL_BREAK_EVENT)
        return

    # first interrupt: SIGTERM, second interrupt: SIGKILL
    forward(signal.SIGINT, signal.SIGTERM, signal.SIGKILL)
    # forward first SIGTERM directly, send SIGKILL on subsequent SIGTERM
    forward(signal.SIGTERM, signal.SIGTERM, signal.SIGKILL)
def setup_signal_handlers_agent(pid: int, process_name: str, print_fn: PrintFn) -> None:
    """
    Install signal forwarding so interrupts of the agent reach process `pid`.

    Args:
        pid: The process that received signals are forwarded to.
        process_name: Name of that process, used in printed messages.
        print_fn: Called with a message whenever a signal is forwarded.
    """
    register = partial(
        forward_signal_handler, pid, process_name=process_name, print_fn=print_fn
    )

    if sys.platform == "win32":
        # on Windows, use CTRL_BREAK_EVENT as SIGTERM is useless:
        # https://bugs.python.org/issue26350
        register(signal.SIGINT, signal.CTRL_BREAK_EVENT)
    else:
        # SIGINT and SIGTERM share one escalation chain: the first occurrence
        # is delivered as SIGINT, a subsequent one as SIGKILL.
        for trigger in (signal.SIGINT, signal.SIGTERM):
            register(trigger, signal.SIGINT, signal.SIGKILL)
def setup_signal_handlers_worker(
    pid: int, process_name: str, print_fn: PrintFn
) -> None:
    """
    Install signal forwarding so interrupts of a worker reach process `pid`.

    Args:
        pid: The process that received signals are forwarded to.
        process_name: Name of that process, used in printed messages.
        print_fn: Called with a message whenever a signal is forwarded.
    """

    def forward(trigger: int, *outgoing: int) -> None:
        forward_signal_handler(
            pid, trigger, *outgoing, process_name=process_name, print_fn=print_fn
        )

    if sys.platform == "win32":
        # on Windows, use CTRL_BREAK_EVENT as SIGTERM is useless:
        # https://bugs.python.org/issue26350
        forward(signal.SIGINT, signal.CTRL_BREAK_EVENT)
        return

    # forward first SIGINT directly, send SIGKILL on subsequent interrupt
    forward(signal.SIGINT, signal.SIGINT, signal.SIGKILL)
    # first SIGTERM: send SIGINT, send SIGKILL on subsequent SIGTERM
    forward(signal.SIGTERM, signal.SIGINT, signal.SIGKILL)
def get_sys_executable() -> str:
    """
    Return the path of the running Python interpreter.

    On Windows (`os.name == "nt"`) the path is wrapped in double quotes so it
    stays quotable; elsewhere `sys.executable` is returned unchanged.
    """
    return f'"{sys.executable}"' if os.name == "nt" else sys.executable