Coverage for /usr/local/lib/python3.12/site-packages/prefect/task_runners.py: 31%

318 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 1a

2 

3import abc 1a

4import asyncio 1a

5import concurrent.futures 1a

6import multiprocessing 1a

7import os 1a

8import sys 1a

9import threading 1a

10import uuid 1a

11from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor 1a

12from contextvars import copy_context 1a

13from types import CoroutineType 1a

14from typing import ( 1a

15 TYPE_CHECKING, 

16 Any, 

17 Callable, 

18 Generic, 

19 Iterable, 

20 overload, 

21) 

22 

23from typing_extensions import ParamSpec, Self, TypeVar 1a

24 

25from prefect._internal.uuid7 import uuid7 1a

26from prefect.client.schemas.objects import RunInput 1a

27from prefect.exceptions import MappingLengthMismatch, MappingMissingIterable 1a

28from prefect.futures import ( 1a

29 PrefectConcurrentFuture, 

30 PrefectDistributedFuture, 

31 PrefectFuture, 

32 PrefectFutureList, 

33 wait, 

34) 

35from prefect.logging.loggers import get_logger, get_run_logger 1a

36from prefect.settings.context import get_current_settings 1a

37from prefect.utilities.annotations import allow_failure, quote, unmapped 1a

38from prefect.utilities.callables import ( 1a

39 cloudpickle_wrapped_call, 

40 collapse_variadic_parameters, 

41 explode_variadic_parameter, 

42 get_parameter_defaults, 

43) 

44from prefect.utilities.collections import isiterable 1a

45 

46if TYPE_CHECKING: 46 ↛ 47line 46 didn't jump to line 47 because the condition on line 46 was never true1a

47 import logging 

48 

49 from prefect.tasks import Task 

50 

# Type variables shared by the task runner generics below.
P = ParamSpec("P")  # parameter specification of a submitted task's signature
T = TypeVar("T")  # general-purpose value type
R = TypeVar("R")  # return type of a submitted task
# F is the concrete future type a runner produces; defaults to the
# thread-pool future so `TaskRunner` can be used unparameterized.
F = TypeVar("F", bound=PrefectFuture[Any], default=PrefectConcurrentFuture[Any])

55 

56 

class TaskRunner(abc.ABC, Generic[F]):
    """
    Abstract base class for task runners.

    A task runner is responsible for submitting tasks to the task run engine running
    in an execution environment. Submitted tasks are non-blocking and return a future
    object that can be used to wait for the task to complete and retrieve the result.

    Task runners are context managers and should be used in a `with` block to ensure
    proper cleanup of resources.
    """

    def __init__(self):
        self.logger: "logging.Logger" = get_logger(f"task_runner.{self.name}")
        self._started = False

    @property
    def name(self) -> str:
        """The name of this task runner"""
        # e.g. ``ThreadPoolTaskRunner`` -> ``threadpool``
        return type(self).__name__.lower().replace("taskrunner", "")

    @abc.abstractmethod
    def duplicate(self) -> Self:
        """Return a new instance of this task runner with the same configuration."""
        ...

    @overload
    @abc.abstractmethod
    def submit(
        self,
        task: "Task[P, CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> F: ...

    @overload
    @abc.abstractmethod
    def submit(
        self,
        task: "Task[Any, R]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> F: ...

    @abc.abstractmethod
    def submit(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> F: ...

    def map(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any | unmapped[Any] | allow_failure[Any]],
        wait_for: Iterable[PrefectFuture[R]] | None = None,
    ) -> PrefectFutureList[F]:
        """
        Submit multiple tasks to the task run engine.

        Args:
            task: The task to submit.
            parameters: The parameters to use when running the task.
            wait_for: A list of futures that the task depends on.

        Returns:
            An iterable of future objects that can be used to wait for the tasks to
            complete and retrieve the results.

        Raises:
            RuntimeError: If the runner has not been started.
            MappingMissingIterable: If no parameter is iterable after unwrapping
                annotations (``unmapped`` values are treated as static).
            MappingLengthMismatch: If the iterable parameters have differing lengths.
        """
        if not self._started:
            raise RuntimeError(
                "The task runner must be started before submitting work."
            )

        from prefect.utilities.engine import (
            collect_task_run_inputs_sync,
            resolve_inputs_sync,
        )

        # We need to resolve some futures to map over their data, collect the upstream
        # links beforehand to retain relationship tracking.
        task_inputs = {
            k: collect_task_run_inputs_sync(v, max_depth=0)
            for k, v in parameters.items()
        }

        # Resolve the top-level parameters in order to get mappable data of a known length.
        # Nested parameters will be resolved in each mapped child where their relationships
        # will also be tracked.
        parameters = resolve_inputs_sync(parameters, max_depth=0)

        # Ensure that any parameters in kwargs are expanded before this check
        parameters = explode_variadic_parameter(task.fn, parameters)

        iterable_parameters: dict[str, Any] = {}
        static_parameters: dict[str, Any] = {}
        annotated_parameters: dict[str, Any] = {}
        for key, val in parameters.items():
            if isinstance(val, (allow_failure, quote)):
                # Unwrap annotated parameters to determine if they are iterable
                annotated_parameters[key] = val
                val = val.unwrap()

            if isinstance(val, unmapped):
                static_parameters[key] = val.value
            elif isiterable(val):
                iterable_parameters[key] = list(val)
            else:
                static_parameters[key] = val

        if not iterable_parameters:
            raise MappingMissingIterable(
                "No iterable parameters were received. Parameters for map must "
                f"include at least one iterable. Parameters: {parameters}"
            )

        iterable_parameter_lengths = {
            key: len(val) for key, val in iterable_parameters.items()
        }
        lengths = set(iterable_parameter_lengths.values())
        if len(lengths) > 1:
            raise MappingLengthMismatch(
                "Received iterable parameters with different lengths. Parameters for map"
                f" must all be the same length. Got lengths: {iterable_parameter_lengths}"
            )

        # All iterables have the same length at this point; take it from the set.
        map_length = next(iter(lengths))

        futures: list[PrefectFuture[Any]] = []
        for i in range(map_length):
            # Start from the i-th element of every iterable, then layer on statics.
            call_parameters: dict[str, Any] = {
                key: value[i] for key, value in iterable_parameters.items()
            }
            call_parameters.update(static_parameters)

            # Add default values for parameters; these are skipped earlier since they should
            # not be mapped over
            for key, value in get_parameter_defaults(task.fn).items():
                call_parameters.setdefault(key, value)

            # Re-apply annotations to each key again
            for key, annotation in annotated_parameters.items():
                call_parameters[key] = annotation.rewrap(call_parameters[key])

            # Collapse any previously exploded kwargs
            call_parameters = collapse_variadic_parameters(task.fn, call_parameters)

            futures.append(
                self.submit(
                    task=task,
                    parameters=call_parameters,
                    wait_for=wait_for,
                    dependencies=task_inputs,
                )
            )

        return PrefectFutureList(futures)

    def __enter__(self) -> Self:
        if self._started:
            raise RuntimeError("This task runner is already started")

        self.logger.debug("Starting task runner")
        self._started = True
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        self.logger.debug("Stopping task runner")
        self._started = False

233 

class ThreadPoolTaskRunner(TaskRunner[PrefectConcurrentFuture[R]]):
    """
    A task runner that executes tasks in a separate thread pool.

    Attributes:
        max_workers: The maximum number of threads to use for executing tasks.
            Defaults to `PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS` or `sys.maxsize`.

    Note:
        This runner uses `contextvars.copy_context()` for thread-safe context propagation.
        However, because contextvars are thread-local, frequent task submissions
        that modify context (e.g., using `prefect.tags` in a loop) can lead to
        new thread creation per task. This may cause an increase in threads and
        file descriptors, potentially hitting OS limits (`OSError: Too many open files`).
        If this occurs, consider minimizing context changes within looped tasks or
        adjusting system limits for open file descriptors.

    Examples:
        Use a thread pool task runner with a flow:

        ```python
        from prefect import flow, task
        from prefect.task_runners import ThreadPoolTaskRunner

        @task
        def some_io_bound_task(x: int) -> int:
            # making a query to a database, reading a file, etc.
            return x * 2

        @flow(task_runner=ThreadPoolTaskRunner(max_workers=3))  # use at most 3 threads at a time
        def my_io_bound_flow():
            futures = []
            for i in range(10):
                future = some_io_bound_task.submit(i * 100)
                futures.append(future)

            return [future.result() for future in futures]
        ```

        Use a thread pool task runner as a context manager:

        ```python
        from prefect.task_runners import ThreadPoolTaskRunner

        @task
        def some_io_bound_task(x: int) -> int:
            # making a query to a database, reading a file, etc.
            return x * 2

        # Use the runner directly
        with ThreadPoolTaskRunner(max_workers=2) as runner:
            future1 = runner.submit(some_io_bound_task, {"x": 1})
            future2 = runner.submit(some_io_bound_task, {"x": 2})

            result1 = future1.result()  # 2
            result2 = future2.result()  # 4
        ```

        Configure max workers via settings:

        ```python
        # Set via environment variable
        # export PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS=8

        from prefect import flow
        from prefect.task_runners import ThreadPoolTaskRunner

        @flow(task_runner=ThreadPoolTaskRunner())  # Uses 8 workers from setting
        def my_flow():
            ...
        ```
    """

    def __init__(self, max_workers: int | None = None):
        super().__init__()
        current_settings = get_current_settings()
        self._executor: ThreadPoolExecutor | None = None
        # Explicit argument wins; otherwise fall back to the setting, then to an
        # effectively unbounded pool.
        self._max_workers = (
            (current_settings.tasks.runner.thread_pool_max_workers or sys.maxsize)
            if max_workers is None
            else max_workers
        )
        # Per-task-run cancellation events, keyed by task run id.
        self._cancel_events: dict[uuid.UUID, threading.Event] = {}

    def duplicate(self) -> "ThreadPoolTaskRunner[R]":
        """Return a new runner configured with the same number of workers."""
        return type(self)(max_workers=self._max_workers)

    @overload
    def submit(
        self,
        task: "Task[P, CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectConcurrentFuture[R]: ...

    @overload
    def submit(
        self,
        task: "Task[Any, R]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectConcurrentFuture[R]: ...

    def submit(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectConcurrentFuture[R]:
        """
        Submit a task to the task run engine running in a separate thread.

        Args:
            task: The task to submit.
            parameters: The parameters to use when running the task.
            wait_for: A list of futures that the task depends on.

        Returns:
            A future object that can be used to wait for the task to complete and
            retrieve the result.

        Raises:
            RuntimeError: If the runner has not been started.
        """
        if not self._started or self._executor is None:
            raise RuntimeError("Task runner is not started")

        if wait_for and task.tags and (self._max_workers <= len(task.tags)):
            self.logger.warning(
                f"Task {task.name} has {len(task.tags)} tags but only {self._max_workers} workers available. "
                "This may lead to dead-locks. Consider increasing the value of `PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS` or `max_workers`."
            )

        from prefect.context import FlowRunContext
        from prefect.task_engine import run_task_async, run_task_sync

        task_run_id = uuid7()
        cancel_event = threading.Event()
        self._cancel_events[task_run_id] = cancel_event
        # Snapshot the current contextvars so the worker thread sees the same
        # Prefect context the submitter had.
        context = copy_context()

        flow_run_ctx = FlowRunContext.get()
        if flow_run_ctx:
            get_run_logger(flow_run_ctx).debug(
                f"Submitting task {task.name} to thread pool executor..."
            )
        else:
            self.logger.debug(f"Submitting task {task.name} to thread pool executor...")

        submit_kwargs: dict[str, Any] = dict(
            task=task,
            task_run_id=task_run_id,
            parameters=parameters,
            wait_for=wait_for,
            return_type="state",
            dependencies=dependencies,
            context=dict(cancel_event=cancel_event),
        )

        if task.isasync:
            # TODO: Explore possibly using a long-lived thread with an event loop
            # for better performance
            future = self._executor.submit(
                context.run,
                asyncio.run,
                run_task_async(**submit_kwargs),
            )
        else:
            future = self._executor.submit(
                context.run,
                run_task_sync,
                **submit_kwargs,
            )
        prefect_future = PrefectConcurrentFuture(
            task_run_id=task_run_id, wrapped_future=future
        )
        return prefect_future

    @overload
    def map(
        self,
        task: "Task[P, CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectConcurrentFuture[R]]: ...

    @overload
    def map(
        self,
        task: "Task[Any, R]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectConcurrentFuture[R]]: ...

    def map(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectConcurrentFuture[R]]:
        return super().map(task, parameters, wait_for)

    def cancel_all(self) -> None:
        """Signal every outstanding task's cancel event and shut down the pool."""
        for event in self._cancel_events.values():
            event.set()
            self.logger.debug("Set cancel event")

        if self._executor is not None:
            self._executor.shutdown(cancel_futures=True)
            self._executor = None

    def __enter__(self) -> Self:
        super().__enter__()
        self._executor = ThreadPoolExecutor(max_workers=self._max_workers)
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        self.cancel_all()
        # cancel_all() already shut the executor down; this guard is defensive.
        if self._executor is not None:
            self._executor.shutdown(cancel_futures=True)
            self._executor = None
        super().__exit__(exc_type, exc_value, traceback)

    def __eq__(self, value: object) -> bool:
        if not isinstance(value, ThreadPoolTaskRunner):
            return False
        return self._max_workers == value._max_workers
462 

463 

# Here, we alias ConcurrentTaskRunner to ThreadPoolTaskRunner for backwards compatibility
# (older Prefect code imported `ConcurrentTaskRunner`; both names refer to the same class).
ConcurrentTaskRunner = ThreadPoolTaskRunner

466 

467 

def _run_task_in_subprocess(
    *args: Any,
    env: dict[str, str] | None = None,
    **kwargs: Any,
) -> Any:
    """
    Wrapper function to update environment variables and settings before running a task in a subprocess.

    Args:
        env: Environment variables to apply to this process before running the task.
        *args: Positional arguments forwarded to the task engine entrypoint.
        **kwargs: Keyword arguments forwarded to the task engine entrypoint;
            ``context`` (a serialized Prefect context) is popped and rehydrated here.

    Returns:
        Whatever the engine entrypoint (`run_task_sync` / `run_task_async`) returns.
    """
    from prefect.context import hydrated_context
    from prefect.engine import handle_engine_signals
    from prefect.task_engine import run_task_async, run_task_sync

    # Update environment variables
    os.environ.update(env or {})

    # Extract context from kwargs
    context = kwargs.pop("context", None)

    with hydrated_context(context):
        with handle_engine_signals(kwargs.get("task_run_id")):
            # Determine if this is an async task
            task = kwargs.get("task")
            if task and task.isasync:
                # Async tasks need a fresh event loop in this subprocess;
                # `asyncio` is already imported at module scope, so the previous
                # redundant local import was dropped.
                maybe_coro = run_task_async(*args, **kwargs)
                return asyncio.run(maybe_coro)
            else:
                return run_task_sync(*args, **kwargs)

498 

499 

500class _ChainedFuture(concurrent.futures.Future[bytes]): 1a

501 """Wraps a future-of-future and unwraps the result.""" 

502 

503 def __init__( 1a

504 self, 

505 resolution_future: concurrent.futures.Future[concurrent.futures.Future[bytes]], 

506 ): 

507 super().__init__() 

508 self._resolution_future = resolution_future 

509 self._process_future: concurrent.futures.Future[bytes] | None = None 

510 

511 # When resolution completes, hook up to the process future 

512 def on_resolution_done( 

513 fut: concurrent.futures.Future[concurrent.futures.Future[bytes]], 

514 ) -> None: 

515 try: 

516 self._process_future = fut.result() 

517 

518 # Forward process future result to this future 

519 def on_process_done( 

520 process_fut: concurrent.futures.Future[bytes], 

521 ) -> None: 

522 try: 

523 result = process_fut.result() 

524 self.set_result(result) 

525 except Exception as e: 

526 self.set_exception(e) 

527 

528 self._process_future.add_done_callback(on_process_done) 

529 except Exception as e: 

530 self.set_exception(e) 

531 

532 resolution_future.add_done_callback(on_resolution_done) 

533 

534 def cancel(self) -> bool: 1a

535 if self._process_future: 

536 return self._process_future.cancel() 

537 return self._resolution_future.cancel() 

538 

539 def cancelled(self) -> bool: 1a

540 if self._process_future: 

541 return self._process_future.cancelled() 

542 return self._resolution_future.cancelled() 

543 

544 

class _UnpicklingFuture(concurrent.futures.Future[R]):
    """Wrapper for a Future that unpickles the result returned by cloudpickle_wrapped_call."""

    def __init__(self, wrapped_future: concurrent.futures.Future[bytes]):
        # NOTE: deliberately does not call super().__init__(); every method used
        # below delegates to ``wrapped_future`` rather than this Future's own state.
        self.wrapped_future = wrapped_future

    def result(self, timeout: float | None = None) -> R:
        # Block for the pickled payload, then deserialize it. cloudpickle is
        # imported lazily to keep it off the module import path.
        pickled_result = self.wrapped_future.result(timeout)
        import cloudpickle

        return cloudpickle.loads(pickled_result)

    def exception(self, timeout: float | None = None) -> BaseException | None:
        return self.wrapped_future.exception(timeout)

    def done(self) -> bool:
        return self.wrapped_future.done()

    def cancelled(self) -> bool:
        return self.wrapped_future.cancelled()

    def cancel(self) -> bool:
        return self.wrapped_future.cancel()

    def add_done_callback(
        self, fn: Callable[[concurrent.futures.Future[R]], object]
    ) -> None:
        # NOTE(review): unlike the stdlib Future contract, ``fn`` is invoked with
        # the unpickled *result* rather than the future object, and a failed
        # wrapped future will raise inside the callback (``.result()`` below) —
        # confirm callers rely on exactly this behavior before changing it.
        def _fn(wrapped_future: concurrent.futures.Future[bytes]) -> None:
            import cloudpickle

            result = cloudpickle.loads(wrapped_future.result())
            fn(result)

        return self.wrapped_future.add_done_callback(_fn)

580 

class ProcessPoolTaskRunner(TaskRunner[PrefectConcurrentFuture[Any]]):
    """
    A task runner that executes tasks in a separate process pool.

    This task runner uses `ProcessPoolExecutor` to run tasks in separate processes,
    providing true parallelism for CPU-bound tasks and process isolation. Tasks
    are executed with proper context propagation and error handling.

    Attributes:
        max_workers: The maximum number of processes to use for executing tasks.
            Defaults to `multiprocessing.cpu_count()` if `PREFECT_TASKS_RUNNER_PROCESS_POOL_MAX_WORKERS` is not set.

    Examples:
        Use a process pool task runner with a flow:

        ```python
        from prefect import flow, task
        from prefect.task_runners import ProcessPoolTaskRunner

        @task
        def compute_heavy_task(n: int) -> int:
            # CPU-intensive computation that benefits from process isolation
            return sum(i ** 2 for i in range(n))

        @flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
        def my_flow():
            futures = []
            for i in range(10):
                future = compute_heavy_task.submit(i * 1000)
                futures.append(future)

            return [future.result() for future in futures]
        ```

        Use a process pool task runner as a context manager:

        ```python
        from prefect.task_runners import ProcessPoolTaskRunner

        @task
        def my_task(x: int) -> int:
            return x * 2

        # Use the runner directly
        with ProcessPoolTaskRunner(max_workers=2) as runner:
            future1 = runner.submit(my_task, {"x": 1})
            future2 = runner.submit(my_task, {"x": 2})

            result1 = future1.result()  # 2
            result2 = future2.result()  # 4
        ```

        Configure max workers via settings:

        ```python
        # Set via environment variable
        # export PREFECT_TASKS_RUNNER_PROCESS_POOL_MAX_WORKERS=8

        from prefect import flow
        from prefect.task_runners import ProcessPoolTaskRunner

        @flow(task_runner=ProcessPoolTaskRunner())  # Uses 8 workers from setting
        def my_flow():
            ...
        ```

    Note:
        Process pool task runners provide process isolation but have overhead for
        inter-process communication. They are most beneficial for CPU-bound tasks
        that can take advantage of multiple CPU cores. For I/O-bound tasks,
        consider using `ThreadPoolTaskRunner` instead.

        This runner uses the 'spawn' multiprocessing start method for cross-platform
        consistency and to avoid issues with shared state between processes.

        All task parameters and return values must be serializable with cloudpickle.
        The runner automatically handles context propagation and environment
        variable passing to subprocess workers.
    """

    def __init__(self, max_workers: int | None = None):
        super().__init__()
        current_settings = get_current_settings()
        self._executor: ProcessPoolExecutor | None = None
        # A small thread pool resolves upstream futures so submit() stays
        # non-blocking; see _resolve_futures_and_submit.
        self._resolver_executor: ThreadPoolExecutor | None = None
        self._max_workers = (
            max_workers
            or current_settings.tasks.runner.process_pool_max_workers
            or multiprocessing.cpu_count()
        )
        self._cancel_events: dict[uuid.UUID, multiprocessing.Event] = {}

    def duplicate(self) -> Self:
        """Return a new runner configured with the same number of workers."""
        return type(self)(max_workers=self._max_workers)

    def _resolve_futures_and_submit(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        task_run_id: uuid.UUID,
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None,
        dependencies: dict[str, set[RunInput]] | None,
        context: dict[str, Any],
        env: dict[str, str],
    ) -> concurrent.futures.Future[bytes]:
        """
        Helper method that:
        1. Waits for all futures in wait_for to complete
        2. Resolves any futures in parameters to their actual values
        3. Submits the task to the ProcessPoolExecutor with resolved values

        This method runs in a background thread to keep submit() non-blocking.
        """
        from prefect.utilities.engine import resolve_inputs_sync

        # Wait for all futures in wait_for to complete
        if wait_for:
            wait(list(wait_for))

        # Resolve any futures in parameters to their actual values
        resolved_parameters = resolve_inputs_sync(
            parameters, return_data=True, max_depth=-1
        )

        # Now submit to the process pool with resolved values
        submit_kwargs: dict[str, Any] = dict(
            task=task,
            task_run_id=task_run_id,
            parameters=resolved_parameters,
            wait_for=None,  # Already waited, no need to pass futures to subprocess
            return_type="state",
            dependencies=dependencies,
            context=context,
        )

        # Prepare the cloudpickle wrapped call for subprocess execution
        wrapped_call = cloudpickle_wrapped_call(
            _run_task_in_subprocess,
            env=env,
            **submit_kwargs,
        )

        # Submit to executor
        return self._executor.submit(wrapped_call)

    @overload
    def submit(
        self,
        task: "Task[P, CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectConcurrentFuture[R]: ...

    @overload
    def submit(
        self,
        task: "Task[Any, R]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectConcurrentFuture[R]: ...

    def submit(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectConcurrentFuture[R]:
        """
        Submit a task to the task run engine running in a separate process.

        Args:
            task: The task to submit.
            parameters: The parameters to use when running the task.
            wait_for: A list of futures that the task depends on.
            dependencies: A dictionary of dependencies for the task.

        Returns:
            A future object that can be used to wait for the task to complete and
            retrieve the result.

        Raises:
            RuntimeError: If the runner has not been started.
        """
        if (
            not self._started
            or self._executor is None
            or self._resolver_executor is None
        ):
            raise RuntimeError("Task runner is not started")

        if wait_for and task.tags and (self._max_workers <= len(task.tags)):
            self.logger.warning(
                f"Task {task.name} has {len(task.tags)} tags but only {self._max_workers} workers available. "
                "This may lead to dead-locks. Consider increasing the value of `PREFECT_TASKS_RUNNER_PROCESS_POOL_MAX_WORKERS` or `max_workers`."
            )

        from prefect.context import FlowRunContext

        task_run_id = uuid7()

        flow_run_ctx = FlowRunContext.get()
        if flow_run_ctx:
            get_run_logger(flow_run_ctx).debug(
                f"Submitting task {task.name} to process pool executor..."
            )
        else:
            self.logger.debug(
                f"Submitting task {task.name} to process pool executor..."
            )

        # Serialize the current context for the subprocess
        from prefect.context import serialize_context

        context = serialize_context()
        # The subprocess inherits current settings plus the live environment;
        # os.environ takes precedence on key collisions.
        env = (
            get_current_settings().to_environment_variables(exclude_unset=True)
            | os.environ
        )

        # Submit the resolution and subprocess execution to a background thread
        # This keeps submit() non-blocking while still resolving futures before pickling
        resolution_future = self._resolver_executor.submit(
            self._resolve_futures_and_submit,
            task=task,
            task_run_id=task_run_id,
            parameters=parameters,
            wait_for=wait_for,
            dependencies=dependencies,
            context=context,
            env=env,
        )

        # Create a future that chains: thread resolution -> process execution -> unpickling
        # We need to wrap the resolution_future's result (which will be a process future)
        chained_future = _ChainedFuture(resolution_future)

        # Create a PrefectConcurrentFuture that handles unpickling
        prefect_future: PrefectConcurrentFuture[R] = PrefectConcurrentFuture(
            task_run_id=task_run_id, wrapped_future=_UnpicklingFuture(chained_future)
        )
        return prefect_future

    @overload
    def map(
        self,
        task: "Task[P, CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectConcurrentFuture[R]]: ...

    @overload
    def map(
        self,
        task: "Task[Any, R]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectConcurrentFuture[R]]: ...

    def map(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectConcurrentFuture[R]]:
        return super().map(task, parameters, wait_for)

    def cancel_all(self) -> None:
        """Signal outstanding cancel events and shut the process pool down."""
        # Clear cancel events first to avoid resource tracking issues
        events_to_set = list(self._cancel_events.values())
        self._cancel_events.clear()

        for event in events_to_set:
            try:
                event.set()
                self.logger.debug("Set cancel event")
            except (OSError, ValueError):
                # Ignore errors if event is already closed/invalid
                pass

        if self._executor is not None:
            self._executor.shutdown(cancel_futures=True, wait=True)
            self._executor = None

    def __enter__(self) -> Self:
        super().__enter__()
        # Use spawn method for cross-platform consistency and avoiding shared state issues
        mp_context = multiprocessing.get_context("spawn")
        self._executor = ProcessPoolExecutor(
            max_workers=self._max_workers, mp_context=mp_context
        )
        # Create a thread pool for resolving futures before submitting to process pool
        self._resolver_executor = ThreadPoolExecutor(max_workers=self._max_workers)
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        self.cancel_all()
        # cancel_all() already shuts down the executor, but double-check
        if self._executor is not None:
            self._executor.shutdown(cancel_futures=True, wait=True)
            self._executor = None
        if self._resolver_executor is not None:
            self._resolver_executor.shutdown(cancel_futures=True, wait=True)
            self._resolver_executor = None
        super().__exit__(exc_type, exc_value, traceback)

    def __eq__(self, value: object) -> bool:
        if not isinstance(value, ProcessPoolTaskRunner):
            return False
        return self._max_workers == value._max_workers
890 

891 

class PrefectTaskRunner(TaskRunner[PrefectDistributedFuture[R]]):
    """
    A task runner that defers execution to Prefect task workers.

    Rather than running tasks in a local pool, `submit` enqueues each task via
    `task.apply_async` and returns a `PrefectDistributedFuture` that tracks the
    remote run.
    """

    def __init__(self):
        super().__init__()

    def duplicate(self) -> "PrefectTaskRunner[R]":
        """Return a new instance of this task runner (it holds no configuration)."""
        return type(self)()

    @overload
    def submit(
        self,
        task: "Task[P, CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectDistributedFuture[R]: ...

    @overload
    def submit(
        self,
        task: "Task[Any, R]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectDistributedFuture[R]: ...

    def submit(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectDistributedFuture[R]:
        """
        Submit a task for execution by a Prefect task worker.

        Args:
            task: The task to submit.
            parameters: The parameters to use when running the task.
            wait_for: A list of futures that the task depends on.

        Returns:
            A future object that can be used to wait for the task to complete and
            retrieve the result.

        Raises:
            RuntimeError: If the runner has not been started.
        """
        if not self._started:
            raise RuntimeError("Task runner is not started")
        from prefect.context import FlowRunContext

        flow_run_ctx = FlowRunContext.get()
        if flow_run_ctx:
            get_run_logger(flow_run_ctx).info(
                f"Submitting task {task.name} for execution by a Prefect task worker..."
            )
        else:
            self.logger.info(
                f"Submitting task {task.name} for execution by a Prefect task worker..."
            )

        return task.apply_async(
            kwargs=parameters, wait_for=wait_for, dependencies=dependencies
        )

    @overload
    def map(
        self,
        task: "Task[P, CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectDistributedFuture[R]]: ...

    @overload
    def map(
        self,
        task: "Task[Any, R]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectDistributedFuture[R]]: ...

    def map(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectDistributedFuture[R]]:
        return super().map(task, parameters, wait_for)