Coverage for /usr/local/lib/python3.12/site-packages/prefect/task_runners.py: 31%
318 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from __future__ import annotations 1a
3import abc 1a
4import asyncio 1a
5import concurrent.futures 1a
6import multiprocessing 1a
7import os 1a
8import sys 1a
9import threading 1a
10import uuid 1a
11from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor 1a
12from contextvars import copy_context 1a
13from types import CoroutineType 1a
14from typing import ( 1a
15 TYPE_CHECKING,
16 Any,
17 Callable,
18 Generic,
19 Iterable,
20 overload,
21)
23from typing_extensions import ParamSpec, Self, TypeVar 1a
25from prefect._internal.uuid7 import uuid7 1a
26from prefect.client.schemas.objects import RunInput 1a
27from prefect.exceptions import MappingLengthMismatch, MappingMissingIterable 1a
28from prefect.futures import ( 1a
29 PrefectConcurrentFuture,
30 PrefectDistributedFuture,
31 PrefectFuture,
32 PrefectFutureList,
33 wait,
34)
35from prefect.logging.loggers import get_logger, get_run_logger 1a
36from prefect.settings.context import get_current_settings 1a
37from prefect.utilities.annotations import allow_failure, quote, unmapped 1a
38from prefect.utilities.callables import ( 1a
39 cloudpickle_wrapped_call,
40 collapse_variadic_parameters,
41 explode_variadic_parameter,
42 get_parameter_defaults,
43)
44from prefect.utilities.collections import isiterable 1a
46if TYPE_CHECKING: 46 ↛ 47line 46 didn't jump to line 47 because the condition on line 46 was never true1a
47 import logging
49 from prefect.tasks import Task
# Type variables shared by the task runner generics in this module:
#   P — ParamSpec capturing the submitted task's call signature
#   T — general-purpose value type
#   R — return type of a submitted task
#   F — the future type a runner produces; defaults to PrefectConcurrentFuture
#       (uses typing_extensions.TypeVar for the `default=` parameter)
P = ParamSpec("P")
T = TypeVar("T")
R = TypeVar("R")
F = TypeVar("F", bound=PrefectFuture[Any], default=PrefectConcurrentFuture[Any])
class TaskRunner(abc.ABC, Generic[F]):
    """
    Abstract base class for task runners.

    A task runner is responsible for submitting tasks to the task run engine running
    in an execution environment. Submitted tasks are non-blocking and return a future
    object that can be used to wait for the task to complete and retrieve the result.

    Task runners are context managers and should be used in a `with` block to ensure
    proper cleanup of resources.
    """

    def __init__(self):
        self.logger: "logging.Logger" = get_logger(f"task_runner.{self.name}")
        # Set by __enter__/__exit__; submit()/map() refuse to run unless started.
        self._started = False

    @property
    def name(self) -> str:
        """The name of this task runner"""
        # e.g. ThreadPoolTaskRunner -> "threadpool"
        return type(self).__name__.lower().replace("taskrunner", "")

    @abc.abstractmethod
    def duplicate(self) -> Self:
        """Return a new instance of this task runner with the same configuration."""
        ...

    @overload
    @abc.abstractmethod
    def submit(
        self,
        task: "Task[P, CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> F: ...

    @overload
    @abc.abstractmethod
    def submit(
        self,
        task: "Task[Any, R]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> F: ...

    @abc.abstractmethod
    def submit(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> F: ...

    def map(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any | unmapped[Any] | allow_failure[Any]],
        wait_for: Iterable[PrefectFuture[R]] | None = None,
    ) -> PrefectFutureList[F]:
        """
        Submit multiple tasks to the task run engine.

        Args:
            task: The task to submit.
            parameters: The parameters to use when running the task.
            wait_for: A list of futures that the task depends on.

        Returns:
            An iterable of future objects that can be used to wait for the tasks to
            complete and retrieve the results.

        Raises:
            RuntimeError: If the task runner has not been started.
            MappingMissingIterable: If no parameter is iterable.
            MappingLengthMismatch: If iterable parameters have different lengths.
        """
        if not self._started:
            raise RuntimeError(
                "The task runner must be started before submitting work."
            )

        from prefect.utilities.engine import (
            collect_task_run_inputs_sync,
            resolve_inputs_sync,
        )

        # We need to resolve some futures to map over their data, collect the upstream
        # links beforehand to retain relationship tracking.
        task_inputs = {
            k: collect_task_run_inputs_sync(v, max_depth=0)
            for k, v in parameters.items()
        }

        # Resolve the top-level parameters in order to get mappable data of a known length.
        # Nested parameters will be resolved in each mapped child where their relationships
        # will also be tracked.
        parameters = resolve_inputs_sync(parameters, max_depth=0)

        # Ensure that any parameters in kwargs are expanded before this check
        parameters = explode_variadic_parameter(task.fn, parameters)

        iterable_parameters: dict[str, Any] = {}
        static_parameters: dict[str, Any] = {}
        annotated_parameters: dict[str, Any] = {}
        for key, val in parameters.items():
            if isinstance(val, (allow_failure, quote)):
                # Unwrap annotated parameters to determine if they are iterable
                annotated_parameters[key] = val
                val = val.unwrap()

            if isinstance(val, unmapped):
                static_parameters[key] = val.value
            elif isiterable(val):
                iterable_parameters[key] = list(val)
            else:
                static_parameters[key] = val

        if not iterable_parameters:
            raise MappingMissingIterable(
                "No iterable parameters were received. Parameters for map must "
                f"include at least one iterable. Parameters: {parameters}"
            )

        iterable_parameter_lengths = {
            key: len(val) for key, val in iterable_parameters.items()
        }
        lengths = set(iterable_parameter_lengths.values())
        if len(lengths) > 1:
            raise MappingLengthMismatch(
                "Received iterable parameters with different lengths. Parameters for map"
                f" must all be the same length. Got lengths: {iterable_parameter_lengths}"
            )

        # All iterables have the same length at this point; take it from the set.
        map_length = next(iter(lengths))

        futures: list[PrefectFuture[Any]] = []
        for i in range(map_length):
            call_parameters: dict[str, Any] = {
                key: value[i] for key, value in iterable_parameters.items()
            }
            call_parameters.update(static_parameters)

            # Add default values for parameters; these are skipped earlier since they should
            # not be mapped over
            for key, value in get_parameter_defaults(task.fn).items():
                call_parameters.setdefault(key, value)

            # Re-apply annotations to each key again
            for key, annotation in annotated_parameters.items():
                call_parameters[key] = annotation.rewrap(call_parameters[key])

            # Collapse any previously exploded kwargs
            call_parameters = collapse_variadic_parameters(task.fn, call_parameters)

            futures.append(
                self.submit(
                    task=task,
                    parameters=call_parameters,
                    wait_for=wait_for,
                    dependencies=task_inputs,
                )
            )

        return PrefectFutureList(futures)

    def __enter__(self) -> Self:
        if self._started:
            raise RuntimeError("This task runner is already started")

        self.logger.debug("Starting task runner")
        self._started = True
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        self.logger.debug("Stopping task runner")
        self._started = False
class ThreadPoolTaskRunner(TaskRunner[PrefectConcurrentFuture[R]]):
    """
    A task runner that executes tasks in a separate thread pool.

    Attributes:
        max_workers: The maximum number of threads to use for executing tasks.
            Defaults to `PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS` or `sys.maxsize`.

    Note:
        This runner uses `contextvars.copy_context()` for thread-safe context propagation.
        However, because contextvars are thread-local, frequent task submissions
        that modify context (e.g., using `prefect.tags` in a loop) can lead to
        new thread creation per task. This may cause an increase in threads and
        file descriptors, potentially hitting OS limits (`OSError: Too many open files`).
        If this occurs, consider minimizing context changes within looped tasks or
        adjusting system limits for open file descriptors.

    Examples:
        Use a thread pool task runner with a flow:

        ```python
        from prefect import flow, task
        from prefect.task_runners import ThreadPoolTaskRunner

        @task
        def some_io_bound_task(x: int) -> int:
            # making a query to a database, reading a file, etc.
            return x * 2

        @flow(task_runner=ThreadPoolTaskRunner(max_workers=3))  # use at most 3 threads at a time
        def my_io_bound_flow():
            futures = []
            for i in range(10):
                future = some_io_bound_task.submit(i * 100)
                futures.append(future)

            return [future.result() for future in futures]
        ```

        Use a thread pool task runner as a context manager:

        ```python
        from prefect.task_runners import ThreadPoolTaskRunner

        @task
        def some_io_bound_task(x: int) -> int:
            # making a query to a database, reading a file, etc.
            return x * 2

        # Use the runner directly
        with ThreadPoolTaskRunner(max_workers=2) as runner:
            future1 = runner.submit(some_io_bound_task, {"x": 1})
            future2 = runner.submit(some_io_bound_task, {"x": 2})

            result1 = future1.result()  # 2
            result2 = future2.result()  # 4
        ```

        Configure max workers via settings:

        ```python
        # Set via environment variable
        # export PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS=8

        from prefect import flow
        from prefect.task_runners import ThreadPoolTaskRunner

        @flow(task_runner=ThreadPoolTaskRunner())  # Uses 8 workers from setting
        def my_flow():
            ...
        ```
    """

    def __init__(self, max_workers: int | None = None):
        super().__init__()
        current_settings = get_current_settings()
        # Created lazily in __enter__ so an unstarted runner holds no threads.
        self._executor: ThreadPoolExecutor | None = None
        self._max_workers = (
            (current_settings.tasks.runner.thread_pool_max_workers or sys.maxsize)
            if max_workers is None
            else max_workers
        )
        # One cancel event per submitted task run; set in cancel_all().
        self._cancel_events: dict[uuid.UUID, threading.Event] = {}

    def duplicate(self) -> "ThreadPoolTaskRunner[R]":
        """Return a new, unstarted runner with the same worker limit."""
        return type(self)(max_workers=self._max_workers)

    @overload
    def submit(
        self,
        task: "Task[P, CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectConcurrentFuture[R]: ...

    @overload
    def submit(
        self,
        task: "Task[Any, R]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectConcurrentFuture[R]: ...

    def submit(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectConcurrentFuture[R]:
        """
        Submit a task to the task run engine running in a separate thread.

        Args:
            task: The task to submit.
            parameters: The parameters to use when running the task.
            wait_for: A list of futures that the task depends on.

        Returns:
            A future object that can be used to wait for the task to complete and
            retrieve the result.

        Raises:
            RuntimeError: If the runner has not been started (no executor exists).
        """
        if not self._started or self._executor is None:
            raise RuntimeError("Task runner is not started")

        if wait_for and task.tags and (self._max_workers <= len(task.tags)):
            # Fix: the two concatenated fragments previously ran together as
            # "...workers availableThis may lead..."; a sentence separator is
            # required between them.
            self.logger.warning(
                f"Task {task.name} has {len(task.tags)} tags but only "
                f"{self._max_workers} workers available. "
                "This may lead to dead-locks. Consider increasing the value of `PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS` or `max_workers`."
            )

        from prefect.context import FlowRunContext
        from prefect.task_engine import run_task_async, run_task_sync

        task_run_id = uuid7()
        cancel_event = threading.Event()
        self._cancel_events[task_run_id] = cancel_event
        # Snapshot the caller's contextvars so the worker thread sees them.
        context = copy_context()

        flow_run_ctx = FlowRunContext.get()
        if flow_run_ctx:
            get_run_logger(flow_run_ctx).debug(
                f"Submitting task {task.name} to thread pool executor..."
            )
        else:
            self.logger.debug(f"Submitting task {task.name} to thread pool executor...")

        submit_kwargs: dict[str, Any] = dict(
            task=task,
            task_run_id=task_run_id,
            parameters=parameters,
            wait_for=wait_for,
            return_type="state",
            dependencies=dependencies,
            context=dict(cancel_event=cancel_event),
        )

        if task.isasync:
            # TODO: Explore possibly using a long-lived thread with an event loop
            # for better performance
            future = self._executor.submit(
                context.run,
                asyncio.run,
                run_task_async(**submit_kwargs),
            )
        else:
            future = self._executor.submit(
                context.run,
                run_task_sync,
                **submit_kwargs,
            )
        prefect_future = PrefectConcurrentFuture(
            task_run_id=task_run_id, wrapped_future=future
        )
        return prefect_future

    @overload
    def map(
        self,
        task: "Task[P, CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectConcurrentFuture[R]]: ...

    @overload
    def map(
        self,
        task: "Task[Any, R]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectConcurrentFuture[R]]: ...

    def map(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectConcurrentFuture[R]]:
        return super().map(task, parameters, wait_for)

    def cancel_all(self) -> None:
        """Signal all in-flight tasks to cancel and tear down the executor."""
        for event in self._cancel_events.values():
            event.set()
            self.logger.debug("Set cancel event")

        if self._executor is not None:
            self._executor.shutdown(cancel_futures=True)
            self._executor = None

    def __enter__(self) -> Self:
        super().__enter__()
        self._executor = ThreadPoolExecutor(max_workers=self._max_workers)
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        self.cancel_all()
        # cancel_all() normally clears the executor; this is a defensive re-check.
        if self._executor is not None:
            self._executor.shutdown(cancel_futures=True)
            self._executor = None
        super().__exit__(exc_type, exc_value, traceback)

    def __eq__(self, value: object) -> bool:
        if not isinstance(value, ThreadPoolTaskRunner):
            return False
        return self._max_workers == value._max_workers
# Here, we alias ConcurrentTaskRunner to ThreadPoolTaskRunner for backwards compatibility
# (older releases exposed the thread-based runner under this name).
ConcurrentTaskRunner = ThreadPoolTaskRunner
def _run_task_in_subprocess(
    *args: Any,
    env: dict[str, str] | None = None,
    **kwargs: Any,
) -> Any:
    """
    Wrapper function to update environment variables and settings before running a task in a subprocess.
    """
    # Imported here because this function executes inside a freshly spawned
    # worker process.
    from prefect.context import hydrated_context
    from prefect.engine import handle_engine_signals
    from prefect.task_engine import run_task_async, run_task_sync

    # Propagate the parent's environment variables into this process.
    os.environ.update(env or {})

    # The serialized Prefect context travels in kwargs, not as an engine argument.
    serialized_ctx = kwargs.pop("context", None)

    with hydrated_context(serialized_ctx):
        with handle_engine_signals(kwargs.get("task_run_id")):
            submitted_task = kwargs.get("task")
            if submitted_task and submitted_task.isasync:
                # Async tasks need their own event loop in this process.
                import asyncio

                return asyncio.run(run_task_async(*args, **kwargs))
            return run_task_sync(*args, **kwargs)
class _ChainedFuture(concurrent.futures.Future[bytes]):
    """Wraps a future-of-future and unwraps the result.

    The outer ("resolution") future is produced by a thread that resolves
    upstream Prefect futures and then submits the work to a process pool; its
    result is the inner ("process") future whose result is the pickled task
    outcome. This class presents the whole chain as a single Future[bytes].
    """

    def __init__(
        self,
        resolution_future: concurrent.futures.Future[concurrent.futures.Future[bytes]],
    ):
        super().__init__()
        self._resolution_future = resolution_future
        # Populated once the resolution step finishes; None until then.
        self._process_future: concurrent.futures.Future[bytes] | None = None

        # When resolution completes, hook up to the process future
        def on_resolution_done(
            fut: concurrent.futures.Future[concurrent.futures.Future[bytes]],
        ) -> None:
            try:
                self._process_future = fut.result()

                # Forward process future result to this future
                def on_process_done(
                    process_fut: concurrent.futures.Future[bytes],
                ) -> None:
                    try:
                        result = process_fut.result()
                        self.set_result(result)
                    except Exception as e:
                        # Propagate failures from the subprocess to waiters.
                        self.set_exception(e)

                self._process_future.add_done_callback(on_process_done)
            except Exception as e:
                # Resolution itself failed (e.g. an upstream future errored).
                self.set_exception(e)

        resolution_future.add_done_callback(on_resolution_done)

    def cancel(self) -> bool:
        # Prefer cancelling the stage that is currently pending: once the
        # process future exists, the resolution stage has already completed.
        if self._process_future:
            return self._process_future.cancel()
        return self._resolution_future.cancel()

    def cancelled(self) -> bool:
        # Mirror cancel(): report the state of whichever stage is current.
        if self._process_future:
            return self._process_future.cancelled()
        return self._resolution_future.cancelled()
class _UnpicklingFuture(concurrent.futures.Future[R]):
    """Wrapper for a Future that unpickles the result returned by
    cloudpickle_wrapped_call.

    Delegates state queries to the wrapped ``Future[bytes]`` and decodes its
    payload with cloudpickle on ``result()``.
    """

    def __init__(self, wrapped_future: concurrent.futures.Future[bytes]):
        # Initialize base Future state so inherited methods that are NOT
        # overridden here (e.g. `running()`) do not crash on missing internals.
        super().__init__()
        self.wrapped_future = wrapped_future

    def result(self, timeout: float | None = None) -> R:
        """Return the unpickled result, blocking up to ``timeout`` seconds."""
        pickled_result = self.wrapped_future.result(timeout)
        import cloudpickle

        return cloudpickle.loads(pickled_result)

    def exception(self, timeout: float | None = None) -> BaseException | None:
        return self.wrapped_future.exception(timeout)

    def done(self) -> bool:
        return self.wrapped_future.done()

    def cancelled(self) -> bool:
        return self.wrapped_future.cancelled()

    def cancel(self) -> bool:
        return self.wrapped_future.cancel()

    def add_done_callback(
        self, fn: Callable[[concurrent.futures.Future[R]], object]
    ) -> None:
        # Per the `concurrent.futures.Future` contract (and this method's own
        # annotation), the callback must receive the future itself — not the
        # result. The previous implementation unpickled the payload and passed
        # it to `fn`, violating the contract and raising inside the callback
        # whenever the wrapped future completed with an exception.
        def _fn(wrapped_future: concurrent.futures.Future[bytes]) -> None:
            fn(self)

        self.wrapped_future.add_done_callback(_fn)
class ProcessPoolTaskRunner(TaskRunner[PrefectConcurrentFuture[Any]]):
    """
    A task runner that executes tasks in a separate process pool.

    This task runner uses `ProcessPoolExecutor` to run tasks in separate processes,
    providing true parallelism for CPU-bound tasks and process isolation. Tasks
    are executed with proper context propagation and error handling.

    Attributes:
        max_workers: The maximum number of processes to use for executing tasks.
            Defaults to `multiprocessing.cpu_count()` if `PREFECT_TASKS_RUNNER_PROCESS_POOL_MAX_WORKERS` is not set.

    Examples:
        Use a process pool task runner with a flow:

        ```python
        from prefect import flow, task
        from prefect.task_runners import ProcessPoolTaskRunner

        @task
        def compute_heavy_task(n: int) -> int:
            # CPU-intensive computation that benefits from process isolation
            return sum(i ** 2 for i in range(n))

        @flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
        def my_flow():
            futures = []
            for i in range(10):
                future = compute_heavy_task.submit(i * 1000)
                futures.append(future)

            return [future.result() for future in futures]
        ```

        Use a process pool task runner as a context manager:

        ```python
        from prefect.task_runners import ProcessPoolTaskRunner

        @task
        def my_task(x: int) -> int:
            return x * 2

        # Use the runner directly
        with ProcessPoolTaskRunner(max_workers=2) as runner:
            future1 = runner.submit(my_task, {"x": 1})
            future2 = runner.submit(my_task, {"x": 2})

            result1 = future1.result()  # 2
            result2 = future2.result()  # 4
        ```

        Configure max workers via settings:

        ```python
        # Set via environment variable
        # export PREFECT_TASKS_RUNNER_PROCESS_POOL_MAX_WORKERS=8

        from prefect import flow
        from prefect.task_runners import ProcessPoolTaskRunner

        @flow(task_runner=ProcessPoolTaskRunner())  # Uses 8 workers from setting
        def my_flow():
            ...
        ```

    Note:
        Process pool task runners provide process isolation but have overhead for
        inter-process communication. They are most beneficial for CPU-bound tasks
        that can take advantage of multiple CPU cores. For I/O-bound tasks,
        consider using `ThreadPoolTaskRunner` instead.

        This runner uses the 'spawn' multiprocessing start method for cross-platform
        consistency and to avoid issues with shared state between processes.

        All task parameters and return values must be serializable with cloudpickle.
        The runner automatically handles context propagation and environment
        variable passing to subprocess workers.
    """

    def __init__(self, max_workers: int | None = None):
        super().__init__()
        current_settings = get_current_settings()
        # Both executors are created in __enter__ and torn down in __exit__.
        self._executor: ProcessPoolExecutor | None = None
        self._resolver_executor: ThreadPoolExecutor | None = None
        self._max_workers = (
            max_workers
            or current_settings.tasks.runner.process_pool_max_workers
            or multiprocessing.cpu_count()
        )
        self._cancel_events: dict[uuid.UUID, multiprocessing.Event] = {}

    def duplicate(self) -> Self:
        """Return a new, unstarted runner with the same worker limit."""
        return type(self)(max_workers=self._max_workers)

    def _resolve_futures_and_submit(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        task_run_id: uuid.UUID,
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None,
        dependencies: dict[str, set[RunInput]] | None,
        context: dict[str, Any],
        env: dict[str, str],
    ) -> concurrent.futures.Future[bytes]:
        """
        Helper method that:
        1. Waits for all futures in wait_for to complete
        2. Resolves any futures in parameters to their actual values
        3. Submits the task to the ProcessPoolExecutor with resolved values

        This method runs in a background thread to keep submit() non-blocking.
        """
        from prefect.utilities.engine import resolve_inputs_sync

        # Wait for all futures in wait_for to complete
        if wait_for:
            wait(list(wait_for))

        # Resolve any futures in parameters to their actual values
        resolved_parameters = resolve_inputs_sync(
            parameters, return_data=True, max_depth=-1
        )

        # Now submit to the process pool with resolved values
        submit_kwargs: dict[str, Any] = dict(
            task=task,
            task_run_id=task_run_id,
            parameters=resolved_parameters,
            wait_for=None,  # Already waited, no need to pass futures to subprocess
            return_type="state",
            dependencies=dependencies,
            context=context,
        )

        # Prepare the cloudpickle wrapped call for subprocess execution
        wrapped_call = cloudpickle_wrapped_call(
            _run_task_in_subprocess,
            env=env,
            **submit_kwargs,
        )

        # Submit to executor
        return self._executor.submit(wrapped_call)

    @overload
    def submit(
        self,
        task: "Task[P, CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectConcurrentFuture[R]: ...

    @overload
    def submit(
        self,
        task: "Task[Any, R]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectConcurrentFuture[R]: ...

    def submit(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectConcurrentFuture[R]:
        """
        Submit a task to the task run engine running in a separate process.

        Args:
            task: The task to submit.
            parameters: The parameters to use when running the task.
            wait_for: A list of futures that the task depends on.
            dependencies: A dictionary of dependencies for the task.

        Returns:
            A future object that can be used to wait for the task to complete and
            retrieve the result.

        Raises:
            RuntimeError: If the runner has not been started.
        """
        if (
            not self._started
            or self._executor is None
            or self._resolver_executor is None
        ):
            raise RuntimeError("Task runner is not started")

        if wait_for and task.tags and (self._max_workers <= len(task.tags)):
            # Fix: the two concatenated fragments previously ran together as
            # "...workers availableThis may lead..."; a sentence separator is
            # required between them.
            self.logger.warning(
                f"Task {task.name} has {len(task.tags)} tags but only "
                f"{self._max_workers} workers available. "
                "This may lead to dead-locks. Consider increasing the value of `PREFECT_TASKS_RUNNER_PROCESS_POOL_MAX_WORKERS` or `max_workers`."
            )

        from prefect.context import FlowRunContext

        task_run_id = uuid7()

        flow_run_ctx = FlowRunContext.get()
        if flow_run_ctx:
            get_run_logger(flow_run_ctx).debug(
                f"Submitting task {task.name} to process pool executor..."
            )
        else:
            self.logger.debug(
                f"Submitting task {task.name} to process pool executor..."
            )

        # Serialize the current context for the subprocess
        from prefect.context import serialize_context

        context = serialize_context()
        # Current settings first; explicit os.environ entries win on conflict.
        env = (
            get_current_settings().to_environment_variables(exclude_unset=True)
            | os.environ
        )

        # Submit the resolution and subprocess execution to a background thread
        # This keeps submit() non-blocking while still resolving futures before pickling
        resolution_future = self._resolver_executor.submit(
            self._resolve_futures_and_submit,
            task=task,
            task_run_id=task_run_id,
            parameters=parameters,
            wait_for=wait_for,
            dependencies=dependencies,
            context=context,
            env=env,
        )

        # Create a future that chains: thread resolution -> process execution -> unpickling
        # We need to wrap the resolution_future's result (which will be a process future)
        chained_future = _ChainedFuture(resolution_future)

        # Create a PrefectConcurrentFuture that handles unpickling
        prefect_future: PrefectConcurrentFuture[R] = PrefectConcurrentFuture(
            task_run_id=task_run_id, wrapped_future=_UnpicklingFuture(chained_future)
        )
        return prefect_future

    @overload
    def map(
        self,
        task: "Task[P, CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectConcurrentFuture[R]]: ...

    @overload
    def map(
        self,
        task: "Task[Any, R]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectConcurrentFuture[R]]: ...

    def map(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectConcurrentFuture[R]]:
        return super().map(task, parameters, wait_for)

    def cancel_all(self) -> None:
        """Signal all in-flight tasks to cancel and tear down the process pool."""
        # Clear cancel events first to avoid resource tracking issues
        events_to_set = list(self._cancel_events.values())
        self._cancel_events.clear()

        for event in events_to_set:
            try:
                event.set()
                self.logger.debug("Set cancel event")
            except (OSError, ValueError):
                # Ignore errors if event is already closed/invalid
                pass

        if self._executor is not None:
            self._executor.shutdown(cancel_futures=True, wait=True)
            self._executor = None

    def __enter__(self) -> Self:
        super().__enter__()
        # Use spawn method for cross-platform consistency and avoiding shared state issues
        mp_context = multiprocessing.get_context("spawn")
        self._executor = ProcessPoolExecutor(
            max_workers=self._max_workers, mp_context=mp_context
        )
        # Create a thread pool for resolving futures before submitting to process pool
        self._resolver_executor = ThreadPoolExecutor(max_workers=self._max_workers)
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        self.cancel_all()
        # cancel_all() already shuts down the executor, but double-check
        if self._executor is not None:
            self._executor.shutdown(cancel_futures=True, wait=True)
            self._executor = None
        if self._resolver_executor is not None:
            self._resolver_executor.shutdown(cancel_futures=True, wait=True)
            self._resolver_executor = None
        super().__exit__(exc_type, exc_value, traceback)

    def __eq__(self, value: object) -> bool:
        if not isinstance(value, ProcessPoolTaskRunner):
            return False
        return self._max_workers == value._max_workers
class PrefectTaskRunner(TaskRunner[PrefectDistributedFuture[R]]):
    """
    A task runner that defers task execution to Prefect task workers.

    Rather than running tasks locally, `submit` schedules each task run via
    `task.apply_async` and returns a `PrefectDistributedFuture` that resolves
    when a task worker picks up and completes the run.
    """

    def __init__(self):
        super().__init__()

    def duplicate(self) -> "PrefectTaskRunner[R]":
        """Return a new, unstarted runner (this runner holds no configuration)."""
        return type(self)()

    @overload
    def submit(
        self,
        task: "Task[P, CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectDistributedFuture[R]: ...

    @overload
    def submit(
        self,
        task: "Task[Any, R]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectDistributedFuture[R]: ...

    def submit(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
        dependencies: dict[str, set[RunInput]] | None = None,
    ) -> PrefectDistributedFuture[R]:
        """
        Submit a task for execution by a Prefect task worker.

        Args:
            task: The task to submit.
            parameters: The parameters to use when running the task.
            wait_for: A list of futures that the task depends on.

        Returns:
            A future object that can be used to wait for the task to complete and
            retrieve the result.

        Raises:
            RuntimeError: If the runner has not been started.
        """
        if not self._started:
            raise RuntimeError("Task runner is not started")
        from prefect.context import FlowRunContext

        flow_run_ctx = FlowRunContext.get()
        # Fix: the log message previously read "Submitting task ... to for
        # execution by a Prefect task worker..." — the stray "to" is removed.
        if flow_run_ctx:
            get_run_logger(flow_run_ctx).info(
                f"Submitting task {task.name} for execution by a Prefect task worker..."
            )
        else:
            self.logger.info(
                f"Submitting task {task.name} for execution by a Prefect task worker..."
            )

        return task.apply_async(
            kwargs=parameters, wait_for=wait_for, dependencies=dependencies
        )

    @overload
    def map(
        self,
        task: "Task[P, CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectDistributedFuture[R]]: ...

    @overload
    def map(
        self,
        task: "Task[Any, R]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectDistributedFuture[R]]: ...

    def map(
        self,
        task: "Task[P, R | CoroutineType[Any, Any, R]]",
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectDistributedFuture[R]]:
        return super().map(task, parameters, wait_for)