Coverage for /usr/local/lib/python3.12/site-packages/prefect/context.py: 35% (368 statements)
1"""
2Async and thread safe models for passing runtime context data.
4These contexts should never be directly mutated by the user.
6For more user-accessible information about the current run, see [`prefect.runtime`](https://docs.prefect.io/v3/api-ref/python/prefect-runtime-flow_run).
7"""
9import asyncio 1a
10import json 1a
11import os 1a
12import sys 1a
13import warnings 1a
14from collections.abc import AsyncGenerator, Generator, Mapping 1a
15from contextlib import ExitStack, asynccontextmanager, contextmanager 1a
16from contextvars import ContextVar, Token 1a
17from typing import ( 1a
18 TYPE_CHECKING,
19 Any,
20 Callable,
21 ClassVar,
22 Optional,
23 TypeVar,
24 Union,
25)
26from uuid import UUID 1a
28from pydantic import BaseModel, ConfigDict, Field, PrivateAttr 1a
29from typing_extensions import Self 1a
31import prefect.settings 1a
32import prefect.types._datetime 1a
33from prefect._internal.compatibility.migration import getattr_migration 1a
34from prefect.assets import Asset 1a
35from prefect.client.orchestration import PrefectClient, SyncPrefectClient, get_client 1a
36from prefect.client.schemas import FlowRun, TaskRun 1a
37from prefect.client.schemas.objects import RunType 1a
38from prefect.events.worker import EventsWorker 1a
39from prefect.exceptions import MissingContextError 1a
40from prefect.results import ( 1a
41 ResultStore,
42 get_default_persist_setting,
43 get_default_persist_setting_for_tasks,
44)
45from prefect.settings import Profile, Settings 1a
46from prefect.settings.legacy import ( 1a
47 _get_settings_fields, # type: ignore[reportPrivateUsage]
48)
49from prefect.states import State 1a
50from prefect.task_runners import TaskRunner 1a
51from prefect.types import DateTime 1a
52from prefect.utilities.services import start_client_metrics_server 1a
54T = TypeVar("T") 1a
55P = TypeVar("P") 1a
56R = TypeVar("R") 1a
58if TYPE_CHECKING: 58 ↛ 59line 58 didn't jump to line 59 because the condition on line 58 was never true1a
59 from prefect.flows import Flow
60 from prefect.tasks import Task

def serialize_context(
    asset_ctx_kwargs: Union[dict[str, Any], None] = None,
) -> dict[str, Any]:
    """
    Serialize the current context for use in a remote execution environment.

    Optionally provide `asset_ctx_kwargs` to create a new `AssetContext` that will be
    used in the remote execution environment. This is useful for task runners that
    rely on creating the task run in the remote environment.
    """
    flow_run_context = EngineContext.get()
    task_run_context = TaskRunContext.get()
    tags_context = TagsContext.get()
    settings_context = SettingsContext.get()

    # Serialize deployment ContextVars for cross-process context propagation
    deployment_id = _deployment_id.get()
    deployment_params = _deployment_parameters.get()

    return {
        "flow_run_context": flow_run_context.serialize() if flow_run_context else {},
        "task_run_context": task_run_context.serialize() if task_run_context else {},
        "tags_context": tags_context.serialize() if tags_context else {},
        "settings_context": settings_context.serialize() if settings_context else {},
        "asset_context": AssetContext.from_task_and_inputs(
            **asset_ctx_kwargs
        ).serialize()
        if asset_ctx_kwargs
        else {},
        "deployment_id": str(deployment_id) if deployment_id else None,
        "deployment_parameters": deployment_params,
    }

@contextmanager
def hydrated_context(
    serialized_context: Optional[dict[str, Any]] = None,
    client: Union[PrefectClient, SyncPrefectClient, None] = None,
) -> Generator[None, Any, None]:
    # We need to rebuild the models because we might be hydrating in a remote
    # environment where the models are not available.
    # TODO: Remove this once we have fixed our circular imports and we don't need to
    # rebuild models any more.
    from prefect._result_records import ResultRecordMetadata
    from prefect.flows import Flow
    from prefect.tasks import Task

    _types: dict[str, Any] = dict(
        Flow=Flow,
        Task=Task,
        ResultRecordMetadata=ResultRecordMetadata,
    )
    FlowRunContext.model_rebuild(_types_namespace=_types)
    TaskRunContext.model_rebuild(_types_namespace=_types)

    with ExitStack() as stack:
        if serialized_context:
            # Set up settings context
            if settings_context := serialized_context.get("settings_context"):
                stack.enter_context(SettingsContext(**settings_context))
            # Set up parent flow run context
            client = client or get_client(sync_client=True)
            if flow_run_context := serialized_context.get("flow_run_context"):
                flow = flow_run_context["flow"]
                task_runner = stack.enter_context(flow.task_runner.duplicate())
                flow_run_context = FlowRunContext(
                    **flow_run_context,
                    client=client,
                    task_runner=task_runner,
                    detached=True,
                )
                stack.enter_context(flow_run_context)
            # Set up parent task run context
            if parent_task_run_context := serialized_context.get("task_run_context"):
                task_run_context = TaskRunContext(
                    **parent_task_run_context,
                    client=client,
                )
                stack.enter_context(task_run_context)
            # Set up tags context
            if tags_context := serialized_context.get("tags_context"):
                stack.enter_context(tags(*tags_context["current_tags"]))
            # Set up asset context
            if asset_context := serialized_context.get("asset_context"):
                stack.enter_context(AssetContext(**asset_context))
            # Restore deployment ContextVars for cross-process context propagation
            if deployment_id_str := serialized_context.get("deployment_id"):
                from uuid import UUID

                deployment_id_token = _deployment_id.set(UUID(deployment_id_str))
                stack.callback(_deployment_id.reset, deployment_id_token)
            if deployment_params := serialized_context.get("deployment_parameters"):
                deployment_params_token = _deployment_parameters.set(deployment_params)
                stack.callback(_deployment_parameters.reset, deployment_params_token)
        yield
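
# Illustrative sketch (added for documentation; not part of the upstream module):
# serialize_context() and hydrated_context() are typically paired when shipping
# work to another process. The tag name "example" is arbitrary, and the sketch
# assumes default/ephemeral Prefect settings are usable in both processes.
def _example_context_round_trip() -> None:
    # Capture the ambient contexts (tags, settings, any flow/task run context)
    # as a plain dictionary that can be pickled and sent elsewhere.
    with tags("example"):
        payload = serialize_context()

    # In the receiving process, re-enter the captured contexts for the duration
    # of the block; here we simply verify the tag was restored.
    with hydrated_context(payload):
        assert "example" in TagsContext.get().current_tags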

class ContextModel(BaseModel):
    """
    A base model for context data that forbids mutation and extra data while
    providing a context manager.
    """

    if TYPE_CHECKING:
        # subclasses can pass through keyword arguments to the pydantic base model
        def __init__(self, **kwargs: Any) -> None: ...

    # The context variable for storing data must be defined by the child class
    __var__: ClassVar[ContextVar[Any]]
    _token: Optional[Token[Self]] = PrivateAttr(None)
    model_config: ClassVar[ConfigDict] = ConfigDict(
        arbitrary_types_allowed=True,
        extra="forbid",
    )

    def __enter__(self) -> Self:
        if self._token is not None:
            raise RuntimeError(
                "Context already entered. Context enter calls cannot be nested."
            )
        self._token = self.__var__.set(self)
        return self

    def __exit__(self, *_: Any) -> None:
        if not self._token:
            raise RuntimeError(
                "Asymmetric use of context. Context exit called without an enter."
            )
        self.__var__.reset(self._token)
        self._token = None

    @classmethod
    def get(cls: type[Self]) -> Optional[Self]:
        """Get the current context instance"""
        return cls.__var__.get(None)

    def model_copy(
        self: Self, *, update: Optional[Mapping[str, Any]] = None, deep: bool = False
    ) -> Self:
        """
        Duplicate the context model, optionally changing or adding field values.

        Args:
            update: Values to change/add in the new model. Note: the data is not
                validated before creating the new model - you should trust this data.
            deep: Set to `True` to make a deep copy of the model.

        Returns:
            A new model instance.
        """
        new = super().model_copy(update=update, deep=deep)
        # Remove the token on copy to avoid re-entrance errors
        new._token = None
        return new

    def serialize(self, include_secrets: bool = True) -> dict[str, Any]:
        """
        Serialize the context model to a dictionary that can be pickled with cloudpickle.
        """
        return self.model_dump(
            exclude_unset=True, context={"include_secrets": include_secrets}
        )
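
# Illustrative sketch (not part of the upstream module): a ContextModel subclass
# only needs to declare its own ContextVar; entering an instance makes it
# discoverable via `.get()` for the duration of the block. The class and field
# names below are hypothetical.
class _ExampleContext(ContextModel):
    __var__: ClassVar[ContextVar[Self]] = ContextVar("_example_context")

    label: str = "unset"


def _example_context_usage() -> None:
    assert _ExampleContext.get() is None
    with _ExampleContext(label="demo") as ctx:
        # Inside the block the instance is retrievable from anywhere on this thread
        assert _ExampleContext.get() is ctx
    assert _ExampleContext.get() is None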

class SyncClientContext(ContextModel):
    """
    A context for managing the sync Prefect client instances.

    Clients were formerly tracked on the TaskRunContext and FlowRunContext, but
    having two separate places and the addition of both sync and async clients
    made it difficult to manage. This context is intended to be the single
    source for sync clients.

    The context creates a sync client, which can either be read directly from
    the context object OR loaded with get_client, inject_client, or other
    Prefect utilities.

    with SyncClientContext.get_or_create() as ctx:
        c1 = get_client(sync_client=True)
        c2 = get_client(sync_client=True)
        assert c1 is c2
        assert c1 is ctx.client
    """

    __var__: ClassVar[ContextVar[Self]] = ContextVar("sync-client-context")
    client: SyncPrefectClient
    _httpx_settings: Optional[dict[str, Any]] = PrivateAttr(None)
    _context_stack: int = PrivateAttr(0)

    def __init__(self, httpx_settings: Optional[dict[str, Any]] = None) -> None:
        super().__init__(
            client=get_client(sync_client=True, httpx_settings=httpx_settings),
        )
        self._httpx_settings = httpx_settings
        self._context_stack = 0

    def __enter__(self) -> Self:
        self._context_stack += 1
        if self._context_stack == 1:
            self.client.__enter__()
            self.client.raise_for_api_version_mismatch()
            return super().__enter__()
        else:
            return self

    def __exit__(self, *exc_info: Any) -> None:
        self._context_stack -= 1
        if self._context_stack == 0:
            self.client.__exit__(*exc_info)
            return super().__exit__(*exc_info)

    @classmethod
    @contextmanager
    def get_or_create(cls) -> Generator[Self, None, None]:
        ctx = cls.get()
        if ctx:
            yield ctx
        else:
            with cls() as ctx:
                yield ctx
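
# Illustrative sketch (not part of the upstream module): nested get_or_create()
# calls share the single client owned by the outermost context, which is the
# behaviour the reference-counted __enter__/__exit__ above implements. Entering
# the context connects the client, so a reachable Prefect API (or the ephemeral
# server) is assumed.
def _example_shared_sync_client() -> None:
    with SyncClientContext.get_or_create() as outer:
        with SyncClientContext.get_or_create() as inner:
            # The inner call found the existing context, so no second client
            # was created or connected.
            assert inner is outer
            assert inner.client is outer.client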

class AsyncClientContext(ContextModel):
    """
    A context for managing the async Prefect client instances.

    Clients were formerly tracked on the TaskRunContext and FlowRunContext, but
    having two separate places and the addition of both sync and async clients
    made it difficult to manage. This context is intended to be the single
    source for async clients.

    The context creates an async client, which can either be read directly from
    the context object OR loaded with get_client, inject_client, or other
    Prefect utilities.

    async with AsyncClientContext.get_or_create() as ctx:
        c1 = get_client(sync_client=False)
        c2 = get_client(sync_client=False)
        assert c1 is c2
        assert c1 is ctx.client
    """

    __var__: ClassVar[ContextVar[Self]] = ContextVar("async-client-context")
    client: PrefectClient
    _httpx_settings: Optional[dict[str, Any]] = PrivateAttr(None)
    _context_stack: int = PrivateAttr(0)

    def __init__(self, httpx_settings: Optional[dict[str, Any]] = None):
        super().__init__(
            client=get_client(sync_client=False, httpx_settings=httpx_settings)
        )
        self._httpx_settings = httpx_settings
        self._context_stack = 0

    async def __aenter__(self: Self) -> Self:
        self._context_stack += 1
        if self._context_stack == 1:
            await self.client.__aenter__()
            await self.client.raise_for_api_version_mismatch()
            return super().__enter__()
        else:
            return self

    async def __aexit__(self: Self, *exc_info: Any) -> None:
        self._context_stack -= 1
        if self._context_stack == 0:
            await self.client.__aexit__(*exc_info)
            return super().__exit__(*exc_info)

    @classmethod
    @asynccontextmanager
    async def get_or_create(cls) -> AsyncGenerator[Self, None]:
        ctx = cls.get()
        if ctx and asyncio.get_running_loop() is ctx.client.loop:
            yield ctx
        else:
            async with cls() as ctx:
                yield ctx

class RunContext(ContextModel):
    """
    The base context for a flow or task run. Data in this context will always be
    available when `get_run_context` is called.

    Attributes:
        start_time: The time the run context was entered
        client: The Prefect client instance being used for API communication
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)

        start_client_metrics_server()

    start_time: DateTime = Field(
        default_factory=lambda: prefect.types._datetime.now("UTC")
    )
    input_keyset: Optional[dict[str, dict[str, str]]] = None
    client: Union[PrefectClient, SyncPrefectClient]

    def serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]:
        return self.model_dump(
            include={"start_time", "input_keyset"},
            exclude_unset=True,
            context={"include_secrets": include_secrets},
        )

class EngineContext(RunContext):
    """
    The context for a flow run. Data in this context is only available from within a
    flow run function.

    Attributes:
        flow: The flow instance associated with the run
        flow_run: The API metadata for the flow run
        task_runner: The task runner instance being used for the flow run
        run_results: A mapping of result ids to run states for this flow run
        log_prints: Whether to log print statements from the flow run
        parameters: The parameters passed to the flow run
        detached: Flag indicating if context has been serialized and sent to remote infrastructure
        result_store: The result store used to persist results
        persist_result: Whether to persist the flow run result
        task_run_dynamic_keys: Counter for task calls allowing unique keys
        observed_flow_pauses: Counter for flow pauses
        events: Events worker to emit events
    """

    flow: Optional["Flow[Any, Any]"] = None
    flow_run: Optional[FlowRun] = None
    task_runner: TaskRunner[Any]
    log_prints: bool = False
    parameters: Optional[dict[str, Any]] = None

    # Flag signaling if the flow run context has been serialized and sent
    # to remote infrastructure.
    detached: bool = False

    # Result handling
    result_store: ResultStore
    persist_result: bool = Field(default_factory=get_default_persist_setting)

    # Counter for task calls allowing unique keys
    task_run_dynamic_keys: dict[str, Union[str, int]] = Field(default_factory=dict)

    # Counter for flow pauses
    observed_flow_pauses: dict[str, int] = Field(default_factory=dict)

    # Tracking for results from task runs and subflows in this flow run for
    # dependency tracking. Holds the ID of the object returned by
    # the run and its state.
    run_results: dict[int, tuple[State, RunType]] = Field(default_factory=dict)

    # Tracking information needed to track asset lineage between
    # tasks and materializations
    task_run_assets: dict[UUID, set[Asset]] = Field(default_factory=dict)

    # Events worker to emit events
    events: Optional[EventsWorker] = None

    __var__: ClassVar[ContextVar[Self]] = ContextVar("flow_run")

    def serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]:
        serialized = self.model_dump(
            include={
                "flow_run",
                "flow",
                "parameters",
                "log_prints",
                "start_time",
                "input_keyset",
                "persist_result",
            },
            exclude_unset=True,
            context={"include_secrets": include_secrets},
        )
        if self.result_store:
            serialized["result_store"] = self.result_store.model_dump(
                serialize_as_any=True,
                exclude_unset=True,
                context={"include_secrets": include_secrets},
            )
        return serialized


FlowRunContext = EngineContext  # for backwards compatibility
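
# Illustrative sketch (not part of the upstream module): inside a running flow,
# FlowRunContext.get() returns the EngineContext shown above; outside of a run
# it returns None. The flow below is hypothetical and running it assumes a
# reachable Prefect API (or the ephemeral server).
def _example_inspect_flow_run_context() -> None:
    from prefect import flow

    @flow
    def my_flow() -> None:
        ctx = FlowRunContext.get()
        assert ctx is not None
        # Both the API metadata and the task runner for this run are available
        print(ctx.flow_run.id if ctx.flow_run else None)
        print(type(ctx.task_runner).__name__)

    my_flow()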

class TaskRunContext(RunContext):
    """
    The context for a task run. Data in this context is only available from within a
    task run function.

    Attributes:
        task: The task instance associated with the task run
        task_run: The API metadata for this task run
    """

    task: "Task[Any, Any]"
    task_run: TaskRun
    log_prints: bool = False
    parameters: dict[str, Any]

    # Result handling
    result_store: ResultStore
    persist_result: bool = Field(default_factory=get_default_persist_setting_for_tasks)

    __var__: ClassVar[ContextVar[Self]] = ContextVar("task_run")

    def serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]:
        serialized = self.model_dump(
            include={
                "task_run",
                "task",
                "parameters",
                "log_prints",
                "start_time",
                "input_keyset",
                "persist_result",
            },
            exclude_unset=True,
            context={"include_secrets": include_secrets},
        )
        if self.result_store:
            serialized["result_store"] = self.result_store.model_dump(
                serialize_as_any=True,
                exclude_unset=True,
                context={"include_secrets": include_secrets},
            )
        return serialized

class AssetContext(ContextModel):
    """
    The asset context for a materializing task run. Contains all asset-related information needed
    for asset event emission and downstream asset dependency propagation.

    Attributes:
        direct_asset_dependencies: Assets that this task directly depends on (from task.asset_deps)
        downstream_assets: Assets that this task will create/materialize (from MaterializingTask.assets)
        upstream_assets: Assets from upstream task dependencies
        materialized_by: Tool that materialized the assets (from MaterializingTask.materialized_by)
        task_run_id: ID of the associated task run
        materialization_metadata: Metadata for materialized assets
    """

    direct_asset_dependencies: set[Asset] = Field(default_factory=set)
    downstream_assets: set[Asset] = Field(default_factory=set)
    upstream_assets: set[Asset] = Field(default_factory=set)
    materialized_by: Optional[str] = None
    task_run_id: Optional[UUID] = None
    materialization_metadata: dict[str, dict[str, Any]] = Field(default_factory=dict)
    copy_to_child_ctx: bool = False

    __var__: ClassVar[ContextVar[Self]] = ContextVar("asset_context")

    @classmethod
    def from_task_and_inputs(
        cls,
        task: "Task[Any, Any]",
        task_run_id: UUID,
        task_inputs: Optional[dict[str, set[Any]]] = None,
        copy_to_child_ctx: bool = False,
    ) -> "AssetContext":
        """
        Create an AssetContext from a task and its resolved inputs.

        Args:
            task: The task instance
            task_run_id: The task run ID
            task_inputs: The resolved task inputs (TaskRunResult objects)
            copy_to_child_ctx: Whether this context should be copied on a child AssetContext

        Returns:
            Configured AssetContext
        """
        from prefect.client.schemas import TaskRunResult
        from prefect.tasks import MaterializingTask

        upstream_assets: set[Asset] = set()

        flow_ctx = FlowRunContext.get()
        if task_inputs and flow_ctx:
            for name, inputs in task_inputs.items():
                # Parent task runs are not dependencies
                # that we want to track
                if name == "__parents__":
                    continue

                for task_input in inputs:
                    if isinstance(task_input, TaskRunResult):
                        task_assets = flow_ctx.task_run_assets.get(task_input.id)
                        if task_assets:
                            upstream_assets.update(task_assets)

        ctx = cls(
            direct_asset_dependencies=set(task.asset_deps)
            if task.asset_deps
            else set(),
            downstream_assets=set(task.assets)
            if isinstance(task, MaterializingTask) and task.assets
            else set(),
            upstream_assets=upstream_assets,
            materialized_by=task.materialized_by
            if isinstance(task, MaterializingTask)
            else None,
            task_run_id=task_run_id,
            copy_to_child_ctx=copy_to_child_ctx,
        )
        ctx.update_tracked_assets()

        return ctx

    def add_asset_metadata(self, asset_key: str, metadata: dict[str, Any]) -> None:
        """
        Add metadata for a materialized asset.

        Args:
            asset_key: The asset key
            metadata: Metadata dictionary to add

        Raises:
            ValueError: If asset_key is not in downstream_assets
        """
        downstream_keys = {asset.key for asset in self.downstream_assets}
        if asset_key not in downstream_keys:
            raise ValueError(
                "Can only add metadata to assets that are arguments to @materialize"
            )

        existing = self.materialization_metadata.get(asset_key, {})
        self.materialization_metadata[asset_key] = existing | metadata
    @staticmethod
    def asset_as_resource(asset: Asset) -> dict[str, str]:
        """Convert Asset to event resource format."""
        resource = {"prefect.resource.id": asset.key}

        if asset.properties:
            properties_dict = asset.properties.model_dump(exclude_unset=True)

            if "name" in properties_dict:
                resource["prefect.resource.name"] = properties_dict["name"]

            if "description" in properties_dict:
                resource["prefect.asset.description"] = properties_dict["description"]

            if "url" in properties_dict:
                resource["prefect.asset.url"] = properties_dict["url"]

            if "owners" in properties_dict:
                resource["prefect.asset.owners"] = json.dumps(properties_dict["owners"])

        return resource

    @staticmethod
    def asset_as_related(asset: Asset) -> dict[str, str]:
        """Convert Asset to event related format."""
        return {
            "prefect.resource.id": asset.key,
            "prefect.resource.role": "asset",
        }

    @staticmethod
    def related_materialized_by(by: str) -> dict[str, str]:
        """Create a related resource for the tool that performed the materialization"""
        return {
            "prefect.resource.id": by,
            "prefect.resource.role": "asset-materialized-by",
        }

    def emit_events(self, state: State) -> None:
        """
        Emit asset events
        """

        from prefect.events import emit_event

        if state.name == "Cached":
            return
        elif state.is_failed():
            event_status = "failed"
        elif state.is_completed():
            event_status = "succeeded"
        else:
            return

        # If we have no downstream assets, this is not a materialization
        if not self.downstream_assets:
            return

        # Emit reference events for all upstream assets (direct + inherited)
        all_upstream_assets = self.upstream_assets | self.direct_asset_dependencies
        for asset in all_upstream_assets:
            emit_event(
                event="prefect.asset.referenced",
                resource=self.asset_as_resource(asset),
                related=[],
            )

        # Emit materialization events for downstream assets
        upstream_related = [self.asset_as_related(a) for a in all_upstream_assets]

        if self.materialized_by:
            upstream_related.append(self.related_materialized_by(self.materialized_by))

        for asset in self.downstream_assets:
            emit_event(
                event=f"prefect.asset.materialization.{event_status}",
                resource=self.asset_as_resource(asset),
                related=upstream_related,
                payload=self.materialization_metadata.get(asset.key),
            )

    def update_tracked_assets(self) -> None:
        """
        Update the flow run context with assets that should be propagated downstream.
        """
        if not (flow_run_context := FlowRunContext.get()):
            return

        if not self.task_run_id:
            return

        if self.downstream_assets:
            # MaterializingTask: propagate the downstream assets (what we create)
            assets_for_downstream = set(self.downstream_assets)
        else:
            # Regular task: propagate upstream assets + direct dependencies
            assets_for_downstream = set(
                self.upstream_assets | self.direct_asset_dependencies
            )

        flow_run_context.task_run_assets[self.task_run_id] = assets_for_downstream

    def serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]:
        """Serialize the AssetContext for distributed execution."""
        return self.model_dump(
            # use json serialization so fields that are
            # sets of pydantic models are serialized
            mode="json",
            exclude_unset=True,
            serialize_as_any=True,
            context={"include_secrets": include_secrets},
        )
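
# Illustrative sketch (not part of the upstream module): the event resource
# shape produced by asset_as_resource/asset_as_related for a bare Asset. The
# asset key is hypothetical; the "prefect.*" labels follow the static methods
# defined above.
def _example_asset_resource_shape() -> None:
    asset = Asset(key="s3://example-bucket/example.csv")

    resource = AssetContext.asset_as_resource(asset)
    # With no properties set, only the resource id is populated
    assert resource == {"prefect.resource.id": "s3://example-bucket/example.csv"}

    related = AssetContext.asset_as_related(asset)
    assert related["prefect.resource.role"] == "asset"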

class TagsContext(ContextModel):
    """
    The context for `prefect.tags` management.

    Attributes:
        current_tags: A set of current tags in the context
    """

    current_tags: set[str] = Field(default_factory=set)

    @classmethod
    def get(cls) -> Self:
        # Return an empty `TagsContext` instead of `None` if no context exists
        return cls.__var__.get(cls())

    __var__: ClassVar[ContextVar[Self]] = ContextVar("tags")

class SettingsContext(ContextModel):
    """
    The context for Prefect settings.

    This allows for safe concurrent access and modification of settings.

    Attributes:
        profile: The profile that is in use.
        settings: The complete settings model.
    """

    profile: Profile
    settings: Settings

    __var__: ClassVar[ContextVar[Self]] = ContextVar("settings")

    def __hash__(self: Self) -> int:
        return hash(self.settings)

    @classmethod
    def get(cls) -> Optional["SettingsContext"]:
        # Return the global context instead of `None` if no context exists
        try:
            return super().get() or GLOBAL_SETTINGS_CONTEXT
        except NameError:
            # GLOBAL_SETTINGS_CONTEXT has not yet been set; in order to create
            # it profiles need to be loaded, and that process calls
            # SettingsContext.get().
            return None


# Root deployment context vars for O(1) access in nested flows
_deployment_id: ContextVar[UUID | None] = ContextVar("deployment_id", default=None)
_deployment_parameters: ContextVar[dict[str, Any] | None] = ContextVar(
    "deployment_parameters", default=None
)

def get_run_context() -> Union[FlowRunContext, TaskRunContext]:
    """
    Get the current run context from within a task or flow function.

    Returns:
        A `FlowRunContext` or `TaskRunContext` depending on the function type.

    Raises:
        RuntimeError: If called outside of a flow or task run.
    """
    task_run_ctx = TaskRunContext.get()
    flow_run_ctx = FlowRunContext.get()

    # When both contexts exist, determine which represents the currently executing code.
    # If the flow_run_id from the flow context differs from the task's flow_run_id,
    # we're in a subflow running inside a task, so prefer the flow context.
    # Otherwise, we're in a regular task within the flow, so prefer the task context.
    if task_run_ctx and flow_run_ctx:
        if (
            flow_run_ctx.flow_run
            and flow_run_ctx.flow_run.id != task_run_ctx.task_run.flow_run_id
        ):
            return flow_run_ctx
        return task_run_ctx

    if task_run_ctx:
        return task_run_ctx

    if flow_run_ctx:
        return flow_run_ctx

    raise MissingContextError(
        "No run context available. You are not in a flow or task run context."
    )
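
# Illustrative sketch (not part of the upstream module): inside a task,
# get_run_context() returns the TaskRunContext; inside flow-level code it
# returns the FlowRunContext; outside any run it raises MissingContextError.
# The flow and task below are hypothetical and running them assumes a reachable
# Prefect API (or the ephemeral server).
def _example_get_run_context() -> None:
    from prefect import flow, task

    @task
    def my_task() -> str:
        ctx = get_run_context()
        assert isinstance(ctx, TaskRunContext)
        return ctx.task.name

    @flow
    def my_flow() -> str:
        assert isinstance(get_run_context(), FlowRunContext)
        return my_task()

    my_flow()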

def get_settings_context() -> SettingsContext:
    """
    Get the current settings context which contains profile information and the
    settings that are being used.

    Generally, the settings that are being used are a combination of values from the
    profile and environment. See `prefect.context.use_profile` for more details.
    """
    settings_ctx = SettingsContext.get()

    if not settings_ctx:
        raise MissingContextError("No settings context found.")

    return settings_ctx
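
# Illustrative sketch (not part of the upstream module): reading the active
# profile and settings through the settings context. The printed values depend
# entirely on the local Prefect configuration; the nested `api.url` setting is
# assumed to correspond to PREFECT_API_URL.
def _example_read_settings() -> None:
    ctx = get_settings_context()
    print(ctx.profile.name)      # name of the active profile
    print(ctx.settings.api.url)  # configured API URL, if any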

@contextmanager
def tags(*new_tags: str) -> Generator[set[str], None, None]:
    """
    Context manager to add tags to flow and task run calls.

    Tags are always combined with any existing tags.

    Yields:
        The current set of tags

    Examples:
        ```python
        from prefect import tags, task, flow

        @task
        def my_task():
            pass
        ```

        Run a task with tags

        ```python
        @flow
        def my_flow():
            with tags("a", "b"):
                my_task()  # has tags: a, b
        ```

        Run a flow with tags

        ```python
        @flow
        def my_flow():
            pass

        with tags("a", "b"):
            my_flow()  # has tags: a, b
        ```

        Run a task with nested tag contexts

        ```python
        @flow
        def my_flow():
            with tags("a", "b"):
                with tags("c", "d"):
                    my_task()  # has tags: a, b, c, d
                my_task()  # has tags: a, b
        ```

        Inspect the current tags

        ```python
        @flow
        def my_flow():
            with tags("c", "d"):
                with tags("e", "f") as current_tags:
                    print(current_tags)

        with tags("a", "b"):
            my_flow()
        # {"a", "b", "c", "d", "e", "f"}
        ```
    """
    current_tags = TagsContext.get().current_tags
    _new_tags = current_tags.union(new_tags)
    with TagsContext(current_tags=_new_tags):
        yield _new_tags

@contextmanager
def use_profile(
    profile: Union[Profile, str],
    override_environment_variables: bool = False,
    include_current_context: bool = True,
) -> Generator[SettingsContext, Any, None]:
    """
    Switch to a profile for the duration of this context.

    Profile contexts are confined to an async context in a single thread.

    Args:
        profile: The name of the profile to load or an instance of a Profile.
        override_environment_variables: If set, variables in the profile will take
            precedence over current environment variables. By default, environment
            variables will override profile settings.
        include_current_context: If set, the new settings will be constructed
            with the current settings context as a base. If not set, the settings
            will be loaded from the environment and defaults.

    Yields:
        The created `SettingsContext` object
    """
    if isinstance(profile, str):
        profiles = prefect.settings.load_profiles()
        profile = profiles[profile]

    if not TYPE_CHECKING:
        if not isinstance(profile, Profile):
            raise TypeError(
                f"Unexpected type {type(profile).__name__!r} for `profile`. "
                "Expected 'str' or 'Profile'."
            )

    # Create a copy of the profile's settings as we will mutate it
    profile_settings = profile.settings.copy()
    existing_context = SettingsContext.get()
    if existing_context and include_current_context:
        settings = existing_context.settings
    else:
        settings = Settings()

    if not override_environment_variables:
        for key in os.environ:
            if key in _get_settings_fields(Settings):
                profile_settings.pop(_get_settings_fields(Settings)[key], None)

    new_settings = settings.copy_with_update(updates=profile_settings)

    with SettingsContext(profile=profile, settings=new_settings) as ctx:
        yield ctx
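
# Illustrative sketch (not part of the upstream module): temporarily activating
# a named profile. The profile name "staging" is hypothetical and must already
# exist in the local profiles file.
def _example_use_profile() -> None:
    with use_profile("staging") as ctx:
        # Settings from the "staging" profile are active inside this block,
        # layered on top of the current context and environment variables.
        print(ctx.profile.name)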

def root_settings_context() -> SettingsContext:
    """
    Return the settings context that will exist as the root context for the module.

    The profile to use is determined with the following precedence:
    - Command line via 'prefect --profile <name>'
    - Environment variable via 'PREFECT_PROFILE'
    - Profiles file via the 'active' key
    """
    profiles = prefect.settings.load_profiles()
    active_name = profiles.active_name
    profile_source = "in the profiles file"

    if "PREFECT_PROFILE" in os.environ:
        active_name = os.environ["PREFECT_PROFILE"]
        profile_source = "by environment variable"

    if (
        sys.argv[0].endswith("/prefect")
        and len(sys.argv) >= 3
        and sys.argv[1] == "--profile"
    ):
        active_name = sys.argv[2]
        profile_source = "by command line argument"

    if active_name not in profiles.names:
        print(
            (
                f"WARNING: Active profile {active_name!r} set {profile_source} not "
                "found. The default profile will be used instead. "
            ),
            file=sys.stderr,
        )
        active_name = "ephemeral"

    if not (settings := Settings()).home.exists():
        try:
            settings.home.mkdir(mode=0o0700, exist_ok=True)
        except OSError:
            warnings.warn(
                (f"Failed to create the Prefect home directory at {settings.home}"),
                stacklevel=2,
            )

    return SettingsContext(profile=profiles[active_name], settings=settings)

    # Note the above context is exited and the global settings context is used by
    # an override in the `SettingsContext.get` method.

GLOBAL_SETTINGS_CONTEXT: SettingsContext = root_settings_context()


# 2024-07-02: This surfaces an actionable error message for removed objects
# in Prefect 3.0 upgrade.
__getattr__: Callable[[str], Any] = getattr_migration(__name__)