Coverage for /usr/local/lib/python3.12/site-packages/prefect/context.py: 35%

368 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Async and thread safe models for passing runtime context data. 

3 

4These contexts should never be directly mutated by the user. 

5 

6For more user-accessible information about the current run, see [`prefect.runtime`](https://docs.prefect.io/v3/api-ref/python/prefect-runtime-flow_run). 

7""" 

8 

9import asyncio 1a

10import json 1a

11import os 1a

12import sys 1a

13import warnings 1a

14from collections.abc import AsyncGenerator, Generator, Mapping 1a

15from contextlib import ExitStack, asynccontextmanager, contextmanager 1a

16from contextvars import ContextVar, Token 1a

17from typing import ( 1a

18 TYPE_CHECKING, 

19 Any, 

20 Callable, 

21 ClassVar, 

22 Optional, 

23 TypeVar, 

24 Union, 

25) 

26from uuid import UUID 1a

27 

28from pydantic import BaseModel, ConfigDict, Field, PrivateAttr 1a

29from typing_extensions import Self 1a

30 

31import prefect.settings 1a

32import prefect.types._datetime 1a

33from prefect._internal.compatibility.migration import getattr_migration 1a

34from prefect.assets import Asset 1a

35from prefect.client.orchestration import PrefectClient, SyncPrefectClient, get_client 1a

36from prefect.client.schemas import FlowRun, TaskRun 1a

37from prefect.client.schemas.objects import RunType 1a

38from prefect.events.worker import EventsWorker 1a

39from prefect.exceptions import MissingContextError 1a

40from prefect.results import ( 1a

41 ResultStore, 

42 get_default_persist_setting, 

43 get_default_persist_setting_for_tasks, 

44) 

45from prefect.settings import Profile, Settings 1a

46from prefect.settings.legacy import ( 1a

47 _get_settings_fields, # type: ignore[reportPrivateUsage] 

48) 

49from prefect.states import State 1a

50from prefect.task_runners import TaskRunner 1a

51from prefect.types import DateTime 1a

52from prefect.utilities.services import start_client_metrics_server 1a

53 

# Generic type variables declared at module level.
T = TypeVar("T")
P = TypeVar("P")
R = TypeVar("R")

57 

58if TYPE_CHECKING: 58 ↛ 59line 58 didn't jump to line 59 because the condition on line 58 was never true1a

59 from prefect.flows import Flow 

60 from prefect.tasks import Task 

61 

62 

def serialize_context(
    asset_ctx_kwargs: Union[dict[str, Any], None] = None,
) -> dict[str, Any]:
    """
    Capture the active contexts as a plain dictionary for remote execution.

    Args:
        asset_ctx_kwargs: Optional keyword arguments forwarded to
            `AssetContext.from_task_and_inputs`. When given, a new asset context
            is built and serialized; otherwise the asset entry is an empty dict.
            This is useful for TaskRunners, which rely on creating the task run
            in the remote environment.

    Returns:
        A dictionary with the keys `flow_run_context`, `task_run_context`,
        `tags_context`, `settings_context`, `asset_context`, `deployment_id`
        and `deployment_parameters`. Contexts that are not active are
        represented by an empty dict.
    """
    active_contexts = (
        ("flow_run_context", EngineContext.get()),
        ("task_run_context", TaskRunContext.get()),
        ("tags_context", TagsContext.get()),
        ("settings_context", SettingsContext.get()),
    )

    # Deployment ContextVars are carried along for cross-process propagation
    root_deployment_id = _deployment_id.get()
    root_deployment_parameters = _deployment_parameters.get()

    payload: dict[str, Any] = {
        key: (ctx.serialize() if ctx else {}) for key, ctx in active_contexts
    }

    if asset_ctx_kwargs:
        asset_ctx = AssetContext.from_task_and_inputs(**asset_ctx_kwargs)
        payload["asset_context"] = asset_ctx.serialize()
    else:
        payload["asset_context"] = {}

    payload["deployment_id"] = str(root_deployment_id) if root_deployment_id else None
    payload["deployment_parameters"] = root_deployment_parameters
    return payload

95 

96 

@contextmanager
def hydrated_context(
    serialized_context: Optional[dict[str, Any]] = None,
    client: Union[PrefectClient, SyncPrefectClient, None] = None,
) -> Generator[None, Any, None]:
    """
    Re-create the contexts described by `serialized_context` for the duration
    of the `with` block.

    Args:
        serialized_context: A dictionary with the keys read below
            (`settings_context`, `flow_run_context`, `task_run_context`,
            `tags_context`, `asset_context`, `deployment_id`,
            `deployment_parameters`). When empty or `None`, nothing is entered.
        client: Client passed to the restored run contexts. When omitted, a
            sync client is obtained with `get_client(sync_client=True)`.

    Yields:
        None. Everything entered is unwound when the block exits.
    """
    # We need to rebuild the models because we might be hydrating in a remote
    # environment where the models are not available.
    # TODO: Remove this once we have fixed our circular imports and we don't need to rebuild models any more.
    from prefect._result_records import ResultRecordMetadata
    from prefect.flows import Flow
    from prefect.tasks import Task

    rebuild_namespace: dict[str, Any] = {
        "Flow": Flow,
        "Task": Task,
        "ResultRecordMetadata": ResultRecordMetadata,
    }
    for context_model in (FlowRunContext, TaskRunContext):
        context_model.model_rebuild(_types_namespace=rebuild_namespace)

    with ExitStack() as stack:
        if serialized_context:
            _restore_serialized_context(stack, serialized_context, client)
        yield


def _restore_serialized_context(
    stack: ExitStack,
    serialized_context: dict[str, Any],
    client: Union[PrefectClient, SyncPrefectClient, None],
) -> None:
    """Enter each context found in `serialized_context` on `stack`, in a fixed order."""
    # The settings context is entered before the client is resolved
    settings_data = serialized_context.get("settings_context")
    if settings_data:
        stack.enter_context(SettingsContext(**settings_data))

    api_client = client or get_client(sync_client=True)

    # Parent flow run context, running on a duplicate of the flow's task runner
    flow_data = serialized_context.get("flow_run_context")
    if flow_data:
        parent_flow = flow_data["flow"]
        runner = stack.enter_context(parent_flow.task_runner.duplicate())
        stack.enter_context(
            FlowRunContext(
                **flow_data,
                client=api_client,
                task_runner=runner,
                detached=True,
            )
        )

    # Parent task run context
    task_data = serialized_context.get("task_run_context")
    if task_data:
        stack.enter_context(TaskRunContext(**task_data, client=api_client))

    # Tags context
    tags_data = serialized_context.get("tags_context")
    if tags_data:
        stack.enter_context(tags(*tags_data["current_tags"]))

    # Asset context
    asset_data = serialized_context.get("asset_context")
    if asset_data:
        stack.enter_context(AssetContext(**asset_data))

    # Deployment ContextVars: set now, reset when the stack unwinds
    raw_deployment_id = serialized_context.get("deployment_id")
    if raw_deployment_id:
        id_token = _deployment_id.set(UUID(raw_deployment_id))
        stack.callback(_deployment_id.reset, id_token)

    raw_deployment_parameters = serialized_context.get("deployment_parameters")
    if raw_deployment_parameters:
        parameters_token = _deployment_parameters.set(raw_deployment_parameters)
        stack.callback(_deployment_parameters.reset, parameters_token)

157 

158 

class ContextModel(BaseModel):
    """
    A base model for context data that rejects unknown fields and can be used
    as a context manager.

    Entering an instance stores it in the class-level `__var__` context
    variable; exiting restores the previous value. Subclasses must define
    `__var__`.
    """

    if TYPE_CHECKING:
        # subclasses can pass through keyword arguments to the pydantic base model
        def __init__(self, **kwargs: Any) -> None: ...

    # The context variable for storing data must be defined by the child class
    __var__: ClassVar[ContextVar[Any]]
    # Token returned by `__var__.set`; `None` while the context is not entered
    _token: Optional[Token[Self]] = PrivateAttr(None)
    model_config: ClassVar[ConfigDict] = ConfigDict(
        arbitrary_types_allowed=True,
        extra="forbid",
    )

    def __enter__(self) -> Self:
        """
        Make this instance the current context.

        Raises:
            RuntimeError: If this instance has already been entered.
        """
        if self._token is not None:
            raise RuntimeError(
                "Context already entered. Context enter calls cannot be nested."
            )
        self._token = self.__var__.set(self)
        return self

    def __exit__(self, *_: Any) -> None:
        """
        Restore the context variable to the value it had before `__enter__`.

        Raises:
            RuntimeError: If the instance was not entered.
        """
        # Same `None` check as `__enter__`, so both ends agree on what
        # "not entered" means.
        if self._token is None:
            raise RuntimeError(
                "Asymmetric use of context. Context exit called without an enter."
            )
        self.__var__.reset(self._token)
        self._token = None

    @classmethod
    def get(cls: type[Self]) -> Optional[Self]:
        """Get the current context instance, or `None` when no context is active."""
        return cls.__var__.get(None)

    def model_copy(
        self: Self, *, update: Optional[Mapping[str, Any]] = None, deep: bool = False
    ) -> Self:
        """
        Duplicate the context model, optionally changing some field values.

        Args:
            update: Values to change/add in the new model. Note: the data is not
                validated before creating the new model - you should trust this data.
            deep: Set to `True` to make a deep copy of the model.

        Returns:
            A new model instance that is not entered, regardless of whether the
            original is.
        """
        new = super().model_copy(update=update, deep=deep)
        # Remove the token on copy to avoid re-entrance errors
        new._token = None
        return new

    def serialize(self, include_secrets: bool = True) -> dict[str, Any]:
        """
        Serialize the context model to a dictionary that can be pickled with cloudpickle.

        Args:
            include_secrets: Forwarded to the serializers through the
                `include_secrets` key of the `model_dump` context.

        Returns:
            The dumped model, limited to fields that were explicitly set.
        """
        return self.model_dump(
            exclude_unset=True, context={"include_secrets": include_secrets}
        )

226 

227 

class SyncClientContext(ContextModel):
    """
    A context for managing the sync Prefect client instances.

    Clients were formerly tracked on the TaskRunContext and FlowRunContext, but
    having two separate places and the addition of both sync and async clients
    made it difficult to manage. This context is intended to be the single
    source for sync clients.

    The context creates a sync client, which can either be read directly from
    the context object OR loaded with get_client, inject_client, or other
    Prefect utilities.

    with SyncClientContext.get_or_create() as ctx:
        c1 = get_client(sync_client=True)
        c2 = get_client(sync_client=True)
        assert c1 is c2
        assert c1 is ctx.client
    """

    __var__: ClassVar[ContextVar[Self]] = ContextVar("sync-client-context")
    client: SyncPrefectClient
    _httpx_settings: Optional[dict[str, Any]] = PrivateAttr(None)
    # Number of currently open `with` blocks on this instance
    _context_stack: int = PrivateAttr(0)

    def __init__(self, httpx_settings: Optional[dict[str, Any]] = None) -> None:
        """
        Create the context together with a new sync client.

        Args:
            httpx_settings: Optional settings forwarded to `get_client`.
        """
        super().__init__(
            client=get_client(sync_client=True, httpx_settings=httpx_settings),
        )
        self._httpx_settings = httpx_settings
        self._context_stack = 0

    def __enter__(self) -> Self:
        """
        Enter the context; only the outermost enter opens the client.

        If opening the client or the API version check fails, the entry count
        is rolled back and a client that was already opened is closed again
        before the exception propagates.
        """
        self._context_stack += 1
        if self._context_stack != 1:
            return self

        try:
            self.client.__enter__()
        except BaseException:
            # The client never opened, so only the counter needs undoing.
            self._context_stack -= 1
            raise

        try:
            self.client.raise_for_api_version_mismatch()
            return super().__enter__()
        except BaseException as exc:
            # `with` does not call `__exit__` when `__enter__` raises, so the
            # opened client must be closed here or it would leak.
            self._context_stack -= 1
            self.client.__exit__(type(exc), exc, exc.__traceback__)
            raise

    def __exit__(self, *exc_info: Any) -> None:
        """Leave the context; only the outermost exit closes the client."""
        self._context_stack -= 1
        if self._context_stack == 0:
            self.client.__exit__(*exc_info)
            return super().__exit__(*exc_info)

    @classmethod
    @contextmanager
    def get_or_create(cls) -> Generator[Self, None, None]:
        """
        Yield the active sync client context, creating and entering a new one
        when none is active.
        """
        ctx = cls.get()
        if ctx:
            yield ctx
        else:
            with cls() as ctx:
                yield ctx

284 

285 

class AsyncClientContext(ContextModel):
    """
    A context for managing the async Prefect client instances.

    Clients were formerly tracked on the TaskRunContext and FlowRunContext, but
    having two separate places and the addition of both sync and async clients
    made it difficult to manage. This context is intended to be the single
    source for async clients.

    The context creates an async client, which can either be read directly from
    the context object OR loaded with get_client, inject_client, or other
    Prefect utilities.

    async with AsyncClientContext.get_or_create() as ctx:
        c1 = get_client(sync_client=False)
        c2 = get_client(sync_client=False)
        assert c1 is c2
        assert c1 is ctx.client
    """

    __var__: ClassVar[ContextVar[Self]] = ContextVar("async-client-context")
    client: PrefectClient
    _httpx_settings: Optional[dict[str, Any]] = PrivateAttr(None)
    # Number of currently open `async with` blocks on this instance
    _context_stack: int = PrivateAttr(0)

    def __init__(self, httpx_settings: Optional[dict[str, Any]] = None):
        """
        Create the context together with a new async client.

        Args:
            httpx_settings: Optional settings forwarded to `get_client`.
        """
        super().__init__(
            client=get_client(sync_client=False, httpx_settings=httpx_settings)
        )
        self._httpx_settings = httpx_settings
        self._context_stack = 0

    async def __aenter__(self: Self) -> Self:
        """
        Enter the context; only the outermost enter opens the client.

        If opening the client or the API version check fails, the entry count
        is rolled back and a client that was already opened is closed again
        before the exception propagates.
        """
        self._context_stack += 1
        if self._context_stack != 1:
            return self

        try:
            await self.client.__aenter__()
        except BaseException:
            # The client never opened, so only the counter needs undoing.
            self._context_stack -= 1
            raise

        try:
            await self.client.raise_for_api_version_mismatch()
            return super().__enter__()
        except BaseException as exc:
            # `async with` does not call `__aexit__` when `__aenter__` raises,
            # so the opened client must be closed here or it would leak.
            self._context_stack -= 1
            await self.client.__aexit__(type(exc), exc, exc.__traceback__)
            raise

    async def __aexit__(self: Self, *exc_info: Any) -> None:
        """Leave the context; only the outermost exit closes the client."""
        self._context_stack -= 1
        if self._context_stack == 0:
            await self.client.__aexit__(*exc_info)
            return super().__exit__(*exc_info)

    @classmethod
    @asynccontextmanager
    async def get_or_create(cls) -> AsyncGenerator[Self, None]:
        """
        Yield the active async client context when its client belongs to the
        running event loop; otherwise create and enter a new one.
        """
        ctx = cls.get()
        if ctx and asyncio.get_running_loop() is ctx.client.loop:
            yield ctx
        else:
            async with cls() as ctx:
                yield ctx

342 

343 

class RunContext(ContextModel):
    """
    Shared base for the flow run and task run contexts. Data in this context
    will always be available when `get_run_context` is called.

    Attributes:
        start_time: The time the run context was entered
        input_keyset: Optional nested mapping of strings; defaults to `None`
        client: The Prefect client instance being used for API communication
    """

    start_time: DateTime = Field(
        default_factory=lambda: prefect.types._datetime.now("UTC")
    )
    input_keyset: Optional[dict[str, dict[str, str]]] = None
    client: Union[PrefectClient, SyncPrefectClient]

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Build the model, then call `start_client_metrics_server()`."""
        super().__init__(*args, **kwargs)
        start_client_metrics_server()

    def serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]:
        """
        Dump only `start_time` and `input_keyset`, skipping unset fields.

        Args:
            include_secrets: Passed to serializers via the dump context.
        """
        exported_fields = {"start_time", "input_keyset"}
        dump_context = {"include_secrets": include_secrets}
        return self.model_dump(
            include=exported_fields,
            exclude_unset=True,
            context=dump_context,
        )

371 

372 

class EngineContext(RunContext):
    """
    The context for a flow run. Data in this context is only available from within a
    flow run function.

    Attributes:
        flow: The flow instance associated with the run
        flow_run: The API metadata for the flow run
        task_runner: The task runner instance being used for the flow run
        log_prints: Whether to log print statements from the flow run
        parameters: The parameters passed to the flow run
        detached: Flag indicating if context has been serialized and sent to remote infrastructure
        result_store: The result store used to persist results
        persist_result: Whether to persist the flow run result
        task_run_dynamic_keys: Counter for task calls allowing unique keys
        observed_flow_pauses: Counter for flow pauses
        run_results: A mapping of result ids to run states for this flow run
        task_run_assets: Assets tracked per task run id, used for asset lineage
        events: Events worker to emit events
    """

    flow: Optional["Flow[Any, Any]"] = None
    flow_run: Optional[FlowRun] = None
    task_runner: TaskRunner[Any]
    log_prints: bool = False
    parameters: Optional[dict[str, Any]] = None

    # True once the flow run context has been serialized and sent to remote
    # infrastructure.
    detached: bool = False

    # Result handling
    result_store: ResultStore
    persist_result: bool = Field(default_factory=get_default_persist_setting)

    # Counter for task calls allowing unique keys
    task_run_dynamic_keys: dict[str, Union[str, int]] = Field(default_factory=dict)

    # Counter for flow pauses
    observed_flow_pauses: dict[str, int] = Field(default_factory=dict)

    # Results of task runs and sub flows in this flow run, kept for dependency
    # tracking. Keyed by the ID of the returned object; the value holds the
    # state and the run type.
    run_results: dict[int, tuple[State, RunType]] = Field(default_factory=dict)

    # Information needed to track asset lineage between tasks and
    # materialization
    task_run_assets: dict[UUID, set[Asset]] = Field(default_factory=dict)

    # Events worker to emit events
    events: Optional[EventsWorker] = None

    __var__: ClassVar[ContextVar[Self]] = ContextVar("flow_run")

    def serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]:
        """
        Dump the subset of fields that is shipped to a remote environment.

        The result store, when set, is dumped separately under `result_store`.

        Args:
            include_secrets: Passed to serializers via the dump context.
        """
        exported_fields = {
            "flow_run",
            "flow",
            "parameters",
            "log_prints",
            "start_time",
            "input_keyset",
            "persist_result",
        }
        payload = self.model_dump(
            include=exported_fields,
            exclude_unset=True,
            context={"include_secrets": include_secrets},
        )
        if not self.result_store:
            return payload

        payload["result_store"] = self.result_store.model_dump(
            serialize_as_any=True,
            exclude_unset=True,
            context={"include_secrets": include_secrets},
        )
        return payload


FlowRunContext = EngineContext  # for backwards compatibility

451 

452 

class TaskRunContext(RunContext):
    """
    The context for a task run. Data in this context is only available from within a
    task run function.

    Attributes:
        task: The task instance associated with the task run
        task_run: The API metadata for this task run
        log_prints: Whether to log print statements from the task run
        parameters: The parameters passed to the task run
        result_store: The result store used to persist results
        persist_result: Whether to persist the task run result
    """

    task: "Task[Any, Any]"
    task_run: TaskRun
    log_prints: bool = False
    parameters: dict[str, Any]

    # Result handling
    result_store: ResultStore
    persist_result: bool = Field(default_factory=get_default_persist_setting_for_tasks)

    __var__: ClassVar[ContextVar[Self]] = ContextVar("task_run")

    def serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]:
        """
        Dump the subset of fields that is shipped to a remote environment.

        The result store, when set, is dumped separately under `result_store`.

        Args:
            include_secrets: Passed to serializers via the dump context.
        """
        exported_fields = {
            "task_run",
            "task",
            "parameters",
            "log_prints",
            "start_time",
            "input_keyset",
            "persist_result",
        }
        payload = self.model_dump(
            include=exported_fields,
            exclude_unset=True,
            context={"include_secrets": include_secrets},
        )
        if not self.result_store:
            return payload

        payload["result_store"] = self.result_store.model_dump(
            serialize_as_any=True,
            exclude_unset=True,
            context={"include_secrets": include_secrets},
        )
        return payload

495 

496 

class AssetContext(ContextModel):
    """
    The asset context for a materializing task run. Contains all asset-related information needed
    for asset event emission and downstream asset dependency propagation.

    Attributes:
        direct_asset_dependencies: Assets that this task directly depends on (from task.asset_deps)
        downstream_assets: Assets that this task will create/materialize (from MaterializingTask.assets)
        upstream_assets: Assets from upstream task dependencies
        materialized_by: Tool that materialized the assets (from MaterializingTask.materialized_by)
        task_run_id: ID of the associated task run
        materialization_metadata: Metadata for materialized assets
        copy_to_child_ctx: Whether this context should be copied on a child AssetContext
    """

    direct_asset_dependencies: set[Asset] = Field(default_factory=set)
    downstream_assets: set[Asset] = Field(default_factory=set)
    upstream_assets: set[Asset] = Field(default_factory=set)
    materialized_by: Optional[str] = None
    task_run_id: Optional[UUID] = None
    materialization_metadata: dict[str, dict[str, Any]] = Field(default_factory=dict)
    copy_to_child_ctx: bool = False

    __var__: ClassVar[ContextVar[Self]] = ContextVar("asset_context")

    @classmethod
    def from_task_and_inputs(
        cls,
        task: "Task[Any, Any]",
        task_run_id: UUID,
        task_inputs: Optional[dict[str, set[Any]]] = None,
        copy_to_child_ctx: bool = False,
    ) -> "AssetContext":
        """
        Build an AssetContext for a task run and register its assets on the
        active flow run context.

        Args:
            task: The task instance
            task_run_id: The task run ID
            task_inputs: The resolved task inputs (TaskRunResult objects)
            copy_to_child_ctx: Whether this context should be copied on a child AssetContext

        Returns:
            Configured AssetContext
        """
        from prefect.client.schemas import TaskRunResult
        from prefect.tasks import MaterializingTask

        inherited_assets: set[Asset] = set()

        parent_flow_ctx = FlowRunContext.get()
        if task_inputs and parent_flow_ctx:
            for input_name, input_values in task_inputs.items():
                # Parent task runs are not dependencies that we want to track
                if input_name != "__parents__":
                    for candidate in input_values:
                        if not isinstance(candidate, TaskRunResult):
                            continue
                        tracked = parent_flow_ctx.task_run_assets.get(candidate.id)
                        if tracked:
                            inherited_assets.update(tracked)

        if task.asset_deps:
            direct_dependencies = set(task.asset_deps)
        else:
            direct_dependencies = set()

        # Only materializing tasks contribute downstream assets and a tool name
        if isinstance(task, MaterializingTask) and task.assets:
            produced_assets = set(task.assets)
        else:
            produced_assets = set()

        if isinstance(task, MaterializingTask):
            tool_name = task.materialized_by
        else:
            tool_name = None

        ctx = cls(
            direct_asset_dependencies=direct_dependencies,
            downstream_assets=produced_assets,
            upstream_assets=inherited_assets,
            materialized_by=tool_name,
            task_run_id=task_run_id,
            copy_to_child_ctx=copy_to_child_ctx,
        )
        ctx.update_tracked_assets()

        return ctx

    def add_asset_metadata(self, asset_key: str, metadata: dict[str, Any]) -> None:
        """
        Merge `metadata` into the metadata stored for a materialized asset.

        Args:
            asset_key: The asset key
            metadata: Metadata dictionary to add

        Raises:
            ValueError: If asset_key is not in downstream_assets
        """
        is_downstream = any(
            asset.key == asset_key for asset in self.downstream_assets
        )
        if not is_downstream:
            raise ValueError(
                "Can only add metadata to assets that are arguments to @materialize"
            )

        previous = self.materialization_metadata.get(asset_key, {})
        self.materialization_metadata[asset_key] = previous | metadata

    @staticmethod
    def asset_as_resource(asset: Asset) -> dict[str, str]:
        """Convert Asset to event resource format."""
        resource = {"prefect.resource.id": asset.key}
        if not asset.properties:
            return resource

        explicit_properties = asset.properties.model_dump(exclude_unset=True)

        # Properties copied verbatim, in the order they are added to the resource
        copied_labels = (
            ("name", "prefect.resource.name"),
            ("description", "prefect.asset.description"),
            ("url", "prefect.asset.url"),
        )
        for property_name, label in copied_labels:
            if property_name in explicit_properties:
                resource[label] = explicit_properties[property_name]

        # Owners are JSON-encoded rather than copied as-is
        if "owners" in explicit_properties:
            resource["prefect.asset.owners"] = json.dumps(explicit_properties["owners"])

        return resource

    @staticmethod
    def asset_as_related(asset: Asset) -> dict[str, str]:
        """Convert Asset to event related format."""
        related = {
            "prefect.resource.id": asset.key,
            "prefect.resource.role": "asset",
        }
        return related

    @staticmethod
    def related_materialized_by(by: str) -> dict[str, str]:
        """Create a related resource for the tool that performed the materialization"""
        related = {
            "prefect.resource.id": by,
            "prefect.resource.role": "asset-materialized-by",
        }
        return related

    def emit_events(self, state: State) -> None:
        """
        Emit asset events for a finished run.

        Nothing is emitted for a state named "Cached", for states that are
        neither failed nor completed, or when there are no downstream assets.
        """

        from prefect.events import emit_event

        if state.name == "Cached":
            return

        if state.is_failed():
            outcome = "failed"
        elif state.is_completed():
            outcome = "succeeded"
        else:
            return

        # If we have no downstream assets, this is not a materialization
        if not self.downstream_assets:
            return

        # Reference events for all upstream assets (direct + inherited)
        referenced_assets = self.upstream_assets | self.direct_asset_dependencies
        for referenced in referenced_assets:
            emit_event(
                event="prefect.asset.referenced",
                resource=self.asset_as_resource(referenced),
                related=[],
            )

        # Materialization events for downstream assets, related to every
        # upstream asset and, when known, the materializing tool
        related_resources = [
            self.asset_as_related(referenced) for referenced in referenced_assets
        ]
        if self.materialized_by:
            related_resources.append(self.related_materialized_by(self.materialized_by))

        for materialized in self.downstream_assets:
            emit_event(
                event=f"prefect.asset.materialization.{outcome}",
                resource=self.asset_as_resource(materialized),
                related=related_resources,
                payload=self.materialization_metadata.get(materialized.key),
            )

    def update_tracked_assets(self) -> None:
        """
        Update the flow run context with assets that should be propagated downstream.

        Does nothing without an active flow run context or a task run id.
        """
        flow_run_context = FlowRunContext.get()
        if not flow_run_context or not self.task_run_id:
            return

        # MaterializingTask: propagate the downstream assets (what we create).
        # Regular task: propagate upstream assets + direct dependencies.
        propagated = (
            set(self.downstream_assets)
            if self.downstream_assets
            else set(self.upstream_assets | self.direct_asset_dependencies)
        )

        flow_run_context.task_run_assets[self.task_run_id] = propagated

    def serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]:
        """Serialize the AssetContext for distributed execution."""
        # JSON mode is used so fields that are sets of pydantic models are
        # serialized
        dump_options: dict[str, Any] = {
            "mode": "json",
            "exclude_unset": True,
            "serialize_as_any": True,
            "context": {"include_secrets": include_secrets},
        }
        return self.model_dump(**dump_options)

710 

711 

class TagsContext(ContextModel):
    """
    The context for `prefect.tags` management.

    Attributes:
        current_tags: A set of current tags in the context
    """

    current_tags: set[str] = Field(default_factory=set)

    __var__: ClassVar[ContextVar[Self]] = ContextVar("tags")

    @classmethod
    def get(cls) -> Self:
        """
        Get the current tags context.

        Returns:
            The active `TagsContext`, or a new empty one when no context exists.
        """
        # Only build the empty fallback when it is actually needed, instead of
        # constructing a throwaway instance on every call.
        active = cls.__var__.get(None)
        return cls() if active is None else active

728 

729 

class SettingsContext(ContextModel):
    """
    The context holding the Prefect settings in use.

    This allows for safe concurrent access and modification of settings.

    Attributes:
        profile: The profile that is in use.
        settings: The complete settings model.
    """

    profile: Profile
    settings: Settings

    __var__: ClassVar[ContextVar[Self]] = ContextVar("settings")

    def __hash__(self: Self) -> int:
        """Hash the context by its settings."""
        return hash(self.settings)

    @classmethod
    def get(cls) -> Optional["SettingsContext"]:
        """
        Get the active settings context, falling back to the global one.

        Returns:
            The active context, else `GLOBAL_SETTINGS_CONTEXT`, else `None` when
            the global context has not been defined yet.
        """
        try:
            active = super().get()
            return active if active else GLOBAL_SETTINGS_CONTEXT
        except NameError:
            # GLOBAL_SETTINGS_CONTEXT has not yet been set; in order to create
            # it profiles need to be loaded, and that process calls
            # SettingsContext.get().
            return None

759 

760 

761# Root deployment context vars for O(1) access in nested flows 

762_deployment_id: ContextVar[UUID | None] = ContextVar("deployment_id", default=None) 1a

763_deployment_parameters: ContextVar[dict[str, Any] | None] = ContextVar( 1a

764 "deployment_parameters", default=None 

765) 

766 

767 

def get_run_context() -> Union[FlowRunContext, TaskRunContext]:
    """
    Get the current run context from within a task or flow function.

    Returns:
        A `FlowRunContext` or `TaskRunContext` depending on the function type.

    Raises:
        MissingContextError: If called outside of a flow or task run.
    """
    task_run_ctx = TaskRunContext.get()
    flow_run_ctx = FlowRunContext.get()

    if not task_run_ctx:
        if flow_run_ctx:
            return flow_run_ctx
        raise MissingContextError(
            "No run context available. You are not in a flow or task run context."
        )

    # A task context exists. If a flow context exists too and its flow run id
    # differs from the task's flow_run_id, we're in a subflow running inside a
    # task, so the flow context represents the currently executing code.
    # Otherwise we're in a regular task, so the task context wins.
    in_subflow = bool(
        flow_run_ctx
        and flow_run_ctx.flow_run
        and flow_run_ctx.flow_run.id != task_run_ctx.task_run.flow_run_id
    )
    return flow_run_ctx if in_subflow else task_run_ctx

802 

803 

def get_settings_context() -> SettingsContext:
    """
    Return the settings context currently in effect, which carries the profile
    and the settings that are being used.

    Generally, the settings that are being used are a combination of values from the
    profile and environment. See `prefect.context.use_profile` for more details.

    Raises:
        MissingContextError: If no settings context is available.
    """
    if current := SettingsContext.get():
        return current

    raise MissingContextError("No settings context found.")

818 

819 

@contextmanager
def tags(*new_tags: str) -> Generator[set[str], None, None]:
    """
    Context manager that adds tags to flow and task run calls.

    The given tags are merged with the tags already in effect; leaving the
    block restores the previous set.

    Yields:
        The current set of tags

    Examples:
        ```python
        from prefect import tags, task, flow
        @task
        def my_task():
            pass
        ```

        Run a task with tags

        ```python
        @flow
        def my_flow():
            with tags("a", "b"):
                my_task()  # has tags: a, b
        ```

        Run a flow with tags

        ```python
        @flow
        def my_flow():
            pass
        with tags("a", "b"):
            my_flow()  # has tags: a, b
        ```

        Run a task with nested tag contexts

        ```python
        @flow
        def my_flow():
            with tags("a", "b"):
                with tags("c", "d"):
                    my_task()  # has tags: a, b, c, d
                my_task()  # has tags: a, b
        ```

        Inspect the current tags

        ```python
        @flow
        def my_flow():
            with tags("c", "d"):
                with tags("e", "f") as current_tags:
                    print(current_tags)
        with tags("a", "b"):
            my_flow()
        # {"a", "b", "c", "d", "e", "f"}
        ```
    """
    active_tags = TagsContext.get().current_tags
    combined_tags = {*active_tags, *new_tags}
    with TagsContext(current_tags=combined_tags):
        yield combined_tags

885 

886 

@contextmanager
def use_profile(
    profile: Union[Profile, str],
    override_environment_variables: bool = False,
    include_current_context: bool = True,
) -> Generator[SettingsContext, Any, None]:
    """
    Switch to a profile for the duration of this context.

    Profile contexts are confined to an async context in a single thread.

    Args:
        profile: The name of the profile to load or an instance of a Profile.
        override_environment_variables: If set, variables in the profile will take
            precedence over current environment variables. By default, environment
            variables will override profile settings.
        include_current_context: If set, the new settings will be constructed
            with the current settings context as a base. If not set, the base
            settings will be loaded from the environment and defaults.

    Yields:
        The created `SettingsContext` object

    Raises:
        TypeError: If `profile` is neither a `str` nor a `Profile`.
    """
    if isinstance(profile, str):
        profiles = prefect.settings.load_profiles()
        profile = profiles[profile]

    if not TYPE_CHECKING:
        if not isinstance(profile, Profile):
            raise TypeError(
                f"Unexpected type {type(profile).__name__!r} for `profile`. "
                "Expected 'str' or 'Profile'."
            )

    # Create a copy of the profiles settings as we will mutate it
    profile_settings = profile.settings.copy()
    existing_context = SettingsContext.get()
    if existing_context and include_current_context:
        settings = existing_context.settings
    else:
        settings = Settings()

    if not override_environment_variables:
        # Look the settings fields up once rather than for every environment
        # variable; the result does not depend on the loop variable.
        settings_fields = _get_settings_fields(Settings)
        # Drop profile values whose setting is already provided by the
        # environment so the environment keeps precedence.
        for key in os.environ:
            if key in settings_fields:
                profile_settings.pop(settings_fields[key], None)

    new_settings = settings.copy_with_update(updates=profile_settings)

    with SettingsContext(profile=profile, settings=new_settings) as ctx:
        yield ctx

938 

939 

def root_settings_context() -> SettingsContext:
    """
    Return the settings context that will exist as the root context for the module.

    The profile to use is determined with the following precedence
    - Command line via 'prefect --profile <name>'
    - Environment variable via 'PREFECT_PROFILE'
    - Profiles file via the 'active' key

    If the selected profile does not exist, a warning is printed to stderr and
    the "ephemeral" profile is used. If the Prefect home directory does not
    exist, it is created; a failure to do so only produces a warning.

    Returns:
        A `SettingsContext` for the selected profile. This function does not
        enter the context; `SettingsContext.get` falls back to the module-level
        `GLOBAL_SETTINGS_CONTEXT` when no context is active.
    """
    profiles = prefect.settings.load_profiles()
    active_name = profiles.active_name
    profile_source = "in the profiles file"

    if "PREFECT_PROFILE" in os.environ:
        active_name = os.environ["PREFECT_PROFILE"]
        profile_source = "by environment variable"

    # The length check comes first so that indexing `sys.argv` cannot raise
    # when the interpreter was started with a short or empty argv.
    if (
        len(sys.argv) >= 3
        and sys.argv[0].endswith("/prefect")
        and sys.argv[1] == "--profile"
    ):
        active_name = sys.argv[2]
        profile_source = "by command line argument"

    if active_name not in profiles.names:
        print(
            (
                f"WARNING: Active profile {active_name!r} set {profile_source} not "
                "found. The default profile will be used instead. "
            ),
            file=sys.stderr,
        )
        active_name = "ephemeral"

    if not (settings := Settings()).home.exists():
        try:
            settings.home.mkdir(mode=0o0700, exist_ok=True)
        except OSError:
            # Best effort: a missing home directory is reported, not fatal.
            warnings.warn(
                (f"Failed to create the Prefect home directory at {settings.home}"),
                stacklevel=2,
            )

    return SettingsContext(profile=profiles[active_name], settings=settings)

988 

989 

# Built once at import time; `SettingsContext.get` returns it when no settings
# context is active.
GLOBAL_SETTINGS_CONTEXT: SettingsContext = root_settings_context()


# 2024-07-02: This surfaces an actionable error message for removed objects
# in Prefect 3.0 upgrade.
__getattr__: Callable[[str], Any] = getattr_migration(__name__)