Coverage for /usr/local/lib/python3.12/site-packages/prefect/runner/runner.py: 13%

652 statements  

coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Runners are responsible for managing the execution of all deployments. 

3 

4When a deployment is created using either `flow.serve` or the `serve` utility, 

5the runner will also poll for scheduled runs of that deployment. 

6 

7Example: 

8 ```python 

9 import time 

10 from prefect import flow, serve 

11 

12 

13 @flow 

14 def slow_flow(sleep: int = 60): 

15 "Sleepy flow - sleeps the provided amount of time (in seconds)." 

16 time.sleep(sleep) 

17 

18 

19 @flow 

20 def fast_flow(): 

21 "Fastest flow this side of the Mississippi." 

22 return 

23 

24 

25 if __name__ == "__main__": 

26 slow_deploy = slow_flow.to_deployment(name="sleeper", interval=45) 

27 fast_deploy = fast_flow.to_deployment(name="fast") 

28 

29 # serve generates a Runner instance 

30 serve(slow_deploy, fast_deploy) 

31 ``` 

32 

33""" 

34 

35from __future__ import annotations 1a

36 

37import asyncio 1a

38import datetime 1a

39import inspect 1a

40import logging 1a

41import multiprocessing.context 1a

42import os 1a

43import shlex 1a

44import shutil 1a

45import signal 1a

46import subprocess 1a

47import sys 1a

48import tempfile 1a

49import threading 1a

50import uuid 1a

51from contextlib import AsyncExitStack 1a

52from copy import deepcopy 1a

53from functools import partial 1a

54from pathlib import Path 1a

55from typing import ( 1a

56 TYPE_CHECKING, 

57 Any, 

58 Callable, 

59 Coroutine, 

60 Dict, 

61 Iterable, 

62 List, 

63 Optional, 

64 TypedDict, 

65 Union, 

66) 

67from uuid import UUID, uuid4 1a

68 

69import anyio 1a

70import anyio.abc 1a

71import anyio.to_thread 1a

72from cachetools import LRUCache 1a

73from typing_extensions import Self 1a

74 

75from prefect._experimental.bundles import ( 1a

76 SerializedBundle, 

77 execute_bundle_in_subprocess, 

78 extract_flow_from_bundle, 

79) 

80from prefect._internal.concurrency.api import ( 1a

81 create_call, 

82 from_async, 

83 from_sync, 

84) 

85from prefect.client.orchestration import PrefectClient, get_client 1a

86from prefect.client.schemas.objects import ( 1a

87 ConcurrencyLimitConfig, 

88 State, 

89 StateType, 

90) 

91from prefect.client.schemas.objects import Flow as APIFlow 1a

92from prefect.events import DeploymentTriggerTypes, TriggerTypes 1a

93from prefect.events.clients import EventsClient, get_events_client 1a

94from prefect.events.related import tags_as_related_resources 1a

95from prefect.events.schemas.events import Event, RelatedResource, Resource 1a

96from prefect.exceptions import Abort, ObjectNotFound 1a

97from prefect.flow_engine import run_flow_in_subprocess 1a

98from prefect.flows import Flow, FlowStateHook, load_flow_from_flow_run 1a

99from prefect.logging.loggers import PrefectLogAdapter, flow_run_logger, get_logger 1a

100from prefect.runner._observers import FlowRunCancellingObserver 1a

101from prefect.runner.storage import RunnerStorage 1a

102from prefect.schedules import Schedule 1a

103from prefect.settings import ( 1a

104 PREFECT_API_URL, 

105 PREFECT_RUNNER_SERVER_ENABLE, 

106 get_current_settings, 

107) 

108from prefect.states import ( 1a

109 AwaitingRetry, 

110 Crashed, 

111 Pending, 

112 exception_to_failed_state, 

113) 

114from prefect.types._datetime import now 1a

115from prefect.types.entrypoint import EntrypointType 1a

116from prefect.utilities.annotations import NotSet 1a

117from prefect.utilities.asyncutils import ( 1a

118 asyncnullcontext, 

119 is_async_fn, 

120 sync_compatible, 

121) 

122from prefect.utilities.engine import propose_state, propose_state_sync 1a

123from prefect.utilities.processutils import ( 1a

124 get_sys_executable, 

125 run_process, 

126) 

127from prefect.utilities.services import ( 1a

128 critical_service_loop, 

129 start_client_metrics_server, 

130) 

131from prefect.utilities.slugify import slugify 1a

132 

133if TYPE_CHECKING:  # 133 ↛ 134: line 133 didn't jump to line 134 because the condition on line 133 was never true  1a

134 import concurrent.futures 

135 

136 from prefect.client.schemas.objects import FlowRun 

137 from prefect.client.schemas.responses import DeploymentResponse 

138 from prefect.client.types.flexible_schedule_list import FlexibleScheduleList 

139 from prefect.deployments.runner import RunnerDeployment 

140 

141__all__ = ["Runner"] 1a

142 

143 

144class ProcessMapEntry(TypedDict): 1a

145 flow_run: "FlowRun" 1a

146 pid: int 1a

147 

148 

149class Runner: 1a

150 def __init__( 1a

151 self, 

152 name: Optional[str] = None, 

153 query_seconds: Optional[float] = None, 

154 prefetch_seconds: float = 10, 

155 heartbeat_seconds: Optional[float] = None, 

156 limit: int | type[NotSet] | None = NotSet, 

157 pause_on_shutdown: bool = True, 

158 webserver: bool = False, 

159 ): 

160 """ 

161 Responsible for managing the execution of remotely initiated flow runs. 

162 

163 Args: 

164 name: The name of the runner. If not provided, a random one 

165 will be generated. If provided, it cannot contain '/' or '%'. 

166 query_seconds: The number of seconds to wait between querying for 

167 scheduled flow runs; defaults to `PREFECT_RUNNER_POLL_FREQUENCY` 

168 prefetch_seconds: The number of seconds to prefetch flow runs for. 

169 heartbeat_seconds: The number of seconds to wait between emitting 

170 flow run heartbeats. The runner will not emit heartbeats if the value is None. 

171 Defaults to `PREFECT_RUNNER_HEARTBEAT_FREQUENCY`. 

172 limit: The maximum number of flow runs this runner should run at once. Provide `None` for no limit. 

173 If not provided, the runner will use the value of `PREFECT_RUNNER_PROCESS_LIMIT`. 

174 pause_on_shutdown: A boolean for whether or not to automatically pause 

175 deployment schedules on shutdown; defaults to `True` 

176 webserver: a boolean flag for whether to start a webserver for this runner 

177 

178 Examples: 

179 Set up a Runner to manage the execution of scheduled flow runs for two flows: 

180 ```python 

181 import asyncio 

182 from prefect import flow, Runner 

183 

184 @flow 

185 def hello_flow(name): 

186 print(f"hello {name}") 

187 

188 @flow 

189 def goodbye_flow(name): 

190 print(f"goodbye {name}") 

191 

192 if __name__ == "__main__": 

193 runner = Runner(name="my-runner") 

194 

195 # Will be runnable via the API 

196 runner.add_flow(hello_flow) 

197 

198 # Run on a cron schedule 

199 runner.add_flow(goodbye_flow, cron="0 * * * *") 

200 

201 asyncio.run(runner.start()) 

202 ``` 

203 """ 

204 settings = get_current_settings() 

205 

206 if name and ("/" in name or "%" in name): 

207 raise ValueError("Runner name cannot contain '/' or '%'") 

208 self.name: str = Path(name).stem if name is not None else f"runner-{uuid4()}" 

209 self._logger: "logging.Logger" = get_logger("runner") 

210 

211 self.started: bool = False 

212 self.stopping: bool = False 

213 self.pause_on_shutdown: bool = pause_on_shutdown 

214 self.limit: int | None = ( 

215 settings.runner.process_limit 

216 if limit is NotSet or isinstance(limit, type) 

217 else limit 

218 ) 

219 self.webserver: bool = webserver 

220 

221 self.query_seconds: float = query_seconds or settings.runner.poll_frequency 

222 self._prefetch_seconds: float = prefetch_seconds 

223 self.heartbeat_seconds: float | None = ( 

224 heartbeat_seconds or settings.runner.heartbeat_frequency 

225 ) 

226 if self.heartbeat_seconds is not None and self.heartbeat_seconds < 30: 

227 raise ValueError("Heartbeat must be 30 seconds or greater.") 

228 self._heartbeat_task: asyncio.Task[None] | None = None 

229 self._events_client: EventsClient = get_events_client(checkpoint_every=1) 

230 

231 self._exit_stack = AsyncExitStack() 

232 self._limiter: anyio.CapacityLimiter | None = None 

233 self._cancelling_observer: FlowRunCancellingObserver | None = None 

234 self._client: PrefectClient = get_client() 

235 self._submitting_flow_run_ids: set[UUID] = set() 

236 self._cancelling_flow_run_ids: set[UUID] = set() 

237 self._scheduled_task_scopes: set[anyio.abc.CancelScope] = set() 

238 self._deployment_ids: set[UUID] = set() 

239 self._flow_run_process_map: dict[UUID, ProcessMapEntry] = dict() 

240 self.__flow_run_process_map_lock: asyncio.Lock | None = None 

241 self._flow_run_bundle_map: dict[UUID, SerializedBundle] = dict() 

242 # Flip to True when we are rescheduling flow runs to avoid marking flow runs as crashed 

243 self._rescheduling: bool = False 

244 

245 self._tmp_dir: Path = ( 

246 Path(tempfile.gettempdir()) / "runner_storage" / str(uuid4()) 

247 ) 

248 self._storage_objs: list[RunnerStorage] = [] 

249 self._deployment_storage_map: dict[UUID, RunnerStorage] = {} 

250 

251 self._loop: Optional[asyncio.AbstractEventLoop] = None 

252 

253 # Caching 

254 self._deployment_cache: LRUCache[UUID, "DeploymentResponse"] = LRUCache( 

255 maxsize=100 

256 ) 

257 self._flow_cache: LRUCache[UUID, "APIFlow"] = LRUCache(maxsize=100) 

258 

259 # Keep track of added flows so we can run them directly in a subprocess 

260 self._deployment_flow_map: dict[UUID, "Flow[Any, Any]"] = dict() 

261 

262 @property 1a

263 def _flow_run_process_map_lock(self) -> asyncio.Lock: 1a

264 if self.__flow_run_process_map_lock is None: 

265 self.__flow_run_process_map_lock = asyncio.Lock() 

266 return self.__flow_run_process_map_lock 

267 

268 async def _add_flow_run_process_map_entry( 1a

269 self, flow_run_id: UUID, process_map_entry: ProcessMapEntry 

270 ): 

271 async with self._flow_run_process_map_lock: 

272 self._flow_run_process_map[flow_run_id] = process_map_entry 

273 

274 if TYPE_CHECKING: 

275 assert self._cancelling_observer is not None 

276 self._cancelling_observer.add_in_flight_flow_run_id(flow_run_id) 

277 

278 async def _remove_flow_run_process_map_entry(self, flow_run_id: UUID): 1a

279 async with self._flow_run_process_map_lock: 

280 self._flow_run_process_map.pop(flow_run_id, None) 

281 

282 if TYPE_CHECKING: 

283 assert self._cancelling_observer is not None 

284 self._cancelling_observer.remove_in_flight_flow_run_id(flow_run_id) 

285 

286 @sync_compatible 1a

287 async def add_deployment( 1a

288 self, 

289 deployment: "RunnerDeployment", 

290 ) -> UUID: 

291 """ 

292 Registers the deployment with the Prefect API and will monitor for work once 

293 the runner is started. 

294 

295 Args: 

296 deployment: A deployment for the runner to register. 

297 """ 

298 apply_coro = deployment.apply() 

299 if TYPE_CHECKING: 

300 assert inspect.isawaitable(apply_coro) 

301 deployment_id = await apply_coro 

302 storage = deployment.storage 

303 if storage is not None: 

304 add_storage_coro = self._add_storage(storage) 

305 if TYPE_CHECKING: 

306 assert inspect.isawaitable(add_storage_coro) 

307 storage = await add_storage_coro 

308 self._deployment_storage_map[deployment_id] = storage 

309 self._deployment_ids.add(deployment_id) 

310 

311 return deployment_id 

312 
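# Usage sketch (hypothetical flow and deployment names): registering a pre-built
# deployment with a runner and polling once. `to_deployment` and `add_deployment`
# are sync-compatible, so they can be called directly outside an event loop.
#
#     import asyncio
#     from prefect import flow, Runner
#
#     @flow
#     def my_flow():
#         print("hello")
#
#     if __name__ == "__main__":
#         runner = Runner(name="example-runner")
#         deployment = my_flow.to_deployment(name="example", interval=60)
#         runner.add_deployment(deployment)
#         asyncio.run(runner.start(run_once=True))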

313 @sync_compatible 1a

314 async def add_flow( 1a

315 self, 

316 flow: Flow[Any, Any], 

317 name: Optional[str] = None, 

318 interval: Optional[ 

319 Union[ 

320 Iterable[Union[int, float, datetime.timedelta]], 

321 int, 

322 float, 

323 datetime.timedelta, 

324 ] 

325 ] = None, 

326 cron: Optional[Union[Iterable[str], str]] = None, 

327 rrule: Optional[Union[Iterable[str], str]] = None, 

328 paused: Optional[bool] = None, 

329 schedule: Optional[Schedule] = None, 

330 schedules: Optional["FlexibleScheduleList"] = None, 

331 concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None, 

332 parameters: Optional[dict[str, Any]] = None, 

333 triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None, 

334 description: Optional[str] = None, 

335 tags: Optional[List[str]] = None, 

336 version: Optional[str] = None, 

337 enforce_parameter_schema: bool = True, 

338 entrypoint_type: EntrypointType = EntrypointType.FILE_PATH, 

339 ) -> UUID: 

340 """ 

341 Provides a flow to the runner to be run based on the provided configuration. 

342 

343 Will create a deployment for the provided flow and register the deployment 

344 with the runner. 

345 

346 Args: 

347 flow: A flow for the runner to run. 

348 name: The name to give the created deployment. Will default to the name 

349 of the runner. 

350 interval: An interval on which to execute the current flow. Accepts either a number 

351 or a timedelta object. If a number is given, it will be interpreted as seconds. 

352 cron: A cron schedule of when to execute runs of this flow. 

353 rrule: An rrule schedule of when to execute runs of this flow. 

354 paused: Whether or not to set the created deployment as paused. 

355 schedule: A schedule object defining when to execute runs of this deployment. 

356 Used to provide additional scheduling options like `timezone` or `parameters`. 

357 schedules: A list of schedule objects defining when to execute runs of this flow. 

358 Used to define multiple schedules or additional scheduling options like `timezone`. 

359 concurrency_limit: The maximum number of concurrent runs of this flow to allow. 

360 triggers: A list of triggers that should kick off a run of this flow. 

361 parameters: A dictionary of default parameter values to pass to runs of this flow. 

362 description: A description for the created deployment. Defaults to the flow's 

363 description if not provided. 

364 tags: A list of tags to associate with the created deployment for organizational 

365 purposes. 

366 version: A version for the created deployment. Defaults to the flow's version. 

367 entrypoint_type: Type of entrypoint to use for the deployment. When using a module path 

368 entrypoint, ensure that the module will be importable in the execution environment. 

369 """ 

370 api = PREFECT_API_URL.value() 

371 if any([interval, cron, rrule, schedule, schedules]) and not api: 

372 self._logger.warning( 

373 "Cannot schedule flows on an ephemeral server; run `prefect server" 

374 " start` to start the scheduler." 

375 ) 

376 name = self.name if name is None else name 

377 

378 to_deployment_coro = flow.to_deployment( 

379 name=name, 

380 interval=interval, 

381 cron=cron, 

382 rrule=rrule, 

383 schedule=schedule, 

384 schedules=schedules, 

385 paused=paused, 

386 triggers=triggers, 

387 parameters=parameters, 

388 description=description, 

389 tags=tags, 

390 version=version, 

391 enforce_parameter_schema=enforce_parameter_schema, 

392 entrypoint_type=entrypoint_type, 

393 concurrency_limit=concurrency_limit, 

394 ) 

395 if TYPE_CHECKING: 

396 assert inspect.isawaitable(to_deployment_coro) 

397 deployment = await to_deployment_coro 

398 

399 add_deployment_coro = self.add_deployment(deployment) 

400 if TYPE_CHECKING: 

401 assert inspect.isawaitable(add_deployment_coro) 

402 deployment_id = await add_deployment_coro 

403 

404 # Only add the flow to the map if it is not loaded from storage 

405 # Further work is needed to support directly running flows created using `flow.from_source` 

406 if not getattr(flow, "_storage", None): 

407 self._deployment_flow_map[deployment_id] = flow 

408 return deployment_id 

409 
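# Usage sketch (hypothetical flow and values): adding a flow with an interval
# schedule and a per-flow concurrency limit. `add_flow` is sync-compatible, so
# it can be called directly outside an event loop.
#
#     runner = Runner(name="example-runner")
#     runner.add_flow(
#         my_flow,                 # a locally defined @flow
#         name="every-five-minutes",
#         interval=300,            # seconds
#         concurrency_limit=2,
#     )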

410 @sync_compatible 1a

411 async def _add_storage(self, storage: RunnerStorage) -> RunnerStorage: 1a

412 """ 

413 Adds a storage object to the runner. The storage object will be used to pull 

414 code to the runner's working directory before the runner starts. 

415 

416 Args: 

417 storage: The storage object to add to the runner. 

418 Returns: 

419 The updated storage object that was added to the runner. 

420 """ 

421 if storage not in self._storage_objs: 

422 storage_copy = deepcopy(storage) 

423 storage_copy.set_base_path(self._tmp_dir) 

424 

425 self._logger.debug( 

426 f"Adding storage {storage_copy!r} to runner at" 

427 f" {str(storage_copy.destination)!r}" 

428 ) 

429 self._storage_objs.append(storage_copy) 

430 

431 return storage_copy 

432 else: 

433 return next(s for s in self._storage_objs if s == storage) 

434 

435 def handle_sigterm(self, *args: Any, **kwargs: Any) -> None: 1a

436 """ 

437 Gracefully shuts down the runner when a SIGTERM is received. 

438 """ 

439 self._logger.info("SIGTERM received, initiating graceful shutdown...") 

440 from_sync.call_in_loop_thread(create_call(self.stop)) 

441 

442 sys.exit(0) 

443 

444 async def start( 1a

445 self, run_once: bool = False, webserver: Optional[bool] = None 

446 ) -> None: 

447 """ 

448 Starts a runner. 

449 

450 The runner will begin monitoring for and executing any scheduled work for all added flows. 

451 

452 Args: 

453 run_once: If True, the runner will run through one query loop and then exit. 

454 webserver: a boolean for whether to start a webserver for this runner. If provided, 

455 overrides the default on the runner 

456 

457 Examples: 

458 Initialize a Runner, add two flows, and serve them by starting the Runner: 

459 

460 ```python 

461 import asyncio 

462 from prefect import flow, Runner 

463 

464 @flow 

465 def hello_flow(name): 

466 print(f"hello {name}") 

467 

468 @flow 

469 def goodbye_flow(name): 

470 print(f"goodbye {name}") 

471 

472 if __name__ == "__main__": 

473 runner = Runner(name="my-runner") 

474 

475 # Will be runnable via the API 

476 runner.add_flow(hello_flow) 

477 

478 # Run on a cron schedule 

479 runner.add_flow(goodbye_flow, cron="0 * * * *") 

480 

481 asyncio.run(runner.start()) 

482 ``` 

483 """ 

484 from prefect.runner.server import start_webserver 

485 

486 if threading.current_thread() is threading.main_thread(): 

487 signal.signal(signal.SIGTERM, self.handle_sigterm) 

488 

489 webserver = webserver if webserver is not None else self.webserver 

490 

491 if webserver or PREFECT_RUNNER_SERVER_ENABLE.value(): 

492 # we'll start the ASGI server in a separate thread so that 

493 # uvicorn does not block the main thread 

494 server_thread = threading.Thread( 

495 name="runner-server-thread", 

496 target=partial( 

497 start_webserver, 

498 runner=self, 

499 ), 

500 daemon=True, 

501 ) 

502 server_thread.start() 

503 

504 start_client_metrics_server() 

505 

506 async with self as runner: 

507 # This task group isn't included in the exit stack because we want to 

508 # stay in this function until the runner is told to stop 

509 async with self._loops_task_group as loops_task_group: 

510 for storage in self._storage_objs: 

511 if storage.pull_interval: 

512 loops_task_group.start_soon( 

513 partial( 

514 critical_service_loop, 

515 workload=storage.pull_code, 

516 interval=storage.pull_interval, 

517 run_once=run_once, 

518 jitter_range=0.3, 

519 ) 

520 ) 

521 else: 

522 loops_task_group.start_soon(storage.pull_code) 

523 loops_task_group.start_soon( 

524 partial( 

525 critical_service_loop, 

526 workload=runner._get_and_submit_flow_runs, 

527 interval=self.query_seconds, 

528 run_once=run_once, 

529 jitter_range=0.3, 

530 ) 

531 ) 

532 

533 def execute_in_background( 1a

534 self, func: Callable[..., Any], *args: Any, **kwargs: Any 

535 ) -> "concurrent.futures.Future[Any]": 

536 """ 

537 Executes a function in the background. 

538 """ 

539 if TYPE_CHECKING: 

540 assert self._loop is not None 

541 

542 return asyncio.run_coroutine_threadsafe(func(*args, **kwargs), self._loop) 

543 
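# Usage sketch (hypothetical coroutine): submitting an async callable onto the
# runner's event loop from another thread. Assumes the runner has been started
# so its internal loop exists; the returned concurrent.futures.Future can be
# waited on with `.result()`.
#
#     async def poll_once() -> str:
#         return "polled"
#
#     future = runner.execute_in_background(poll_once)
#     print(future.result(timeout=10))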

544 async def cancel_all(self) -> None: 1a

545 runs_to_cancel: list["FlowRun"] = [] 

546 

547 # done to avoid dictionary size changing during iteration 

548 for info in self._flow_run_process_map.values(): 

549 runs_to_cancel.append(info["flow_run"]) 

550 if runs_to_cancel: 

551 for run in runs_to_cancel: 

552 try: 

553 await self._cancel_run(run, state_msg="Runner is shutting down.") 

554 except Exception: 

555 self._logger.exception( 

556 f"Exception encountered while cancelling {run.id}", 

557 exc_info=True, 

558 ) 

559 

560 @sync_compatible 1a

561 async def stop(self): 1a

562 """Stops the runner's polling cycle.""" 

563 if not self.started: 

564 raise RuntimeError( 

565 "Runner has not yet started. Please start the runner by calling" 

566 " .start()" 

567 ) 

568 

569 self.started = False 

570 self.stopping = True 

571 await self.cancel_all() 

572 try: 

573 self._loops_task_group.cancel_scope.cancel() 

574 except Exception: 

575 self._logger.exception( 

576 "Exception encountered while shutting down", exc_info=True 

577 ) 

578 

579 async def execute_flow_run( 1a

580 self, 

581 flow_run_id: UUID, 

582 entrypoint: str | None = None, 

583 command: str | None = None, 

584 cwd: Path | str | None = None, 

585 env: dict[str, str | None] | None = None, 

586 task_status: anyio.abc.TaskStatus[int] = anyio.TASK_STATUS_IGNORED, 

587 stream_output: bool = True, 

588 ) -> anyio.abc.Process | multiprocessing.context.SpawnProcess | None: 

589 """ 

590 Executes a single flow run with the given ID. 

591 

592 Execution blocks while monitoring for cancellation requests and exits 

593 once the flow run process has exited. 

594 

595 Returns: 

596 The flow run process. 

597 """ 

598 self.pause_on_shutdown = False 

599 context = self if not self.started else asyncnullcontext() 

600 

601 async with context: 

602 if not self._acquire_limit_slot(flow_run_id): 

603 return 

604 

605 self._submitting_flow_run_ids.add(flow_run_id) 

606 flow_run = await self._client.read_flow_run(flow_run_id) 

607 

608 process: ( 

609 anyio.abc.Process | multiprocessing.context.SpawnProcess | Exception 

610 ) = await self._runs_task_group.start( 

611 partial( 

612 self._submit_run_and_capture_errors, 

613 flow_run=flow_run, 

614 entrypoint=entrypoint, 

615 command=command, 

616 cwd=cwd, 

617 env=env, 

618 stream_output=stream_output, 

619 ), 

620 ) 

621 if isinstance(process, Exception): 

622 return 

623 

624 if process.pid is None: 

625 raise RuntimeError("Process has no PID") 

626 

627 task_status.started(process.pid) 

628 

629 if self.heartbeat_seconds is not None: 

630 await self._emit_flow_run_heartbeat(flow_run) 

631 

632 # Only add the process to the map if it is still running 

633 # The process may be a multiprocessing.context.SpawnProcess, in which case it will have an `exitcode` attribute 

634 # but no `returncode` attribute 

635 if ( 

636 getattr(process, "returncode", None) 

637 or getattr(process, "exitcode", None) 

638 ) is None: 

639 await self._add_flow_run_process_map_entry( 

640 flow_run.id, ProcessMapEntry(pid=process.pid, flow_run=flow_run) 

641 ) 

642 

643 while True: 

644 # Wait until flow run execution is complete and the process has been removed from the map 

645 await anyio.sleep(0.1) 

646 if self._flow_run_process_map.get(flow_run.id) is None: 

647 break 

648 

649 return process 

650 
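# Usage sketch (placeholder UUID): executing a single, already-created flow run
# by ID without the polling loop. The call blocks until the flow run process
# exits and returns the process object, or None if no limit slot was available.
#
#     import asyncio
#     from uuid import UUID
#     from prefect import Runner
#
#     flow_run_id = UUID("00000000-0000-0000-0000-000000000000")  # placeholder
#     asyncio.run(Runner().execute_flow_run(flow_run_id))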

651 async def execute_bundle( 1a

652 self, 

653 bundle: SerializedBundle, 

654 cwd: Path | str | None = None, 

655 env: dict[str, str | None] | None = None, 

656 ) -> None: 

657 """ 

658 Executes a bundle in a subprocess. 

659 """ 

660 from prefect.client.schemas.objects import FlowRun 

661 

662 self.pause_on_shutdown = False 

663 context = self if not self.started else asyncnullcontext() 

664 

665 flow_run = FlowRun.model_validate(bundle["flow_run"]) 

666 

667 async with context: 

668 if not self._acquire_limit_slot(flow_run.id): 

669 return 

670 

671 process = execute_bundle_in_subprocess(bundle, cwd=cwd, env=env) 

672 

673 if process.pid is None: 

674 # This shouldn't happen because `execute_bundle_in_subprocess` starts the process 

675 # but we'll handle it gracefully anyway 

676 msg = "Failed to start process for flow execution. No PID returned." 

677 await self._propose_crashed_state(flow_run, msg) 

678 raise RuntimeError(msg) 

679 

680 if self.heartbeat_seconds is not None: 

681 await self._emit_flow_run_heartbeat(flow_run) 

682 

683 await self._add_flow_run_process_map_entry( 

684 flow_run.id, ProcessMapEntry(pid=process.pid, flow_run=flow_run) 

685 ) 

686 self._flow_run_bundle_map[flow_run.id] = bundle 

687 

688 await anyio.to_thread.run_sync(process.join) 

689 

690 await self._remove_flow_run_process_map_entry(flow_run.id) 

691 

692 flow_run_logger = self._get_flow_run_logger(flow_run) 

693 if process.exitcode is None: 

694 raise RuntimeError("Process has no exit code") 

695 

696 if process.exitcode: 

697 help_message = None 

698 level = logging.ERROR 

699 if process.exitcode == -9: 

700 level = logging.INFO 

701 help_message = ( 

702 "This indicates that the process exited due to a SIGKILL signal. " 

703 "Typically, this is either caused by manual cancellation or " 

704 "high memory usage causing the operating system to " 

705 "terminate the process." 

706 ) 

707 if process.exitcode == -15: 

708 level = logging.INFO 

709 help_message = ( 

710 "This indicates that the process exited due to a SIGTERM signal. " 

711 "Typically, this is caused by manual cancellation." 

712 ) 

713 elif process.exitcode == 247: 

714 help_message = ( 

715 "This indicates that the process was terminated due to high " 

716 "memory usage." 

717 ) 

718 elif ( 

719 sys.platform == "win32" 

720 and process.exitcode == STATUS_CONTROL_C_EXIT 

721 ): 

722 level = logging.INFO 

723 help_message = ( 

724 "Process was terminated due to a Ctrl+C or Ctrl+Break signal. " 

725 "Typically, this is caused by manual cancellation." 

726 ) 

727 

728 flow_run_logger.log( 

729 level, 

730 f"Process for flow run {flow_run.name!r} exited with status code:" 

731 f" {process.exitcode}" 

732 + (f"; {help_message}" if help_message else ""), 

733 ) 

734 terminal_state = await self._propose_crashed_state( 

735 flow_run, help_message or "Process exited with non-zero exit code" 

736 ) 

737 if terminal_state: 

738 await self._run_on_crashed_hooks( 

739 flow_run=flow_run, state=terminal_state 

740 ) 

741 else: 

742 flow_run_logger.info( 

743 f"Process for flow run {flow_run.name!r} exited cleanly." 

744 ) 

745 

746 def _get_flow_run_logger(self, flow_run: "FlowRun") -> PrefectLogAdapter: 1a

747 return flow_run_logger(flow_run=flow_run).getChild( 

748 "runner", 

749 extra={ 

750 "runner_name": self.name, 

751 }, 

752 ) 

753 

754 async def _run_process( 1a

755 self, 

756 flow_run: "FlowRun", 

757 task_status: anyio.abc.TaskStatus[ 

758 anyio.abc.Process | multiprocessing.context.SpawnProcess 

759 ] = anyio.TASK_STATUS_IGNORED, 

760 entrypoint: str | None = None, 

761 command: str | None = None, 

762 cwd: Path | str | None = None, 

763 env: dict[str, str | None] | None = None, 

764 stream_output: bool = True, 

765 ) -> int | None: 

766 """ 

767 Runs the given flow run in a subprocess. 

768 

769 Args: 

770 flow_run: Flow run to execute via process. The ID of this flow run 

771 is stored in the PREFECT__FLOW_RUN_ID environment variable to 

772 allow the engine to retrieve the corresponding flow's code and 

773 begin execution. 

774 task_status: anyio task status used to send a message to the caller 

775 that the flow run process has started. 

776 """ 

777 # If we have an instance of the flow for this deployment, run it directly in a subprocess 

778 if flow_run.deployment_id is not None: 

779 flow = self._deployment_flow_map.get(flow_run.deployment_id) 

780 if flow: 

781 process = run_flow_in_subprocess(flow, flow_run=flow_run) 

782 task_status.started(process) 

783 await anyio.to_thread.run_sync(process.join) 

784 return process.exitcode 

785 

786 # Otherwise, we'll need to run a `python -m prefect.engine` command to load and run the flow 

787 if command is None: 

788 runner_command = [get_sys_executable(), "-m", "prefect.engine"] 

789 else: 

790 runner_command = shlex.split(command, posix=(os.name != "nt")) 

791 

792 flow_run_logger = self._get_flow_run_logger(flow_run) 

793 

794 # We must add creationflags to a dict so it is only passed as a function 

795 # parameter on Windows, because the presence of creationflags causes 

796 # errors on Unix even if set to None 

797 kwargs: Dict[str, object] = {} 

798 if sys.platform == "win32": 

799 kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP 

800 

801 flow_run_logger.info("Opening process...") 

802 

803 if env is None: 

804 env = {} 

805 env.update(get_current_settings().to_environment_variables(exclude_unset=True)) 

806 env.update( 

807 { 

808 **{ 

809 "PREFECT__FLOW_RUN_ID": str(flow_run.id), 

810 "PREFECT__STORAGE_BASE_PATH": str(self._tmp_dir), 

811 "PREFECT__ENABLE_CANCELLATION_AND_CRASHED_HOOKS": "false", 

812 }, 

813 **({"PREFECT__FLOW_ENTRYPOINT": entrypoint} if entrypoint else {}), 

814 } 

815 ) 

816 env.update(**os.environ) # is this really necessary?? 

817 

818 storage = ( 

819 self._deployment_storage_map.get(flow_run.deployment_id) 

820 if flow_run.deployment_id 

821 else None 

822 ) 

823 if storage and storage.pull_interval: 

824 # perform an adhoc pull of code before running the flow if an 

825 # adhoc pull hasn't been performed in the last pull_interval 

826 # TODO: Explore integrating this behavior with global concurrency. 

827 last_adhoc_pull = getattr(storage, "last_adhoc_pull", None) 

828 if ( 

829 last_adhoc_pull is None 

830 or last_adhoc_pull 

831 < datetime.datetime.now() 

832 - datetime.timedelta(seconds=storage.pull_interval) 

833 ): 

834 self._logger.debug( 

835 "Performing adhoc pull of code for flow run %s with storage %r", 

836 flow_run.id, 

837 storage, 

838 ) 

839 await storage.pull_code() 

840 setattr(storage, "last_adhoc_pull", datetime.datetime.now()) 

841 

842 process = await run_process( 

843 command=runner_command, 

844 stream_output=stream_output, 

845 task_status=task_status, 

846 task_status_handler=lambda process: process, 

847 env=env, 

848 cwd=storage.destination if storage else cwd, 

849 **kwargs, 

850 ) 

851 

852 return process.returncode 

853 
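# Sketch of what the default (no custom `command`) branch effectively runs, with
# placeholder values; the real invocation goes through `run_process` above and
# also layers in the current Prefect settings.
#
#     import os, subprocess, sys
#
#     env = {
#         "PREFECT__FLOW_RUN_ID": "<flow-run-uuid>",
#         "PREFECT__STORAGE_BASE_PATH": "/tmp/runner_storage/<uuid>",
#         "PREFECT__ENABLE_CANCELLATION_AND_CRASHED_HOOKS": "false",
#         **os.environ,  # the real environment is layered on last, as above
#     }
#     subprocess.run([sys.executable, "-m", "prefect.engine"], env=env)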

854 async def _kill_process( 1a

855 self, 

856 pid: int, 

857 grace_seconds: int = 30, 

858 ): 

859 """ 

860 Kills a given flow run process. 

861 

862 Args: 

863 pid: ID of the process to kill 

864 grace_seconds: Number of seconds to wait for the process to end. 

865 """ 

866 # In a non-Windows environment, first send SIGTERM; then, after 

867 # `grace_seconds` seconds have passed, send SIGKILL. On 

868 # Windows we use CTRL_BREAK_EVENT because SIGTERM is useless: 

869 # https://bugs.python.org/issue26350 

870 if sys.platform == "win32": 

871 try: 

872 os.kill(pid, signal.CTRL_BREAK_EVENT) 

873 except (ProcessLookupError, WindowsError): 

874 raise RuntimeError( 

875 f"Unable to kill process {pid!r}: The process was not found." 

876 ) 

877 else: 

878 try: 

879 os.kill(pid, signal.SIGTERM) 

880 except ProcessLookupError: 

881 raise RuntimeError( 

882 f"Unable to kill process {pid!r}: The process was not found." 

883 ) 

884 

885 # Throttle how often we check if the process is still alive to keep 

886 # from making too many system calls in a short period of time. 

887 check_interval = max(grace_seconds / 10, 1) 

888 

889 with anyio.move_on_after(grace_seconds): 

890 while True: 

891 await anyio.sleep(check_interval) 

892 

893 # Detect if the process is still alive. If not do an early 

894 # return as the process respected the SIGTERM from above. 

895 try: 

896 os.kill(pid, 0) 

897 except ProcessLookupError: 

898 return 

899 

900 try: 

901 os.kill(pid, signal.SIGKILL) 

902 except OSError: 

903 # We shouldn't ever end up here, but it's possible that the 

904 # process ended right after the check above. 

905 return 

906 

907 def reschedule_current_flow_runs( 1a

908 self, 

909 ) -> None: 

910 """ 

911 Reschedules all flow runs that are currently running. 

912 

913 This should only be called when the runner is shutting down because it kills all 

914 child processes and short-circuits the crash detection logic. 

915 """ 

916 self._rescheduling = True 

917 # Create a new sync client because this will often run in a separate thread 

918 # as part of a signal handler. 

919 with get_client(sync_client=True) as client: 

920 self._logger.info("Rescheduling flow runs...") 

921 for process_info in self._flow_run_process_map.values(): 

922 flow_run = process_info["flow_run"] 

923 run_logger = self._get_flow_run_logger(flow_run) 

924 run_logger.info( 

925 "Rescheduling flow run for resubmission in response to SIGTERM" 

926 ) 

927 try: 

928 propose_state_sync(client, AwaitingRetry(), flow_run_id=flow_run.id) 

929 os.kill(process_info["pid"], signal.SIGTERM) 

930 run_logger.info("Rescheduled flow run for resubmission") 

931 except ProcessLookupError: 

932 # Process may have already exited 

933 pass 

934 except Abort as exc: 

935 run_logger.info( 

936 ( 

937 "Aborted submission of flow run. " 

938 f"Server sent an abort signal: {exc}" 

939 ), 

940 ) 

941 except Exception: 

942 run_logger.exception( 

943 "Failed to reschedule flow run", 

944 ) 

945 
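# Integration sketch (one possible wiring, not the runner's built-in behavior):
# calling `reschedule_current_flow_runs` from a custom SIGTERM handler so
# in-flight runs are proposed for retry before the process exits. The default
# handler, `handle_sigterm`, stops the runner instead.
#
#     import signal, sys
#
#     def _reschedule_and_exit(signum, frame):
#         runner.reschedule_current_flow_runs()
#         sys.exit(0)
#
#     signal.signal(signal.SIGTERM, _reschedule_and_exit)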

946 async def _pause_schedules(self): 1a

947 """ 

948 Pauses all deployment schedules. 

949 """ 

950 self._logger.info("Pausing all deployments...") 

951 for deployment_id in self._deployment_ids: 

952 await self._client.pause_deployment(deployment_id) 

953 self._logger.debug(f"Paused deployment '{deployment_id}'") 

954 

955 self._logger.info("All deployments have been paused!") 

956 

957 async def _get_and_submit_flow_runs(self): 1a

958 if self.stopping: 

959 return 

960 runs_response = await self._get_scheduled_flow_runs() 

961 self.last_polled: datetime.datetime = now("UTC") 

962 return await self._submit_scheduled_flow_runs(flow_run_response=runs_response) 

963 

964 async def _cancel_run( 1a

965 self, flow_run: "FlowRun | uuid.UUID", state_msg: Optional[str] = None 

966 ): 

967 if isinstance(flow_run, uuid.UUID): 

968 flow_run = await self._client.read_flow_run(flow_run) 

969 run_logger = self._get_flow_run_logger(flow_run) 

970 

971 process_map_entry = self._flow_run_process_map.get(flow_run.id) 

972 

973 pid = process_map_entry.get("pid") if process_map_entry else None 

974 if not pid: 

975 self._logger.debug( 

976 "Received cancellation request for flow run %s but no process was found.", 

977 flow_run.id, 

978 ) 

979 return 

980 

981 try: 

982 await self._kill_process(pid) 

983 except RuntimeError as exc: 

984 self._logger.warning(f"{exc} Marking flow run as cancelled.") 

985 if flow_run.state: 

986 await self._run_on_cancellation_hooks(flow_run, flow_run.state) 

987 await self._mark_flow_run_as_cancelled(flow_run) 

988 except Exception: 

989 run_logger.exception( 

990 "Encountered exception while killing process for flow run " 

991 f"'{flow_run.id}'. Flow run may not be cancelled." 

992 ) 

993 # We will try again on generic exceptions 

994 self._cancelling_flow_run_ids.remove(flow_run.id) 

995 else: 

996 if flow_run.state: 

997 await self._run_on_cancellation_hooks(flow_run, flow_run.state) 

998 await self._mark_flow_run_as_cancelled( 

999 flow_run, 

1000 state_updates={ 

1001 "message": state_msg or "Flow run was cancelled successfully." 

1002 }, 

1003 ) 

1004 

1005 flow, deployment = await self._get_flow_and_deployment(flow_run) 

1006 await self._emit_flow_run_cancelled_event( 

1007 flow_run=flow_run, flow=flow, deployment=deployment 

1008 ) 

1009 run_logger.info(f"Cancelled flow run '{flow_run.name}'!") 

1010 

1011 async def _get_flow_and_deployment( 1a

1012 self, flow_run: "FlowRun" 

1013 ) -> tuple[Optional["APIFlow"], Optional["DeploymentResponse"]]: 

1014 deployment: Optional["DeploymentResponse"] = ( 

1015 self._deployment_cache.get(flow_run.deployment_id) 

1016 if flow_run.deployment_id 

1017 else None 

1018 ) 

1019 flow: Optional["APIFlow"] = self._flow_cache.get(flow_run.flow_id) 

1020 if not deployment and flow_run.deployment_id is not None: 

1021 try: 

1022 deployment = await self._client.read_deployment(flow_run.deployment_id) 

1023 self._deployment_cache[flow_run.deployment_id] = deployment 

1024 except ObjectNotFound: 

1025 deployment = None 

1026 if not flow: 

1027 try: 

1028 flow = await self._client.read_flow(flow_run.flow_id) 

1029 self._flow_cache[flow_run.flow_id] = flow 

1030 except ObjectNotFound: 

1031 flow = None 

1032 return flow, deployment 

1033 

1034 async def _emit_flow_run_heartbeats(self): 1a

1035 coros: list[Coroutine[Any, Any, Any]] = [] 

1036 for entry in self._flow_run_process_map.values(): 

1037 coros.append(self._emit_flow_run_heartbeat(entry["flow_run"])) 

1038 await asyncio.gather(*coros) 

1039 

1040 async def _emit_flow_run_heartbeat(self, flow_run: "FlowRun"): 1a

1041 from prefect import __version__ 

1042 

1043 related: list[RelatedResource] = [] 

1044 tags: list[str] = [] 

1045 

1046 flow, deployment = await self._get_flow_and_deployment(flow_run) 

1047 if deployment: 

1048 related.append(deployment.as_related_resource()) 

1049 tags.extend(deployment.tags) 

1050 if flow: 

1051 related.append( 

1052 RelatedResource( 

1053 { 

1054 "prefect.resource.id": f"prefect.flow.{flow.id}", 

1055 "prefect.resource.role": "flow", 

1056 "prefect.resource.name": flow.name, 

1057 } 

1058 ) 

1059 ) 

1060 tags.extend(flow_run.tags) 

1061 

1062 related = [RelatedResource.model_validate(r) for r in related] 

1063 related += tags_as_related_resources(set(tags)) 

1064 

1065 await self._events_client.emit( 

1066 Event( 

1067 event="prefect.flow-run.heartbeat", 

1068 resource=Resource( 

1069 { 

1070 "prefect.resource.id": f"prefect.flow-run.{flow_run.id}", 

1071 "prefect.resource.name": flow_run.name, 

1072 "prefect.version": __version__, 

1073 } 

1074 ), 

1075 related=related, 

1076 ) 

1077 ) 

1078 

1079 def _event_resource(self): 1a

1080 from prefect import __version__ 

1081 

1082 return { 

1083 "prefect.resource.id": f"prefect.runner.{slugify(self.name)}", 

1084 "prefect.resource.name": self.name, 

1085 "prefect.version": __version__, 

1086 } 

1087 

1088 async def _emit_flow_run_cancelled_event( 1a

1089 self, 

1090 flow_run: "FlowRun", 

1091 flow: "Optional[APIFlow]", 

1092 deployment: "Optional[DeploymentResponse]", 

1093 ): 

1094 related: list[RelatedResource] = [] 

1095 tags: list[str] = [] 

1096 if deployment: 

1097 related.append(deployment.as_related_resource()) 

1098 tags.extend(deployment.tags) 

1099 if flow: 

1100 related.append( 

1101 RelatedResource( 

1102 { 

1103 "prefect.resource.id": f"prefect.flow.{flow.id}", 

1104 "prefect.resource.role": "flow", 

1105 "prefect.resource.name": flow.name, 

1106 } 

1107 ) 

1108 ) 

1109 related.append( 

1110 RelatedResource( 

1111 { 

1112 "prefect.resource.id": f"prefect.flow-run.{flow_run.id}", 

1113 "prefect.resource.role": "flow-run", 

1114 "prefect.resource.name": flow_run.name, 

1115 } 

1116 ) 

1117 ) 

1118 tags.extend(flow_run.tags) 

1119 

1120 related = [RelatedResource.model_validate(r) for r in related] 

1121 related += tags_as_related_resources(set(tags)) 

1122 

1123 await self._events_client.emit( 

1124 Event( 

1125 event="prefect.runner.cancelled-flow-run", 

1126 resource=Resource(self._event_resource()), 

1127 related=related, 

1128 ) 

1129 ) 

1130 self._logger.debug(f"Emitted flow run cancelled event for {flow_run.id}") 

1131 

1132 async def _get_scheduled_flow_runs( 1a

1133 self, 

1134 ) -> list["FlowRun"]: 

1135 """ 

1136 Retrieve scheduled flow runs for this runner. 

1137 """ 

1138 scheduled_before = now("UTC") + datetime.timedelta( 

1139 seconds=int(self._prefetch_seconds) 

1140 ) 

1141 self._logger.debug( 

1142 f"Querying for flow runs scheduled before {scheduled_before}" 

1143 ) 

1144 

1145 scheduled_flow_runs = ( 

1146 await self._client.get_scheduled_flow_runs_for_deployments( 

1147 deployment_ids=list(self._deployment_ids), 

1148 scheduled_before=scheduled_before, 

1149 ) 

1150 ) 

1151 self._logger.debug(f"Discovered {len(scheduled_flow_runs)} scheduled_flow_runs") 

1152 return scheduled_flow_runs 

1153 

1154 def has_slots_available(self) -> bool: 1a

1155 """ 

1156 Determine if the flow run limit has been reached. 

1157 

1158 Returns: 

1159 - bool: True if the limit has not been reached, False otherwise. 

1160 """ 

1161 if not self._limiter: 

1162 return False 

1163 return self._limiter.available_tokens > 0 

1164 

1165 def _acquire_limit_slot(self, flow_run_id: UUID) -> bool: 1a

1166 """ 

1167 Enforces flow run limit set on runner. 

1168 

1169 Returns: 

1170 - bool: True if a slot was acquired, False otherwise. 

1171 """ 

1172 try: 

1173 if self._limiter: 

1174 self._limiter.acquire_on_behalf_of_nowait(flow_run_id) 

1175 self._logger.debug("Limit slot acquired for flow run '%s'", flow_run_id) 

1176 return True 

1177 except RuntimeError as exc: 

1178 if ( 

1179 "this borrower is already holding one of this CapacityLimiter's tokens" 

1180 in str(exc) 

1181 ): 

1182 self._logger.warning( 

1183 f"Duplicate submission of flow run '{flow_run_id}' detected. Runner" 

1184 " will not re-submit flow run." 

1185 ) 

1186 return False 

1187 else: 

1188 raise 

1189 except anyio.WouldBlock: 

1190 if TYPE_CHECKING: 

1191 assert self._limiter is not None 

1192 self._logger.debug( 

1193 f"Flow run limit reached; {self._limiter.borrowed_tokens} flow runs" 

1194 " in progress. You can control this limit by adjusting the " 

1195 "PREFECT_RUNNER_PROCESS_LIMIT setting." 

1196 ) 

1197 return False 

1198 
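# Sketch of the underlying anyio primitive: acquiring a slot without blocking
# raises anyio.WouldBlock once the limit is reached, which the method above
# reports as the flow run limit being hit. The limit value here is arbitrary.
#
#     import uuid
#     import anyio
#
#     async def main() -> None:
#         limiter = anyio.CapacityLimiter(2)
#         run_id = uuid.uuid4()
#         try:
#             limiter.acquire_on_behalf_of_nowait(run_id)  # take a slot for this run
#         except anyio.WouldBlock:
#             print("limit reached; submit later")
#         else:
#             limiter.release_on_behalf_of(run_id)
#
#     anyio.run(main)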

1199 def _release_limit_slot(self, flow_run_id: UUID) -> None: 1a

1200 """ 

1201 Frees up a slot taken by the given flow run id. 

1202 """ 

1203 if self._limiter: 

1204 self._limiter.release_on_behalf_of(flow_run_id) 

1205 self._logger.debug("Limit slot released for flow run '%s'", flow_run_id) 

1206 

1207 async def _submit_scheduled_flow_runs( 1a

1208 self, 

1209 flow_run_response: list["FlowRun"], 

1210 entrypoints: list[str] | None = None, 

1211 ) -> list["FlowRun"]: 

1212 """ 

1213 Takes a list of FlowRuns and submits the referenced flow runs 

1214 for execution by the runner. 

1215 """ 

1216 submittable_flow_runs = sorted( 

1217 flow_run_response, 

1218 key=lambda run: run.next_scheduled_start_time or datetime.datetime.max, 

1219 ) 

1220 

1221 for i, flow_run in enumerate(submittable_flow_runs): 

1222 if flow_run.id in self._submitting_flow_run_ids: 

1223 continue 

1224 

1225 if self._acquire_limit_slot(flow_run.id): 

1226 run_logger = self._get_flow_run_logger(flow_run) 

1227 run_logger.info( 

1228 f"Runner '{self.name}' submitting flow run '{flow_run.id}'" 

1229 ) 

1230 self._submitting_flow_run_ids.add(flow_run.id) 

1231 self._runs_task_group.start_soon( 

1232 partial( 

1233 self._submit_run, 

1234 flow_run=flow_run, 

1235 entrypoint=( 

1236 entrypoints[i] if entrypoints else None 

1237 ), # TODO: avoid relying on index 

1238 ) 

1239 ) 

1240 else: 

1241 break 

1242 

1243 return list( 

1244 filter( 

1245 lambda run: run.id in self._submitting_flow_run_ids, 

1246 submittable_flow_runs, 

1247 ) 

1248 ) 

1249 

1250 async def _submit_run(self, flow_run: "FlowRun", entrypoint: Optional[str] = None): 1a

1251 """ 

1252 Submits a given flow run for execution by the runner. 

1253 """ 

1254 run_logger = self._get_flow_run_logger(flow_run) 

1255 

1256 ready_to_submit = await self._propose_pending_state(flow_run) 

1257 

1258 if ready_to_submit: 

1259 readiness_result: ( 

1260 anyio.abc.Process | Exception 

1261 ) = await self._runs_task_group.start( 

1262 partial( 

1263 self._submit_run_and_capture_errors, 

1264 flow_run=flow_run, 

1265 entrypoint=entrypoint, 

1266 ), 

1267 ) 

1268 

1269 if readiness_result and not isinstance(readiness_result, Exception): 

1270 await self._add_flow_run_process_map_entry( 

1271 flow_run.id, 

1272 ProcessMapEntry(pid=readiness_result.pid, flow_run=flow_run), 

1273 ) 

1274 # Heartbeats are opt-in and only emitted if a heartbeat frequency is set 

1275 if self.heartbeat_seconds is not None: 

1276 await self._emit_flow_run_heartbeat(flow_run) 

1277 

1278 run_logger.info(f"Completed submission of flow run '{flow_run.id}'") 

1279 else: 

1280 # If the run is not ready to submit, release the concurrency slot 

1281 self._release_limit_slot(flow_run.id) 

1282 

1283 self._submitting_flow_run_ids.discard(flow_run.id) 

1284 

1285 async def _submit_run_and_capture_errors( 1a

1286 self, 

1287 flow_run: "FlowRun", 

1288 task_status: anyio.abc.TaskStatus[ 

1289 anyio.abc.Process | multiprocessing.context.SpawnProcess | Exception 

1290 ], 

1291 entrypoint: str | None = None, 

1292 command: str | None = None, 

1293 cwd: Path | str | None = None, 

1294 env: dict[str, str | None] | None = None, 

1295 stream_output: bool = True, 

1296 ) -> Union[Optional[int], Exception]: 

1297 run_logger = self._get_flow_run_logger(flow_run) 

1298 

1299 try: 

1300 exit_code = await self._run_process( 

1301 flow_run=flow_run, 

1302 task_status=task_status, 

1303 entrypoint=entrypoint, 

1304 command=command, 

1305 cwd=cwd, 

1306 env=env, 

1307 stream_output=stream_output, 

1308 ) 

1309 flow_run_logger = self._get_flow_run_logger(flow_run) 

1310 if exit_code: 

1311 help_message = None 

1312 level = logging.ERROR 

1313 if exit_code == -9: 

1314 level = logging.INFO 

1315 help_message = ( 

1316 "This indicates that the process exited due to a SIGKILL signal. " 

1317 "Typically, this is either caused by manual cancellation or " 

1318 "high memory usage causing the operating system to " 

1319 "terminate the process." 

1320 ) 

1321 if exit_code == -15: 

1322 level = logging.INFO 

1323 help_message = ( 

1324 "This indicates that the process exited due to a SIGTERM signal. " 

1325 "Typically, this is caused by manual cancellation." 

1326 ) 

1327 elif exit_code == 247: 

1328 help_message = ( 

1329 "This indicates that the process was terminated due to high " 

1330 "memory usage." 

1331 ) 

1332 elif sys.platform == "win32" and exit_code == STATUS_CONTROL_C_EXIT: 

1333 level = logging.INFO 

1334 help_message = ( 

1335 "Process was terminated due to a Ctrl+C or Ctrl+Break signal. " 

1336 "Typically, this is caused by manual cancellation." 

1337 ) 

1338 

1339 flow_run_logger.log( 

1340 level, 

1341 f"Process for flow run {flow_run.name!r} exited with status code:" 

1342 f" {exit_code}" + (f"; {help_message}" if help_message else ""), 

1343 ) 

1344 else: 

1345 flow_run_logger.info( 

1346 f"Process for flow run {flow_run.name!r} exited cleanly." 

1347 ) 

1348 except Exception as exc: 

1349 if not task_status._future.done(): # type: ignore 

1350 # This flow run was being submitted and did not start successfully 

1351 run_logger.exception( 

1352 f"Failed to start process for flow run '{flow_run.id}'." 

1353 ) 

1354 # Mark the task as started to prevent runner crash 

1355 task_status.started(exc) 

1356 message = f"Flow run process could not be started:\n{exc!r}" 

1357 await self._propose_crashed_state(flow_run, message) 

1358 else: 

1359 run_logger.exception( 

1360 f"An error occurred while monitoring flow run '{flow_run.id}'. " 

1361 "The flow run will not be marked as failed, but an issue may have " 

1362 "occurred." 

1363 ) 

1364 return exc 

1365 finally: 

1366 self._release_limit_slot(flow_run.id) 

1367 

1368 await self._remove_flow_run_process_map_entry(flow_run.id) 

1369 

1370 if exit_code != 0 and not self._rescheduling: 

1371 await self._propose_crashed_state( 

1372 flow_run, 

1373 f"Flow run process exited with non-zero status code {exit_code}.", 

1374 ) 

1375 

1376 try: 

1377 api_flow_run = await self._client.read_flow_run(flow_run_id=flow_run.id) 

1378 terminal_state = api_flow_run.state 

1379 if terminal_state and terminal_state.is_crashed(): 

1380 await self._run_on_crashed_hooks( 

1381 flow_run=flow_run, state=terminal_state 

1382 ) 

1383 except ObjectNotFound: 

1384 # Flow run was deleted - log it but don't crash the runner 

1385 run_logger = self._get_flow_run_logger(flow_run) 

1386 run_logger.debug( 

1387 f"Flow run '{flow_run.id}' was deleted before final state could be checked" 

1388 ) 

1389 

1390 return exit_code 

1391 

1392 async def _propose_pending_state(self, flow_run: "FlowRun") -> bool: 1a

1393 run_logger = self._get_flow_run_logger(flow_run) 

1394 state = flow_run.state 

1395 try: 

1396 state = await propose_state( 

1397 self._client, Pending(), flow_run_id=flow_run.id 

1398 ) 

1399 except Abort as exc: 

1400 run_logger.info( 

1401 ( 

1402 f"Aborted submission of flow run '{flow_run.id}'. " 

1403 f"Server sent an abort signal: {exc}" 

1404 ), 

1405 ) 

1406 return False 

1407 except Exception: 

1408 run_logger.exception( 

1409 f"Failed to update state of flow run '{flow_run.id}'", 

1410 ) 

1411 return False 

1412 

1413 if not state.is_pending(): 

1414 run_logger.info( 

1415 ( 

1416 f"Aborted submission of flow run '{flow_run.id}': " 

1417 f"Server returned a non-pending state {state.type.value!r}" 

1418 ), 

1419 ) 

1420 return False 

1421 

1422 return True 

1423 

1424 async def _propose_failed_state(self, flow_run: "FlowRun", exc: Exception) -> None: 1a

1425 run_logger = self._get_flow_run_logger(flow_run) 

1426 try: 

1427 await propose_state( 

1428 self._client, 

1429 await exception_to_failed_state(message="Submission failed.", exc=exc), 

1430 flow_run_id=flow_run.id, 

1431 ) 

1432 except Abort: 

1433 # We've already failed, no need to note the abort but we don't want it to 

1434 # raise in the agent process 

1435 pass 

1436 except Exception: 

1437 run_logger.error( 

1438 f"Failed to update state of flow run '{flow_run.id}'", 

1439 exc_info=True, 

1440 ) 

1441 

1442 async def _propose_crashed_state( 1a

1443 self, flow_run: "FlowRun", message: str 

1444 ) -> State[Any] | None: 

1445 run_logger = self._get_flow_run_logger(flow_run) 

1446 state = None 

1447 try: 

1448 state = await propose_state( 

1449 self._client, 

1450 Crashed(message=message), 

1451 flow_run_id=flow_run.id, 

1452 ) 

1453 except Abort: 

1454 # Flow run already marked as failed 

1455 pass 

1456 except ObjectNotFound: 

1457 # Flow run was deleted - log it but don't crash the runner 

1458 run_logger.debug( 

1459 f"Flow run '{flow_run.id}' was deleted before state could be updated" 

1460 ) 

1461 except Exception: 

1462 run_logger.exception(f"Failed to update state of flow run '{flow_run.id}'") 

1463 else: 

1464 if state.is_crashed(): 

1465 run_logger.info( 

1466 f"Reported flow run '{flow_run.id}' as crashed: {message}" 

1467 ) 

1468 return state 

1469 

1470 async def _mark_flow_run_as_cancelled( 1a

1471 self, flow_run: "FlowRun", state_updates: Optional[dict[str, Any]] = None 

1472 ) -> None: 

1473 state_updates = state_updates or {} 

1474 state_updates.setdefault("name", "Cancelled") 

1475 state_updates.setdefault("type", StateType.CANCELLED) 

1476 state = ( 

1477 flow_run.state.model_copy(update=state_updates) if flow_run.state else None 

1478 ) 

1479 if not state: 

1480 self._logger.warning( 

1481 f"Could not find state for flow run {flow_run.id} and cancellation cannot be guaranteed." 

1482 ) 

1483 return 

1484 

1485 try: 

1486 await self._client.set_flow_run_state(flow_run.id, state, force=True) 

1487 except ObjectNotFound: 

1488 # Flow run was deleted - log it but don't crash the runner 

1489 run_logger = self._get_flow_run_logger(flow_run) 

1490 run_logger.debug( 

1491 f"Flow run '{flow_run.id}' was deleted before it could be marked as cancelled" 

1492 ) 

1493 

1494 async def _run_on_cancellation_hooks( 1a

1495 self, 

1496 flow_run: "FlowRun", 

1497 state: State, 

1498 ) -> None: 

1499 """ 

1500 Run the hooks for a flow. 

1501 """ 

1502 run_logger = self._get_flow_run_logger(flow_run) 

1503 if state.is_cancelling(): 

1504 try: 

1505 if flow_run.id in self._flow_run_bundle_map: 

1506 flow = extract_flow_from_bundle( 

1507 self._flow_run_bundle_map[flow_run.id] 

1508 ) 

1509 elif flow_run.deployment_id and self._deployment_flow_map.get( 

1510 flow_run.deployment_id 

1511 ): 

1512 flow = self._deployment_flow_map[flow_run.deployment_id] 

1513 else: 

1514 run_logger.info("Loading flow to check for on_cancellation hooks") 

1515 flow = await load_flow_from_flow_run( 

1516 flow_run, storage_base_path=str(self._tmp_dir) 

1517 ) 

1518 hooks = flow.on_cancellation_hooks or [] 

1519 

1520 await _run_hooks(hooks, flow_run, flow, state) 

1521 except Exception: 

1522 run_logger.warning( 

1523 f"Runner failed to retrieve flow to execute on_cancellation hooks for flow run {flow_run.id!r}.", 

1524 exc_info=True, 

1525 ) 

1526 

1527 async def _run_on_crashed_hooks( 1a

1528 self, 

1529 flow_run: "FlowRun", 

1530 state: State, 

1531 ) -> None: 

1532 """ 

1533 Run the hooks for a flow. 

1534 """ 

1535 run_logger = self._get_flow_run_logger(flow_run) 

1536 if state.is_crashed(): 

1537 try: 

1538 if flow_run.id in self._flow_run_bundle_map: 

1539 flow = extract_flow_from_bundle( 

1540 self._flow_run_bundle_map[flow_run.id] 

1541 ) 

1542 elif flow_run.deployment_id and self._deployment_flow_map.get( 

1543 flow_run.deployment_id 

1544 ): 

1545 flow = self._deployment_flow_map[flow_run.deployment_id] 

1546 else: 

1547 run_logger.info("Loading flow to check for on_crashed hooks") 

1548 flow = await load_flow_from_flow_run( 

1549 flow_run, storage_base_path=str(self._tmp_dir) 

1550 ) 

1551 hooks = flow.on_crashed_hooks or [] 

1552 

1553 await _run_hooks(hooks, flow_run, flow, state) 

1554 except Exception: 

1555 run_logger.warning( 

1556 f"Runner failed to retrieve flow to execute on_crashed hooks for flow run {flow_run.id!r}.", 

1557 exc_info=True, 

1558 ) 

1559 

1560 async def __aenter__(self) -> Self: 1a

1561 self._logger.debug("Starting runner...") 

1562 self._client = get_client() 

1563 # Be tolerant to concurrent/duplicate initialization attempts 

1564 self._tmp_dir.mkdir(parents=True, exist_ok=True) 

1565 

1566 self._limiter = anyio.CapacityLimiter(self.limit) if self.limit else None 

1567 

1568 if not hasattr(self, "_loop") or not self._loop: 

1569 self._loop = asyncio.get_event_loop() 

1570 

1571 self._cancelling_observer = await self._exit_stack.enter_async_context( 

1572 FlowRunCancellingObserver( 

1573 on_cancelling=lambda flow_run_id: self._runs_task_group.start_soon( 

1574 self._cancel_run, flow_run_id 

1575 ), 

1576 polling_interval=self.query_seconds, 

1577 ) 

1578 ) 

1579 await self._exit_stack.enter_async_context(self._client) 

1580 await self._exit_stack.enter_async_context(self._events_client) 

1581 

1582 if not hasattr(self, "_runs_task_group") or not self._runs_task_group: 

1583 self._runs_task_group: anyio.abc.TaskGroup = anyio.create_task_group() 

1584 await self._exit_stack.enter_async_context(self._runs_task_group) 

1585 

1586 if not hasattr(self, "_loops_task_group") or not self._loops_task_group: 

1587 self._loops_task_group: anyio.abc.TaskGroup = anyio.create_task_group() 

1588 

1589 if self.heartbeat_seconds is not None: 

1590 self._heartbeat_task = asyncio.create_task( 

1591 critical_service_loop( 

1592 workload=self._emit_flow_run_heartbeats, 

1593 interval=self.heartbeat_seconds, 

1594 jitter_range=0.3, 

1595 ) 

1596 ) 

1597 

1598 self.started = True 

1599 return self 

1600 

1601 async def __aexit__(self, *exc_info: Any) -> None: 1a

1602 self._logger.debug("Stopping runner...") 

1603 if self.pause_on_shutdown: 

1604 await self._pause_schedules() 

1605 self.started = False 

1606 

1607 for scope in self._scheduled_task_scopes: 

1608 scope.cancel() 

1609 

1610 await self._exit_stack.__aexit__(*exc_info) 

1611 

1612 # Be tolerant to already-removed temp directories 

1613 shutil.rmtree(str(self._tmp_dir), ignore_errors=True) 

1614 del self._runs_task_group, self._loops_task_group 

1615 

1616 if self._heartbeat_task: 

1617 self._heartbeat_task.cancel() 

1618 try: 

1619 await self._heartbeat_task 

1620 except asyncio.CancelledError: 

1621 pass 

1622 

1623 def __repr__(self) -> str: 1a

1624 return f"Runner(name={self.name!r})" 

1625 

1626 

1627if sys.platform == "win32":  # 1627 ↛ 1629: line 1627 didn't jump to line 1629 because the condition on line 1627 was never true  1a

1628 # exit code indicating that the process was terminated by Ctrl+C or Ctrl+Break 

1629 STATUS_CONTROL_C_EXIT = 0xC000013A 

1630 

1631 

1632async def _run_hooks( 1a

1633 hooks: list[FlowStateHook[Any, Any]], 

1634 flow_run: "FlowRun", 

1635 flow: "Flow[..., Any]", 

1636 state: State, 

1637): 

1638 logger = flow_run_logger(flow_run, flow) 

1639 for hook in hooks: 

1640 try: 

1641 logger.info( 

1642 f"Running hook {hook.__name__!r} in response to entering state" 

1643 f" {state.name!r}" 

1644 ) 

1645 if is_async_fn(hook): 

1646 await hook(flow=flow, flow_run=flow_run, state=state) 

1647 else: 

1648 await from_async.call_in_new_thread( 

1649 create_call(hook, flow=flow, flow_run=flow_run, state=state) 

1650 ) 

1651 except Exception: 

1652 logger.error( 

1653 f"An error was encountered while running hook {hook.__name__!r}", 

1654 exc_info=True, 

1655 ) 

1656 else: 

1657 logger.info(f"Hook {hook.__name__!r} finished running successfully")