Coverage for /usr/local/lib/python3.12/site-packages/prefect/futures.py: 20%

338 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 1a

2 

3import abc 1a

4import concurrent.futures 1a

5import threading 1a

6import uuid 1a

7import warnings 1a

8from collections.abc import Generator, Iterator 1a

9from functools import partial 1a

10from typing import TYPE_CHECKING, Any, Callable, Generic 1a

11 

12from typing_extensions import NamedTuple, Self, TypeVar 1a

13 

14from prefect._waiters import FlowRunWaiter 1a

15from prefect.client.orchestration import get_client 1a

16from prefect.exceptions import ObjectNotFound 1a

17from prefect.logging.loggers import get_logger 1a

18from prefect.states import Pending, State 1a

19from prefect.task_runs import TaskRunWaiter 1a

20from prefect.utilities.annotations import quote 1a

21from prefect.utilities.asyncutils import run_coro_as_sync 1a

22from prefect.utilities.collections import StopVisiting, visit_collection 1a

23from prefect.utilities.timeout import timeout as timeout_context 1a

24 

25F = TypeVar("F") 1a

26R = TypeVar("R") 1a

27 

28if TYPE_CHECKING: 28 ↛ 29line 28 didn't jump to line 29 because the condition on line 28 was never true1a

29 import logging 

30 

31logger: "logging.Logger" = get_logger(__name__) 1a

32 

33 

class PrefectFuture(abc.ABC, Generic[R]):
    """
    Abstract base class for Prefect futures. A Prefect future is a handle to the
    asynchronous execution of a run. It provides methods to wait for the run
    to complete and to retrieve the result of the run.
    """

    def __init__(self, task_run_id: uuid.UUID):
        """
        Deprecated initializer kept for backwards compatibility.

        Args:
            task_run_id: The ID of the task run associated with this future.
        """
        # stacklevel=2 attributes the warning to the code that invoked this
        # initializer rather than to this line.
        warnings.warn(
            "The __init__ method of PrefectFuture is deprecated and will be removed in a future release. "
            "If you are subclassing PrefectFuture, please implement the __init__ method in your subclass or "
            "subclass PrefectTaskRunFuture instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self._task_run_id = task_run_id
        self._final_state: State[R] | None = None

    @property
    def task_run_id(self) -> uuid.UUID:
        """The ID of the task run associated with this future"""
        warnings.warn(
            "The task_run_id property of PrefectFuture is deprecated and will be removed in a future release. "
            "If you are subclassing PrefectFuture, please implement the task_run_id property in your subclass or "
            "subclass PrefectTaskRunFuture instead.",
            DeprecationWarning,
            stacklevel=2,
        )

        return self._task_run_id

    @property
    def state(self) -> State:
        """The current state of the task run associated with this future"""
        warnings.warn(
            "The state property of PrefectFuture is deprecated and will be removed in a future release. "
            "If you are subclassing PrefectFuture, please implement the state property in your subclass or "
            "subclass PrefectTaskRunFuture instead.",
            DeprecationWarning,
            stacklevel=2,
        )

        # A cached final state short-circuits the API lookup.
        if self._final_state:
            return self._final_state
        client = get_client(sync_client=True)
        try:
            task_run = client.read_task_run(task_run_id=self.task_run_id)
        except ObjectNotFound:
            # We'll be optimistic and assume this task will eventually start
            # TODO: Consider using task run events to wait for the task to start
            return Pending()
        return task_run.state or Pending()

    @abc.abstractmethod
    def wait(self, timeout: float | None = None) -> None:
        """
        Wait for the task run to complete.

        If the task run has already completed, this method will return immediately.

        Args:
            timeout: The maximum number of seconds to wait for the task run to complete.
              If the task run has not completed after the timeout has elapsed, this method will return.
        """
        ...

    @abc.abstractmethod
    def result(
        self,
        timeout: float | None = None,
        raise_on_failure: bool = True,
    ) -> R:
        """
        Get the result of the task run associated with this future.

        If the task run has not completed, this method will wait for the task run to complete.

        Args:
            timeout: The maximum number of seconds to wait for the task run to complete.
              If the task run has not completed after the timeout has elapsed, this method will return.
            raise_on_failure: If `True`, an exception will be raised if the task run fails.

        Returns:
            The result of the task run.
        """
        ...

    @abc.abstractmethod
    def add_done_callback(self, fn: Callable[["PrefectFuture[R]"], None]) -> None:
        """
        Add a callback to be run when the future completes or is cancelled.

        Args:
            fn: A callable that will be called with this future as its only argument when the future completes or is cancelled.
        """
        ...

127 

128 

class PrefectTaskRunFuture(PrefectFuture[R]):
    """
    A Prefect future bound to a single task run, identified by its task run ID.
    """

    def __init__(self, task_run_id: uuid.UUID):
        # The final state is filled in by subclasses once the run finishes.
        self._final_state: State[R] | None = None
        self._task_run_id = task_run_id

    @property
    def task_run_id(self) -> uuid.UUID:
        """The ID of the task run associated with this future"""
        return self._task_run_id

    @property
    def state(self) -> State:
        """
        The current state of the task run associated with this future.

        Returns the cached final state when one is set; otherwise reads the
        task run through a sync client and falls back to `Pending()` when the
        run is not found or has no state.
        """
        cached = self._final_state
        if cached:
            return cached

        client = get_client(sync_client=True)
        try:
            current = client.read_task_run(task_run_id=self.task_run_id).state
        except ObjectNotFound:
            # We'll be optimistic and assume this task will eventually start
            # TODO: Consider using task run events to wait for the task to start
            return Pending()
        return current if current else Pending()

156 

157 

class PrefectWrappedFuture(PrefectTaskRunFuture[R], abc.ABC, Generic[R, F]):
    """
    A Prefect future that delegates to another, underlying future object.

    Type Parameters:
        R: The return type of the future
        F: The type of the wrapped future
    """

    def __init__(self, task_run_id: uuid.UUID, wrapped_future: F):
        self._wrapped_future: F = wrapped_future
        super().__init__(task_run_id)

    @property
    def wrapped_future(self) -> F:
        """The underlying future object wrapped by this Prefect future"""
        return self._wrapped_future

    def add_done_callback(self, fn: Callable[[PrefectFuture[R]], None]) -> None:
        """
        Register `fn` to be called with this future once it completes.

        If a final state is already recorded, `fn` is invoked immediately.
        """
        if self._final_state:
            fn(self)
            return

        def _notify_with_self(_completed: F):
            # The wrapped future hands us itself; callers expect the Prefect
            # future instead (needed so it can be removed from pending sets).
            fn(self)

        self._wrapped_future.add_done_callback(_notify_with_self)

187 

188 

class PrefectConcurrentFuture(PrefectWrappedFuture[R, concurrent.futures.Future[R]]):
    """
    A Prefect future backed by a `concurrent.futures.Future`. This future is used
    when the task run is submitted to a ThreadPoolExecutor.
    """

    def wait(self, timeout: float | None = None) -> None:
        """
        Block until the wrapped future finishes or `timeout` seconds elapse.

        A timeout is swallowed silently. When the wrapped future yields a
        `State`, it is cached as this future's final state.
        """
        try:
            outcome = self._wrapped_future.result(timeout=timeout)
        except concurrent.futures.TimeoutError:
            return
        else:
            if isinstance(outcome, State):
                self._final_state = outcome

    def result(
        self,
        timeout: float | None = None,
        raise_on_failure: bool = True,
    ) -> R:
        """
        Return the result of the task run, waiting for it if necessary.

        Args:
            timeout: Maximum number of seconds to wait on the wrapped future.
            raise_on_failure: Forwarded to the final state's `result()` call.

        Raises:
            TimeoutError: If the wrapped future does not finish within `timeout`.
        """
        if not self._final_state:
            try:
                outcome = self._wrapped_future.result(timeout=timeout)
            except concurrent.futures.TimeoutError as exc:
                raise TimeoutError(
                    f"Task run {self.task_run_id} did not complete within {timeout} seconds"
                ) from exc

            # A non-State value is already the final result; hand it back as-is.
            if not isinstance(outcome, State):
                return outcome
            self._final_state = outcome

        return self._final_state.result(raise_on_failure=raise_on_failure, _sync=True)

226 

227 

class PrefectDistributedFuture(PrefectTaskRunFuture[R]):
    """
    Represents the result of a computation happening anywhere.

    This class is typically used to interact with the result of a task run
    scheduled to run in a Prefect task worker but can be used to interact with
    any task run scheduled in Prefect's API.
    """

    # NOTE(review): class-level mutable list shared by every instance; nothing
    # in this class reads or writes it. Kept for interface compatibility.
    done_callbacks: list[Callable[[PrefectFuture[R]], None]] = []
    waiter = None

    def wait(self, timeout: float | None = None) -> None:
        """Synchronous wrapper around `wait_async`."""
        return run_coro_as_sync(self.wait_async(timeout=timeout))

    async def wait_async(self, timeout: float | None = None) -> None:
        """
        Wait for the task run to reach a final state.

        Returns immediately when a final state is already cached. Otherwise the
        task run is read once and, if it is not final yet, a completion event
        is awaited via `TaskRunWaiter`. When no state arrives from the event
        (presumably because of a timeout), the method returns without setting
        a final state.

        Args:
            timeout: Forwarded to `TaskRunWaiter.wait_for_task_run`.

        Raises:
            RuntimeError: If the task run has no state at all.
        """
        if self._final_state:
            logger.debug(
                "Final state already set for %s. Returning...", self.task_run_id
            )
            return

        # Ask for the instance of TaskRunWaiter _now_ so that it's already running and
        # can catch the completion event if it happens before we start listening for it.
        TaskRunWaiter.instance()

        # Read task run to see if it is still running
        async with get_client() as client:
            task_run = await client.read_task_run(task_run_id=self._task_run_id)
            if task_run.state is None:
                raise RuntimeError(
                    f"Task run {self.task_run_id} has no state which means it hasn't started yet."
                )
            if task_run.state.is_final():
                logger.debug(
                    "Task run %s already finished. Returning...",
                    self.task_run_id,
                )
                self._final_state = task_run.state
                return

            # If still running, wait for a completed event from the server
            logger.debug(
                "Waiting for completed event for task run %s...",
                self.task_run_id,
            )
            state_from_event = await TaskRunWaiter.wait_for_task_run(
                self._task_run_id, timeout=timeout
            )

            if state_from_event:
                # We got the final state directly from the event
                self._final_state = state_from_event
                logger.debug(
                    "Task run %s completed with state from event: %s",
                    self.task_run_id,
                    state_from_event.type,
                )
            return

    def result(
        self,
        timeout: float | None = None,
        raise_on_failure: bool = True,
    ) -> R:
        """Synchronous wrapper around `result_async`."""
        return run_coro_as_sync(
            self.result_async(timeout=timeout, raise_on_failure=raise_on_failure)
        )

    async def result_async(
        self,
        timeout: float | None = None,
        raise_on_failure: bool = True,
    ) -> R:
        """
        Return the result of the task run, waiting for completion if needed.

        Args:
            timeout: Maximum number of seconds to wait for the task run.
            raise_on_failure: Forwarded to the final state's `aresult()` call.

        Raises:
            TimeoutError: If no final state is available after waiting and one
                extra read of the task run.
        """
        if not self._final_state:
            await self.wait_async(timeout=timeout)
            if not self._final_state:
                # If still no final state after wait, try reading it once more.
                # This should rarely happen since wait_async() now gets state from events.
                async with get_client() as client:
                    task_run = await client.read_task_run(task_run_id=self._task_run_id)
                    if task_run.state and task_run.state.is_final():
                        self._final_state = task_run.state
                    else:
                        raise TimeoutError(
                            f"Task run {self.task_run_id} did not complete within {timeout} seconds"
                        )

        return await self._final_state.aresult(raise_on_failure=raise_on_failure)

    def add_done_callback(self, fn: Callable[[PrefectFuture[R]], None]) -> None:
        """
        Register `fn` to be called with this future once the task run finishes.

        `fn` is invoked immediately when a final state is cached or when a
        fresh read of the task run shows a final state; otherwise it is
        registered with `TaskRunWaiter`.
        """
        if self._final_state:
            fn(self)
            return
        TaskRunWaiter.instance()
        with get_client(sync_client=True) as client:
            task_run = client.read_task_run(task_run_id=self._task_run_id)
            # Guard against a task run without a state: treat it as not yet
            # final instead of failing with AttributeError on `None`.
            if task_run.state and task_run.state.is_final():
                self._final_state = task_run.state
                fn(self)
                return
            TaskRunWaiter.add_done_callback(self._task_run_id, partial(fn, self))

    def __eq__(self, other: Any) -> bool:
        """Two distributed futures are equal when they share a task run ID."""
        if not isinstance(other, PrefectDistributedFuture):
            return False
        return self.task_run_id == other.task_run_id

    def __hash__(self) -> int:
        """Hash on the task run ID, consistent with `__eq__`."""
        return hash(self.task_run_id)

338 

339 

class PrefectFlowRunFuture(PrefectFuture[R]):
    """
    A Prefect future that represents the eventual execution of a flow run.
    """

    def __init__(self, flow_run_id: uuid.UUID):
        self._flow_run_id = flow_run_id
        self._final_state: State[R] | None = None

    @property
    def flow_run_id(self) -> uuid.UUID:
        """The ID of the flow run associated with this future"""
        return self._flow_run_id

    @property
    def state(self) -> State:
        """The current state of the flow run associated with this future"""
        if self._final_state:
            return self._final_state
        client = get_client(sync_client=True)
        state = Pending()
        try:
            flow_run = client.read_flow_run(flow_run_id=self.flow_run_id)
            if flow_run.state:
                state = flow_run.state
        except ObjectNotFound:
            # We'll be optimistic and assume this flow run will eventually start
            pass
        return state

    def wait(self, timeout: float | None = None) -> None:
        """Synchronous wrapper around `wait_async`."""
        return run_coro_as_sync(self.wait_async(timeout=timeout))

    async def wait_async(self, timeout: float | None = None) -> None:
        """
        Wait for the flow run to reach a final state.

        Returns immediately when a final state is already cached. Otherwise the
        flow run is read once and, if it is not final yet, a completion event
        is awaited via `FlowRunWaiter`, after which the flow run is re-read.

        Args:
            timeout: Forwarded to `FlowRunWaiter.wait_for_flow_run`.

        Raises:
            RuntimeError: If the flow run has no state at all.
        """
        if self._final_state:
            # This class has no task run ID; the flow run ID identifies it.
            logger.debug(
                "Final state already set for %s. Returning...", self.flow_run_id
            )
            return

        # Ask for the instance of FlowRunWaiter _now_ so that it's already running and
        # can catch the completion event if it happens before we start listening for it.
        FlowRunWaiter.instance()

        # Read flow run to see if it is still running
        async with get_client() as client:
            flow_run = await client.read_flow_run(flow_run_id=self._flow_run_id)
            if flow_run.state is None:
                raise RuntimeError(
                    f"Flow run {self.flow_run_id} has no state which means it hasn't started yet."
                )
            if flow_run.state and flow_run.state.is_final():
                logger.debug(
                    "Flow run %s already finished. Returning...",
                    self.flow_run_id,
                )
                self._final_state = flow_run.state
                return

            # If still running, wait for a completed event from the server
            logger.debug(
                "Waiting for completed event for flow run %s...",
                self.flow_run_id,
            )
            await FlowRunWaiter.wait_for_flow_run(self._flow_run_id, timeout=timeout)
            # Re-read after the wait; only a final state is cached.
            flow_run = await client.read_flow_run(flow_run_id=self._flow_run_id)
            if flow_run.state and flow_run.state.is_final():
                self._final_state = flow_run.state
            return

    def result(
        self,
        timeout: float | None = None,
        raise_on_failure: bool = True,
    ) -> R:
        """Synchronous wrapper around `aresult`."""
        return run_coro_as_sync(
            self.aresult(timeout=timeout, raise_on_failure=raise_on_failure)
        )

    async def aresult(
        self,
        timeout: float | None = None,
        raise_on_failure: bool = True,
    ) -> R:
        """
        Return the result of the flow run, waiting for completion if needed.

        Args:
            timeout: Maximum number of seconds to wait for the flow run.
            raise_on_failure: Forwarded to the final state's `aresult()` call.

        Raises:
            TimeoutError: If no final state is available after waiting.
        """
        if not self._final_state:
            await self.wait_async(timeout=timeout)
            if not self._final_state:
                # Use the flow run ID: `task_run_id` is not set on this class,
                # so reading it would raise AttributeError and mask the timeout.
                raise TimeoutError(
                    f"Flow run {self.flow_run_id} did not complete within {timeout} seconds"
                )

        return await self._final_state.aresult(raise_on_failure=raise_on_failure)

    def add_done_callback(self, fn: Callable[[PrefectFuture[R]], None]) -> None:
        """
        Register `fn` to be called with this future once the flow run finishes.

        `fn` is invoked immediately when a final state is cached or when a
        fresh read of the flow run shows a final state; otherwise it is
        registered with `FlowRunWaiter`.
        """
        if self._final_state:
            fn(self)
            return
        FlowRunWaiter.instance()
        with get_client(sync_client=True) as client:
            flow_run = client.read_flow_run(flow_run_id=self._flow_run_id)
            if flow_run.state and flow_run.state.is_final():
                self._final_state = flow_run.state
                fn(self)
                return
            FlowRunWaiter.add_done_callback(self._flow_run_id, partial(fn, self))

    def __eq__(self, other: Any) -> bool:
        """Two flow run futures are equal when they share a flow run ID."""
        if not isinstance(other, PrefectFlowRunFuture):
            return False
        return self.flow_run_id == other.flow_run_id

    def __hash__(self) -> int:
        """Hash on the flow run ID, consistent with `__eq__`."""
        return hash(self.flow_run_id)

453 

454 

class PrefectFutureList(list[PrefectFuture[R]], Iterator[PrefectFuture[R]]):
    """
    A list of Prefect futures.

    This class provides methods to wait for all futures
    in the list to complete and to retrieve the results of all task runs.
    """

    def wait(self, timeout: float | None = None) -> None:
        """
        Wait for all futures in the list to complete.

        Args:
            timeout: The maximum number of seconds to wait for all futures to
                complete. This method will not raise if the timeout is reached.
        """
        # Resolves to the module-level `wait` function, not this method.
        wait(self, timeout=timeout)

    def result(
        self: Self,
        timeout: float | None = None,
        raise_on_failure: bool = True,
    ) -> list[R]:
        """
        Get the results of all task runs associated with the futures in the list.

        Args:
            timeout: The maximum number of seconds to wait for all futures to
                complete.
            raise_on_failure: If `True`, an exception will be raised if any task run fails.

        Returns:
            A list of results of the task runs.

        Raises:
            TimeoutError: If the timeout is reached before all futures complete.
        """
        try:
            with timeout_context(timeout):
                return [
                    future.result(raise_on_failure=raise_on_failure) for future in self
                ]
        except TimeoutError as exc:
            # timeout came from inside the task
            # The marker must be an f-string: with a plain literal the text
            # "{timeout}" never matches, so the aggregate error below was
            # unreachable. NOTE(review): presumably this mirrors the message
            # raised by `timeout_context` — verify against prefect.utilities.timeout.
            if f"Scope timed out after {timeout} second(s)." not in str(exc):
                raise
            raise TimeoutError(
                f"Timed out waiting for all futures to complete within {timeout} seconds"
            ) from exc

504 

505 

def as_completed(
    futures: list[PrefectFuture[R]], timeout: float | None = None
) -> Generator[PrefectFuture[R], None]:
    """
    Yield each distinct future from `futures` once it has completed.

    Futures that already carry a final state are yielded first; the rest are
    yielded as their done-callbacks fire.

    Args:
        futures: The futures to iterate over; duplicates are collapsed.
        timeout: Passed to `timeout_context` around the whole iteration.

    Raises:
        TimeoutError: Re-raised with a count of unfinished futures when a
            `TimeoutError` escapes the timeout scope.
    """
    unique_futures: set[PrefectFuture[R]] = set(futures)
    total_futures = len(unique_futures)
    pending = unique_futures
    try:
        with timeout_context(timeout):
            already_done = {f for f in unique_futures if f._final_state}  # type: ignore[privateUsage]
            pending = unique_futures - already_done
            yield from already_done

            finished_event = threading.Event()
            finished_lock = threading.Lock()
            finished_futures: list[PrefectFuture[R]] = []

            def _record_finished(completed: PrefectFuture[R]):
                # Runs from whichever thread completes the future.
                with finished_lock:
                    finished_futures.append(completed)
                    finished_event.set()

            for candidate in pending:
                candidate.add_done_callback(_record_finished)

            while pending:
                finished_event.wait()
                with finished_lock:
                    # Take the current batch and start a fresh list under the lock.
                    batch, finished_futures = finished_futures, []
                    finished_event.clear()

                for completed_future in batch:
                    pending.remove(completed_future)
                    yield completed_future

    except TimeoutError:
        raise TimeoutError(
            "%d (of %d) futures unfinished" % (len(pending), total_futures)
        )

545 

546 

class DoneAndNotDoneFutures(NamedTuple, Generic[R]):
    """Named pair of future sets: `done` and `not_done`.

    Combining `NamedTuple` with `Generic` needs multiple inheritance, which is
    supported in 3.11+; `typing_extensions.NamedTuple` is used for that reason.
    """

    # Futures that have completed.
    done: set[PrefectFuture[R]]
    # Futures that have not completed.
    not_done: set[PrefectFuture[R]]

555 

556 

def wait(
    futures: list[PrefectFuture[R]], timeout: float | None = None
) -> DoneAndNotDoneFutures[R]:
    """
    Wait for the futures in the given sequence to complete.

    Args:
        futures: The sequence of Futures to wait upon.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures. Duplicate futures given to *futures* are removed and will be
        returned only once.

    Examples:
        ```python
        @task
        def sleep_task(seconds):
            sleep(seconds)
            return 42

        @flow
        def flow():
            futures = sleep_task.map(range(10))
            done, not_done = wait(futures, timeout=5)
            print(f"Done: {len(done)}")
            print(f"Not Done: {len(not_done)}")
        ```
    """
    unique = set(futures)
    done = {candidate for candidate in unique if candidate._final_state}
    not_done = unique - done

    # Everything already finished: nothing to wait for.
    if not not_done:
        return DoneAndNotDoneFutures(done, not_done)

    # If no timeout, wait for all futures sequentially
    if timeout is None:
        for outstanding in tuple(not_done):
            outstanding.wait()
            done.add(outstanding)
            not_done.remove(outstanding)
        return DoneAndNotDoneFutures(done, not_done)

    # With timeout, monitor all futures concurrently
    try:
        with timeout_context(timeout):
            completion_signal = threading.Event()
            completion_lock = threading.Lock()
            completed: list[PrefectFuture[R]] = []

            def _on_done(finished: PrefectFuture[R]):
                with completion_lock:
                    completed.append(finished)
                    completion_signal.set()

            # Add callbacks to all pending futures
            for outstanding in not_done:
                outstanding.add_done_callback(_on_done)

            # Wait for futures to complete within timeout
            while not_done:
                # Wait for at least one future to complete
                completion_signal.wait()
                with completion_lock:
                    batch = list(completed)
                    completed.clear()
                    completion_signal.clear()

                # Move completed futures to done set
                for finished in batch:
                    if finished in not_done:
                        not_done.remove(finished)
                        done.add(finished)

            return DoneAndNotDoneFutures(done, not_done)
    except TimeoutError:
        logger.debug("Timed out waiting for all futures to complete.")
        return DoneAndNotDoneFutures(done, not_done)

639 

640 

def resolve_futures_to_states(
    expr: PrefectFuture[R] | Any,
) -> PrefectFuture[R] | Any:
    """
    Recursively walk a Python built-in collection, replacing every
    `PrefectFuture` with its state after waiting on it, and return a new
    collection with the same structure. Waiting may block until execution
    completes.

    Unsupported object types will be returned without modification.
    """

    def _wait_then_state(future: PrefectFuture[R]):
        # Wait first so the state read afterwards reflects the finished run.
        future.wait()
        return future.state

    return _resolve_futures(expr, resolve_fn=_wait_then_state)

660 

661 

def resolve_futures_to_results(
    expr: PrefectFuture[R] | Any,
) -> Any:
    """
    Recursively walk a Python built-in collection, replacing every
    `PrefectFuture` with its final result, and return a new collection with
    the same structure. Resolving a future may block until execution completes.

    Unsupported object types will be returned without modification.

    Raises:
        Exception: If any future's state is not completed after waiting.
    """

    def _wait_then_result(future: PrefectFuture[R]) -> Any:
        future.wait()
        if not future.state.is_completed():
            raise Exception("At least one result did not complete successfully")
        return future.result()

    return _resolve_futures(expr, resolve_fn=_wait_then_result)

681 

682 

def _resolve_futures(
    expr: PrefectFuture[R] | Any,
    resolve_fn: Callable[[PrefectFuture[R]], Any],
) -> Any:
    """
    Replace every `PrefectFuture` found in `expr` with `resolve_fn(future)`.

    Works in two passes over `expr`: the first collects the distinct futures,
    the second rebuilds the collection with each future swapped for its
    resolved value. `expr` is returned untouched when it holds no futures.
    """
    discovered: set[PrefectFuture[R]] = set()
    collector = partial(_collect_futures, discovered)
    visit_collection(expr, visit_fn=collector, return_data=False, context={})

    # If no futures were found, return the original expression
    if not discovered:
        return expr

    # Resolve each distinct future exactly once
    resolved_values = {}
    for future in discovered:
        resolved_values[future] = resolve_fn(future)

    def _swap_in_resolved(expr: Any, context: Any) -> Any:
        # Expressions inside quotes should not be modified
        if isinstance(context.get("annotation"), quote):
            raise StopVisiting()

        return resolved_values[expr] if isinstance(expr, PrefectFuture) else expr

    return visit_collection(
        expr, visit_fn=_swap_in_resolved, return_data=True, context={}
    )

720 

721 

def _collect_futures(
    futures: set[PrefectFuture[R]], expr: Any | PrefectFuture[R], context: Any
) -> Any | PrefectFuture[R]:
    """
    Visitor that adds `expr` to `futures` when it is a `PrefectFuture`.

    Always returns `expr` unchanged. Raises `StopVisiting` when the context's
    "annotation" entry is a `quote`.
    """
    annotation = context.get("annotation")
    # Expressions inside quotes should not be traversed
    if isinstance(annotation, quote):
        raise StopVisiting()

    is_future = isinstance(expr, PrefectFuture)
    if is_future:
        futures.add(expr)
    return expr