Coverage for /usr/local/lib/python3.12/site-packages/prefect/workers/process.py: 37%

113 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Module containing the Process worker used for executing flow runs as subprocesses. 

3 

4To start a Process worker, run the following command: 

5 

6```bash 

7prefect worker start --pool 'my-work-pool' --type process 

8``` 

9 

10Replace `my-work-pool` with the name of the work pool you want the worker 

11to poll for flow runs. 

12 

13For more information about work pools and workers, 

14check out the [Prefect docs](https://docs.prefect.io/v3/concepts/work-pools/). 

15""" 

16 

17from __future__ import annotations 1a

18 

19import contextlib 1a

20import os 1a

21import tempfile 1a

22import threading 1a

23from functools import partial 1a

24from pathlib import Path 1a

25from typing import TYPE_CHECKING, Any, Callable, Optional, TypeVar 1a

26 

27import anyio 1a

28import anyio.abc 1a

29from pydantic import Field, field_validator 1a

30 

31from prefect._internal.schemas.validators import validate_working_dir 1a

32from prefect.client.schemas.objects import Flow as APIFlow 1a

33from prefect.runner.runner import Runner 1a

34from prefect.settings import PREFECT_WORKER_QUERY_SECONDS 1a

35from prefect.states import Pending 1a

36from prefect.utilities.processutils import get_sys_executable 1a

37from prefect.utilities.services import ( 1a

38 critical_service_loop, 

39 start_client_metrics_server, 

40 stop_client_metrics_server, 

41) 

42from prefect.workers.base import ( 1a

43 BaseJobConfiguration, 

44 BaseVariables, 

45 BaseWorker, 

46 BaseWorkerResult, 

47) 

48 

49if TYPE_CHECKING: 49 ↛ 50line 49 didn't jump to line 50 because the condition on line 49 was never true1a

50 from prefect.client.schemas.objects import FlowRun, WorkPool 

51 from prefect.client.schemas.responses import DeploymentResponse 

52 from prefect.flows import Flow 

53 

# Type variable capturing a flow's return type for generic annotations below.
FR = TypeVar("FR")

55 

56 

class ProcessJobConfiguration(BaseJobConfiguration):
    """Job configuration for flow runs executed as local subprocesses."""

    # When True, the subprocess's output is streamed to the worker's stdout.
    stream_output: bool = Field(default=True)
    # Directory the flow run process is opened in; a temporary directory is
    # used by the worker when this is left as None.
    working_dir: Optional[Path] = Field(default=None)

    @field_validator("working_dir")
    @classmethod
    def validate_working_dir(cls, v: Path | str | None) -> Path | None:
        """Coerce a string working directory through the shared validator."""
        return validate_working_dir(v) if isinstance(v, str) else v

    def prepare_for_flow_run(
        self,
        flow_run: "FlowRun",
        deployment: "DeploymentResponse | None" = None,
        flow: "APIFlow | None" = None,
        work_pool: "WorkPool | None" = None,
        worker_name: str | None = None,
    ) -> None:
        """Finalize this configuration for a specific flow run.

        Extends the base preparation by inheriting the worker's own
        environment and pinning the run command to the current interpreter.
        """
        super().prepare_for_flow_run(flow_run, deployment, flow, work_pool, worker_name)

        # Subprocesses inherit the worker's environment; values configured
        # explicitly on the job take precedence over inherited ones.
        self.env: dict[str, str | None] = {**os.environ, **self.env}
        # If the command is still the generic default, replace it with one
        # bound to this worker's Python executable so the flow run uses the
        # same environment as the worker.
        if self.command == self._base_flow_run_command():
            self.command: str | None = f"{get_sys_executable()} -m prefect.engine"

    @staticmethod
    def _base_flow_run_command() -> str:
        """
        Override the base flow run command because enhanced cancellation doesn't
        work with the process worker.
        """
        return "python -m prefect.engine"

92 

93 

class ProcessVariables(BaseVariables):
    """User-facing template variables for process work pools."""

    stream_output: bool = Field(
        default=True,
        description=(
            "If enabled, workers will stream output from flow run processes to "
            "local standard output."
        ),
    )
    working_dir: Optional[Path] = Field(
        default=None,
        title="Working Directory",
        description=(
            "If provided, workers will open flow run processes within the "
            "specified path as the working directory. Otherwise, a temporary "
            "directory will be created."
        ),
    )

111 

112 

class ProcessWorkerResult(BaseWorkerResult):
    """Contains information about the final state of a completed process"""

115 

116 

class ProcessWorker(
    BaseWorker[ProcessJobConfiguration, ProcessVariables, ProcessWorkerResult]
):
    """Worker that executes flow runs as local subprocesses."""

    type = "process"
    job_configuration: type[ProcessJobConfiguration] = ProcessJobConfiguration
    job_configuration_variables: type[ProcessVariables] | None = ProcessVariables

    _description = (
        "Execute flow runs as subprocesses on a worker. Works well for local execution"
        " when first getting started."
    )
    _display_name = "Process"
    _documentation_url = "https://docs.prefect.io/latest/get-started/quickstart"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/356e6766a91baf20e1d08bbe16e8b5aaef4d8643-48x48.png"

    async def start(
        self,
        run_once: bool = False,
        with_healthcheck: bool = False,
        printer: Callable[..., None] = print,
    ) -> None:
        """
        Start the worker and run its main loops.

        Two loops run concurrently: one polls for scheduled flow runs to
        submit, the other syncs worker state with the Prefect API server.

        Args:
            run_once: If set, each loop runs exactly once and then returns.
            with_healthcheck: If set, a healthcheck HTTP server is started in
                a background thread so external supervisors can detect a
                stalled worker and restart it.
            printer: A `print`-like function where logs will be reported.
        """
        server = None
        server_thread = None
        try:
            async with self as worker:
                # An initial sync configures the worker before any polling.
                await worker.sync_with_backend()
                async with anyio.create_task_group() as tg:
                    # Poll for scheduled flow runs and submit them.
                    tg.start_soon(
                        partial(
                            critical_service_loop,
                            workload=self.get_and_submit_flow_runs,
                            interval=PREFECT_WORKER_QUERY_SECONDS.value(),
                            run_once=run_once,
                            jitter_range=0.3,
                            backoff=4,  # Up to ~1 minute interval during backoff
                        )
                    )
                    # Periodically sync worker state with the API server.
                    tg.start_soon(
                        partial(
                            critical_service_loop,
                            workload=self.sync_with_backend,
                            interval=self.heartbeat_interval_seconds,
                            run_once=run_once,
                            jitter_range=0.3,
                            backoff=4,
                        )
                    )

                    self._started_event = await self._emit_worker_started_event()

                    start_client_metrics_server()

                    if with_healthcheck:
                        from prefect.workers.server import build_healthcheck_server

                        # Run the ASGI healthcheck server on its own thread so
                        # uvicorn does not block the main event loop.
                        server = build_healthcheck_server(
                            worker=worker,
                            query_interval_seconds=PREFECT_WORKER_QUERY_SECONDS.value(),
                        )
                        server_thread = threading.Thread(
                            name="healthcheck-server-thread",
                            target=server.run,
                            daemon=True,
                        )
                        server_thread.start()
                    printer(f"Worker {worker.name!r} started!")

                # When running once, linger until in-flight runs finish.
                # NOTE(review): the guard reads self._limiter while the loop
                # reads self.limiter — presumably a property alias on
                # BaseWorker; confirm against the base class.
                if run_once and self._limiter:
                    while self.limiter.borrowed_tokens > 0:
                        self._logger.debug(
                            "Waiting for %s active run(s) to finish before shutdown...",
                            self.limiter.borrowed_tokens,
                        )
                        await anyio.sleep(0.1)
        finally:
            stop_client_metrics_server()

            if server and server_thread:
                self._logger.debug("Stopping healthcheck server...")
                server.should_exit = True
                server_thread.join()
                self._logger.debug("Healthcheck server stopped.")

            printer(f"Worker {worker.name!r} stopped!")

    async def run(
        self,
        flow_run: "FlowRun",
        configuration: ProcessJobConfiguration,
        task_status: Optional[anyio.abc.TaskStatus[int]] = None,
    ) -> ProcessWorkerResult:
        """
        Execute a single flow run as a subprocess.

        Args:
            flow_run: The flow run to execute.
            configuration: Prepared job configuration for the run.
            task_status: Optional anyio task status used to report the
                process once it has started.

        Returns:
            A `ProcessWorkerResult` with the process's exit status and pid.

        Raises:
            RuntimeError: If the process could not be started or produced no
                exit status.
        """
        if task_status is None:
            task_status = anyio.TASK_STATUS_IGNORED

        # Use the configured working directory if given; otherwise run the
        # process inside a throwaway temporary directory.
        if configuration.working_dir:
            working_dir_ctx = contextlib.nullcontext(configuration.working_dir)
        else:
            working_dir_ctx = tempfile.TemporaryDirectory(suffix="prefect")

        with working_dir_ctx as working_dir:
            process = await self._runner.execute_flow_run(
                flow_run_id=flow_run.id,
                command=configuration.command,
                cwd=working_dir,
                env=configuration.env,
                stream_output=configuration.stream_output,
                task_status=task_status,
            )

        # The process handle may expose its exit status as either
        # `returncode` or `exitcode`; accept whichever is populated.
        status_code = getattr(process, "returncode", None)
        if status_code is None:
            status_code = getattr(process, "exitcode", None)

        if process is None or status_code is None:
            raise RuntimeError("Failed to start flow run process.")

        return ProcessWorkerResult(status_code=status_code, identifier=str(process.pid))

    async def _submit_adhoc_run(
        self,
        flow: "Flow[..., FR]",
        parameters: dict[str, Any] | None = None,
        job_variables: dict[str, Any] | None = None,
        task_status: anyio.abc.TaskStatus["FlowRun"] | None = None,
    ):
        """
        Create a flow run for *flow* and execute it as a bundled subprocess.

        A crashed state is proposed for the flow run if bundle execution
        raises.
        """
        from prefect._experimental.bundles import (
            create_bundle_for_flow_run,
        )

        flow_run = await self.client.create_flow_run(
            flow,
            parameters=parameters,
            state=Pending(),
            job_variables=job_variables,
            work_pool_name=self.work_pool.name,
        )
        if task_status is not None:
            # Emit the flow run object to .submit to allow it to return a
            # future as soon as possible.
            task_status.started(flow_run)

        api_flow = APIFlow(id=flow_run.flow_id, name=flow.name, labels={})
        logger = self.get_flow_run_logger(flow_run)

        configuration = await self.job_configuration.from_template_and_values(
            base_job_template=self.work_pool.base_job_template,
            values=job_variables or {},
            client=self._client,
        )
        configuration.prepare_for_flow_run(
            flow_run=flow_run,
            flow=api_flow,
            work_pool=self.work_pool,
            worker_name=self.name,
        )

        bundle = create_bundle_for_flow_run(flow=flow, flow_run=flow_run)

        logger.debug("Executing flow run bundle in subprocess...")
        try:
            await self._runner.execute_bundle(
                bundle=bundle,
                cwd=configuration.working_dir,
                env=configuration.env,
            )
        except Exception:
            logger.exception("Error executing flow run bundle in subprocess")
            await self._propose_crashed_state(flow_run, "Flow run execution failed")
        finally:
            logger.debug("Flow run bundle execution complete")

    async def __aenter__(self) -> ProcessWorker:
        """Enter the worker context and attach a `Runner` for subprocesses."""
        await super().__aenter__()
        self._runner = await self._exit_stack.enter_async_context(
            Runner(pause_on_shutdown=False, limit=None)
        )
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        await super().__aexit__(*exc_info)