Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/processutils.py: 17%

191 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import asyncio 1a

2import os 1a

3import signal 1a

4import subprocess 1a

5import sys 1a

6import threading 1a

7from collections.abc import AsyncGenerator, Mapping 1a

8from contextlib import asynccontextmanager 1a

9from dataclasses import dataclass 1a

10from functools import partial 1a

11from types import FrameType 1a

12from typing import ( 1a

13 IO, 

14 TYPE_CHECKING, 

15 Any, 

16 AnyStr, 

17 Callable, 

18 Optional, 

19 TextIO, 

20 Union, 

21 cast, 

22 overload, 

23) 

24 

25import anyio 1a

26import anyio.abc 1a

27from anyio.streams.text import TextReceiveStream, TextSendStream 1a

28from typing_extensions import TypeAlias, TypeVar 1a

29 

30if TYPE_CHECKING: 30 ↛ 31line 30 didn't jump to line 31 because the condition on line 30 was never true1a

31 from _typeshed import StrOrBytesPath 

32 

33TextSink: TypeAlias = Union[anyio.AsyncFile[AnyStr], TextIO, TextSendStream] 1a

34PrintFn: TypeAlias = Callable[[str], object] 1a

35T = TypeVar("T", infer_variance=True) 1a

36 

37if sys.platform == "win32": 37 ↛ 38line 37 didn't jump to line 38 because the condition on line 37 was never true1a

38 from ctypes import WINFUNCTYPE, c_int, c_uint, windll 

39 

40 _windows_process_group_pids = set() 

41 

42 @WINFUNCTYPE(c_int, c_uint) 

43 def _win32_ctrl_handler(dwCtrlType: object) -> int: 

44 """ 

45 A callback function for handling CTRL events cleanly on Windows. When called, 

46 this function will terminate all running win32 subprocesses the current 

47 process started in new process groups. 

48 """ 

49 for pid in _windows_process_group_pids: 

50 try: 

51 os.kill(pid, signal.CTRL_BREAK_EVENT) 

52 except OSError: 

53 # process is already terminated 

54 pass 

55 

56 # returning 0 lets the next handler in the chain handle the signal 

57 return 0 

58 

59 # anyio process wrapper classes 

60 @dataclass(eq=False) 

61 class StreamReaderWrapper(anyio.abc.ByteReceiveStream): 

62 _stream: asyncio.StreamReader 

63 

64 async def receive(self, max_bytes: int = 65536) -> bytes: 

65 data = await self._stream.read(max_bytes) 

66 if data: 

67 return data 

68 else: 

69 raise anyio.EndOfStream 

70 

71 async def aclose(self) -> None: 

72 self._stream.feed_eof() 

73 

74 @dataclass(eq=False) 

75 class StreamWriterWrapper(anyio.abc.ByteSendStream): 

76 _stream: asyncio.StreamWriter 

77 

78 async def send(self, item: bytes) -> None: 

79 self._stream.write(item) 

80 await self._stream.drain() 

81 

82 async def aclose(self) -> None: 

83 self._stream.close() 

84 

85 @dataclass(eq=False) 

86 class Process(anyio.abc.Process): 

87 _process: asyncio.subprocess.Process 

88 _stdin: Union[StreamWriterWrapper, None] 

89 _stdout: Union[StreamReaderWrapper, None] 

90 _stderr: Union[StreamReaderWrapper, None] 

91 

92 async def aclose(self) -> None: 

93 if self._stdin: 

94 await self._stdin.aclose() 

95 if self._stdout: 

96 await self._stdout.aclose() 

97 if self._stderr: 

98 await self._stderr.aclose() 

99 

100 await self.wait() 

101 

102 async def wait(self) -> int: 

103 return await self._process.wait() 

104 

105 def terminate(self) -> None: 

106 self._process.terminate() 

107 

108 def kill(self) -> None: 

109 self._process.kill() 

110 

111 def send_signal(self, signal: int) -> None: 

112 self._process.send_signal(signal) 

113 

114 @property 

115 def pid(self) -> int: 

116 return self._process.pid 

117 

118 @property 

119 def returncode(self) -> Union[int, None]: 

120 return self._process.returncode 

121 

122 @property 

123 def stdin(self) -> Union[anyio.abc.ByteSendStream, None]: 

124 return self._stdin 

125 

126 @property 

127 def stdout(self) -> Union[anyio.abc.ByteReceiveStream, None]: 

128 return self._stdout 

129 

130 @property 

131 def stderr(self) -> Union[anyio.abc.ByteReceiveStream, None]: 

132 return self._stderr 

133 

134 async def _open_anyio_process( 

135 command: Union[str, bytes, list["StrOrBytesPath"]], 

136 *, 

137 stdin: Union[int, IO[Any], None] = None, 

138 stdout: Union[int, IO[Any], None] = None, 

139 stderr: Union[int, IO[Any], None] = None, 

140 cwd: Optional["StrOrBytesPath"] = None, 

141 env: Optional[Mapping[str, str]] = None, 

142 start_new_session: bool = False, 

143 **kwargs: Any, 

144 ) -> Process: 

145 """ 

146 Open a subprocess and return a `Process` object. 

147 

148 Args: 

149 command: The command to run 

150 kwargs: Additional arguments to pass to `asyncio.create_subprocess_exec` 

151 

152 Returns: 

153 A `Process` object 

154 """ 

155 # call either asyncio.create_subprocess_exec or asyncio.create_subprocess_shell 

156 # depending on whether the command is a list or a string 

157 if isinstance(command, list): 

158 process = await asyncio.create_subprocess_exec( 

159 *command, 

160 stdin=stdin, 

161 stdout=stdout, 

162 stderr=stderr, 

163 cwd=cwd, 

164 env=env, 

165 start_new_session=start_new_session, 

166 **kwargs, 

167 ) 

168 else: 

169 process = await asyncio.create_subprocess_shell( 

170 command, 

171 stdin=stdin, 

172 stdout=stdout, 

173 stderr=stderr, 

174 cwd=cwd, 

175 env=env, 

176 start_new_session=start_new_session, 

177 **kwargs, 

178 ) 

179 

180 return Process( 

181 process, 

182 StreamWriterWrapper(process.stdin) if process.stdin else None, 

183 StreamReaderWrapper(process.stdout) if process.stdout else None, 

184 StreamReaderWrapper(process.stderr) if process.stderr else None, 

185 ) 

186 

187 

188@asynccontextmanager 1a

189async def open_process( 1a

190 command: list[str], **kwargs: Any 

191) -> AsyncGenerator[anyio.abc.Process, Any]: 

192 """ 

193 Like `anyio.open_process` but with: 

194 - Support for Windows command joining 

195 - Termination of the process on exception during yield 

196 - Forced cleanup of process resources during cancellation 

197 """ 

198 # Passing a string to open_process is equivalent to shell=True which is 

199 # generally necessary for Unix-like commands on Windows but otherwise should 

200 # be avoided 

201 if not TYPE_CHECKING: 

202 if not isinstance(command, list): 

203 raise TypeError( 

204 "The command passed to open process must be a list. You passed the command" 

205 f"'{command}', which is type '{type(command)}'." 

206 ) 

207 

208 if sys.platform == "win32": 

209 command = " ".join(command) 

210 process = await _open_anyio_process(command, **kwargs) 

211 else: 

212 process = await anyio.open_process(command, **kwargs) 

213 

214 # if there's a creationflags kwarg and it contains CREATE_NEW_PROCESS_GROUP, 

215 # use SetConsoleCtrlHandler to handle CTRL-C 

216 win32_process_group = False 

217 if ( 

218 sys.platform == "win32" 

219 and "creationflags" in kwargs 

220 and kwargs["creationflags"] & subprocess.CREATE_NEW_PROCESS_GROUP 

221 ): 

222 win32_process_group = True 

223 _windows_process_group_pids.add(process.pid) 

224 # Add a handler for CTRL-C. Re-adding the handler is safe as Windows 

225 # will not add a duplicate handler if _win32_ctrl_handler is 

226 # already registered. 

227 windll.kernel32.SetConsoleCtrlHandler(_win32_ctrl_handler, 1) 

228 

229 try: 

230 async with process: 

231 yield process 

232 finally: 

233 try: 

234 process.terminate() 

235 if sys.platform == "win32" and win32_process_group: 

236 _windows_process_group_pids.remove(process.pid) 

237 

238 except OSError: 

239 # Occurs if the process is already terminated 

240 pass 

241 

242 # Ensure the process resource is closed. If not shielded from cancellation, 

243 # this resource can be left open and the subprocess output can appear after 

244 # the parent process has exited. 

245 with anyio.CancelScope(shield=True): 

246 await process.aclose() 

247 

248 

249@overload 1a

250async def run_process( 250 ↛ exitline 250 didn't return from function 'run_process' because 1a

251 command: list[str], 

252 *, 

253 stream_output: Union[ 

254 bool, tuple[Optional[TextSink[str]], Optional[TextSink[str]]] 

255 ] = ..., 

256 task_status: anyio.abc.TaskStatus[T] = ..., 

257 task_status_handler: Callable[[anyio.abc.Process], T] = ..., 

258 **kwargs: Any, 

259) -> anyio.abc.Process: ... 

260 

261 

262@overload 1a

263async def run_process( 263 ↛ exitline 263 didn't return from function 'run_process' because 1a

264 command: list[str], 

265 *, 

266 stream_output: Union[ 

267 bool, tuple[Optional[TextSink[str]], Optional[TextSink[str]]] 

268 ] = ..., 

269 task_status: Optional[anyio.abc.TaskStatus[int]] = ..., 

270 task_status_handler: None = None, 

271 **kwargs: Any, 

272) -> anyio.abc.Process: ... 

273 

274 

275@overload 1a

276async def run_process( 276 ↛ exitline 276 didn't return from function 'run_process' because 1a

277 command: list[str], 

278 *, 

279 stream_output: Union[ 

280 bool, tuple[Optional[TextSink[str]], Optional[TextSink[str]]] 

281 ] = False, 

282 task_status: Optional[anyio.abc.TaskStatus[T]] = None, 

283 task_status_handler: Optional[Callable[[anyio.abc.Process], T]] = None, 

284 **kwargs: Any, 

285) -> anyio.abc.Process: ... 

286 

287 

288async def run_process( 1a

289 command: list[str], 

290 *, 

291 stream_output: Union[ 

292 bool, tuple[Optional[TextSink[str]], Optional[TextSink[str]]] 

293 ] = False, 

294 task_status: Optional[anyio.abc.TaskStatus[T]] = None, 

295 task_status_handler: Optional[Callable[[anyio.abc.Process], T]] = None, 

296 **kwargs: Any, 

297) -> anyio.abc.Process: 

298 """ 

299 Like `anyio.run_process` but with: 

300 

301 - Use of our `open_process` utility to ensure resources are cleaned up 

302 - Simple `stream_output` support to connect the subprocess to the parent stdout/err 

303 - Support for submission with `TaskGroup.start` marking as 'started' after the 

304 process has been created. When used, the PID is returned to the task status. 

305 

306 """ 

307 if stream_output is True: 

308 stream_output = (sys.stdout, sys.stderr) 

309 

310 async with open_process( 

311 command, 

312 stdout=subprocess.PIPE if stream_output else subprocess.DEVNULL, 

313 stderr=subprocess.PIPE if stream_output else subprocess.DEVNULL, 

314 **kwargs, 

315 ) as process: 

316 if task_status is not None: 

317 value: T = cast(T, process.pid) 

318 if task_status_handler: 

319 value = task_status_handler(process) 

320 task_status.started(value) 

321 

322 if stream_output: 

323 await consume_process_output( 

324 process, stdout_sink=stream_output[0], stderr_sink=stream_output[1] 

325 ) 

326 

327 await process.wait() 

328 

329 return process 

330 

331 

332async def consume_process_output( 1a

333 process: anyio.abc.Process, 

334 stdout_sink: Optional[TextSink[str]] = None, 

335 stderr_sink: Optional[TextSink[str]] = None, 

336) -> None: 

337 async with anyio.create_task_group() as tg: 

338 if process.stdout is not None: 

339 tg.start_soon( 

340 stream_text, 

341 TextReceiveStream(process.stdout), 

342 stdout_sink, 

343 ) 

344 if process.stderr is not None: 

345 tg.start_soon( 

346 stream_text, 

347 TextReceiveStream(process.stderr), 

348 stderr_sink, 

349 ) 

350 

351 

352async def stream_text( 1a

353 source: TextReceiveStream, *sinks: Optional[TextSink[str]] 

354) -> None: 

355 wrapped_sinks = [ 

356 ( 

357 anyio.wrap_file(cast(IO[str], sink)) 

358 if hasattr(sink, "write") and hasattr(sink, "flush") 

359 else sink 

360 ) 

361 for sink in sinks 

362 if sink is not None 

363 ] 

364 async for item in source: 

365 for sink in wrapped_sinks: 

366 if isinstance(sink, TextSendStream): 

367 await sink.send(item) 

368 elif isinstance(sink, anyio.AsyncFile): 

369 await sink.write(item) 

370 await sink.flush() 

371 

372 

373def _register_signal( 1a

374 signum: int, 

375 handler: Optional[ 

376 Union[Callable[[int, Optional[FrameType]], Any], int, signal.Handlers] 

377 ], 

378) -> None: 

379 if threading.current_thread() is threading.main_thread(): 

380 signal.signal(signum, handler) 

381 

382 

383def forward_signal_handler( 1a

384 pid: int, signum: int, *signums: int, process_name: str, print_fn: PrintFn 

385) -> None: 

386 """Forward subsequent signum events (e.g. interrupts) to respective signums.""" 

387 current_signal, future_signals = signums[0], signums[1:] 

388 

389 # avoid RecursionError when setting up a direct signal forward to the same signal for the main pid 

390 original_handler = None 

391 avoid_infinite_recursion = signum == current_signal and pid == os.getpid() 

392 if avoid_infinite_recursion: 

393 # store the vanilla handler so it can be temporarily restored below 

394 original_handler = signal.getsignal(current_signal) 

395 

396 def handler(*arg: Any) -> None: 

397 print_fn( 

398 f"Received {getattr(signum, 'name', signum)}. " 

399 f"Sending {getattr(current_signal, 'name', current_signal)} to" 

400 f" {process_name} (PID {pid})..." 

401 ) 

402 if avoid_infinite_recursion: 

403 signal.signal(current_signal, original_handler) 

404 os.kill(pid, current_signal) 

405 if future_signals: 

406 forward_signal_handler( 

407 pid, 

408 signum, 

409 *future_signals, 

410 process_name=process_name, 

411 print_fn=print_fn, 

412 ) 

413 

414 # register current and future signal handlers 

415 _register_signal(signum, handler) 

416 

417 

418def setup_signal_handlers_server( 1a

419 pid: int, process_name: str, print_fn: PrintFn 

420) -> None: 

421 """Handle interrupts of the server gracefully.""" 

422 setup_handler = partial( 

423 forward_signal_handler, pid, process_name=process_name, print_fn=print_fn 

424 ) 

425 # when server receives a signal, it needs to be propagated to the uvicorn subprocess 

426 if sys.platform == "win32": 

427 # on Windows, use CTRL_BREAK_EVENT as SIGTERM is useless: 

428 # https://bugs.python.org/issue26350 

429 setup_handler(signal.SIGINT, signal.CTRL_BREAK_EVENT) 

430 else: 

431 # first interrupt: SIGTERM, second interrupt: SIGKILL 

432 setup_handler(signal.SIGINT, signal.SIGTERM, signal.SIGKILL) 

433 # forward first SIGTERM directly, send SIGKILL on subsequent SIGTERM 

434 setup_handler(signal.SIGTERM, signal.SIGTERM, signal.SIGKILL) 

435 

436 

437def setup_signal_handlers_agent(pid: int, process_name: str, print_fn: PrintFn) -> None: 1a

438 """Handle interrupts of the agent gracefully.""" 

439 setup_handler = partial( 

440 forward_signal_handler, pid, process_name=process_name, print_fn=print_fn 

441 ) 

442 # when agent receives SIGINT, it stops dequeueing new FlowRuns, and runs until the subprocesses finish 

443 # the signal is not forwarded to subprocesses, so they can continue to run and hopefully still complete 

444 if sys.platform == "win32": 

445 # on Windows, use CTRL_BREAK_EVENT as SIGTERM is useless: 

446 # https://bugs.python.org/issue26350 

447 setup_handler(signal.SIGINT, signal.CTRL_BREAK_EVENT) 

448 else: 

449 # forward first SIGINT directly, send SIGKILL on subsequent interrupt 

450 setup_handler(signal.SIGINT, signal.SIGINT, signal.SIGKILL) 

451 # first SIGTERM: send SIGINT, send SIGKILL on subsequent SIGTERM 

452 setup_handler(signal.SIGTERM, signal.SIGINT, signal.SIGKILL) 

453 

454 

455def setup_signal_handlers_worker( 1a

456 pid: int, process_name: str, print_fn: PrintFn 

457) -> None: 

458 """Handle interrupts of workers gracefully.""" 

459 setup_handler = partial( 

460 forward_signal_handler, pid, process_name=process_name, print_fn=print_fn 

461 ) 

462 # when agent receives SIGINT, it stops dequeueing new FlowRuns, and runs until the subprocesses finish 

463 # the signal is not forwarded to subprocesses, so they can continue to run and hopefully still complete 

464 if sys.platform == "win32": 

465 # on Windows, use CTRL_BREAK_EVENT as SIGTERM is useless: 

466 # https://bugs.python.org/issue26350 

467 setup_handler(signal.SIGINT, signal.CTRL_BREAK_EVENT) 

468 else: 

469 # forward first SIGINT directly, send SIGKILL on subsequent interrupt 

470 setup_handler(signal.SIGINT, signal.SIGINT, signal.SIGKILL) 

471 # first SIGTERM: send SIGINT, send SIGKILL on subsequent SIGTERM 

472 setup_handler(signal.SIGTERM, signal.SIGINT, signal.SIGKILL) 

473 

474 

475def get_sys_executable() -> str: 1a

476 # python executable needs to be quotable on windows 

477 if os.name == "nt": 

478 executable_path = f'"{sys.executable}"' 

479 else: 

480 executable_path = sys.executable 

481 

482 return executable_path