Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/worker.py: 32%
70 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from contextlib import asynccontextmanager 1a
2from contextvars import Context, copy_context 1a
3from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Type 1a
4from uuid import UUID 1a
6from typing_extensions import Self 1a
8from prefect._internal.concurrency.services import QueueService 1a
9from prefect.settings import ( 1a
10 PREFECT_API_KEY,
11 PREFECT_API_URL,
12 PREFECT_CLOUD_API_URL,
13)
14from prefect.utilities.context import temporary_context 1a
16from .clients import ( 1a
17 EventsClient,
18 NullEventsClient,
19 PrefectCloudEventsClient,
20 PrefectEventsClient,
21)
22from .related import related_resources_from_run_context 1a
23from .schemas.events import Event 1a
25if TYPE_CHECKING: 25 ↛ 26line 25 didn't jump to line 26 because the condition on line 25 was never true1a
26 from prefect.client.orchestration import PrefectClient
def should_emit_events() -> bool:
    """Report whether events should be emitted to any destination.

    The Cloud, running-server and ephemeral-server predicates are consulted
    in that order, and evaluation stops at the first one that holds.
    """
    if emit_events_to_cloud():
        return True
    if should_emit_events_to_running_server():
        return True
    return should_emit_events_to_ephemeral_server()
def emit_events_to_cloud() -> bool:
    """Report whether the configured API URL points at Prefect Cloud.

    True only when ``PREFECT_API_URL`` holds a string that begins with the
    value of ``PREFECT_CLOUD_API_URL``.
    """
    configured_url = PREFECT_API_URL.value()
    if not isinstance(configured_url, str):
        return False
    return configured_url.startswith(PREFECT_CLOUD_API_URL.value())
def should_emit_events_to_running_server() -> bool:
    """Report whether the ``PREFECT_API_URL`` setting holds a string value."""
    return isinstance(PREFECT_API_URL.value(), str)
def should_emit_events_to_ephemeral_server() -> bool:
    """Report whether the ``PREFECT_API_KEY`` setting is unset (``None``)."""
    api_key = PREFECT_API_KEY.value()
    return api_key is None
class EventsWorker(QueueService[Event]):
    """Queue service that emits ``Event`` items through an events client.

    For every event, the ``contextvars`` context active when the item was
    prepared is remembered, and later re-entered while related resources are
    resolved, before the event is handed to the client.
    """

    def __init__(
        self,
        client_type: Type[EventsClient],
        client_options: Tuple[Tuple[str, Any], ...],
    ):
        """Store the client class and the options used to construct it.

        Args:
            client_type: The ``EventsClient`` subclass to instantiate when the
                worker's lifespan starts.
            client_options: Keyword arguments for ``client_type`` expressed as
                a variable-length tuple of ``(name, value)`` pairs (it may be
                empty).
        """
        super().__init__(client_type, client_options)
        self.client_type = client_type
        self.client_options = client_options
        # Both clients are created in `_lifespan`, not here.
        self._client: EventsClient
        self._orchestration_client: "PrefectClient"
        # Context captured per event id in `_prepare_item`, consumed in `_handle`.
        self._context_cache: Dict[UUID, Context] = {}

    @asynccontextmanager
    async def _lifespan(self):
        """Create the events and orchestration clients and keep them open.

        Both clients are entered as async context managers for the duration
        of the ``yield`` and exited afterwards.
        """
        self._client = self.client_type(**dict(self.client_options))
        from prefect.client.orchestration import get_client

        self._orchestration_client = get_client()
        async with self._client:
            async with self._orchestration_client:
                yield

    def _prepare_item(self, event: Event) -> Event:
        """Snapshot the current context for ``event`` and return it unchanged.

        The snapshot is stored under ``event.id`` so `_handle` can restore it.
        """
        self._context_cache[event.id] = copy_context()
        return event

    async def _handle(self, event: Event):
        """Attach related resources under the captured context, then emit.

        The context stored for ``event.id`` is removed from the cache; a
        ``KeyError`` is raised if no context was stored for this event.
        """
        context = self._context_cache.pop(event.id)
        with temporary_context(context=context):
            await self.attach_related_resources_from_context(event)

        await self._client.emit(event)

    async def attach_related_resources_from_context(self, event: Event) -> None:
        """Extend ``event.related`` with resources from the run context.

        Resources already involved in the event are excluded. Lineage events
        are left untouched.
        """
        if "prefect.resource.lineage-group" in event.resource:
            # We attach related resources to lineage events in `emit_lineage_event`,
            # instead of the worker, because not all run-related resources are
            # upstream from every lineage event (they might be downstream).
            # The "related" field in the event schema tracks upstream resources
            # only.
            return

        exclude = {resource.id for resource in event.involved_resources}
        event.related += await related_resources_from_run_context(
            client=self._orchestration_client, exclude=exclude
        )

    @classmethod
    def instance(
        cls: Type[Self], client_type: Optional[Type[EventsClient]] = None
    ) -> Self:
        """Return a worker for ``client_type``, choosing one from settings if omitted.

        Args:
            client_type: Explicit events client class. When ``None``, the
                class (and its keyword arguments) is selected from the
                current settings; an ephemeral API server subprocess is
                started when that branch is selected.

        Returns:
            The worker obtained from the base class's ``instance`` for the
            chosen client type and options.
        """
        client_kwargs: Dict[str, Any] = {}

        # Select a client type for this worker based on settings
        if client_type is None:
            if emit_events_to_cloud():
                client_type = PrefectCloudEventsClient
                client_kwargs = {
                    "api_url": PREFECT_API_URL.value(),
                    "api_key": PREFECT_API_KEY.value(),
                }
            elif should_emit_events_to_running_server():
                client_type = PrefectEventsClient
            elif should_emit_events_to_ephemeral_server():
                # create an ephemeral API if none was provided
                from prefect.server.api.server import SubprocessASGIServer

                server = SubprocessASGIServer()
                server.start()
                assert server.server_process is not None, "Server process did not start"

                client_kwargs = {"api_url": server.api_url}
                client_type = PrefectEventsClient
            else:
                client_type = NullEventsClient

        # The base class will take care of returning an existing worker with these
        # options if available
        return super().instance(client_type, tuple(client_kwargs.items()))