Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/worker.py: 32%

70 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from contextlib import asynccontextmanager 1a

2from contextvars import Context, copy_context 1a

3from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Type 1a

4from uuid import UUID 1a

5 

6from typing_extensions import Self 1a

7 

8from prefect._internal.concurrency.services import QueueService 1a

9from prefect.settings import ( 1a

10 PREFECT_API_KEY, 

11 PREFECT_API_URL, 

12 PREFECT_CLOUD_API_URL, 

13) 

14from prefect.utilities.context import temporary_context 1a

15 

16from .clients import ( 1a

17 EventsClient, 

18 NullEventsClient, 

19 PrefectCloudEventsClient, 

20 PrefectEventsClient, 

21) 

22from .related import related_resources_from_run_context 1a

23from .schemas.events import Event 1a

24 

25if TYPE_CHECKING: 25 ↛ 26line 25 didn't jump to line 26 because the condition on line 25 was never true1a

26 from prefect.client.orchestration import PrefectClient 

27 

28 

def should_emit_events() -> bool:
    """Report whether any event destination is currently enabled.

    The destinations are checked in order: Prefect Cloud, a configured API
    server, then an ephemeral server. Checking stops at the first one that
    reports a truthy result.
    """
    destination_checks = (
        emit_events_to_cloud,
        should_emit_events_to_running_server,
        should_emit_events_to_ephemeral_server,
    )
    outcome = False
    for check in destination_checks:
        outcome = check()
        if outcome:
            # Short-circuit: later checks are not evaluated once one passes.
            break
    return outcome

35 

36 

def emit_events_to_cloud() -> bool:
    """Report whether the configured API URL points at Prefect Cloud.

    Returns True only when ``PREFECT_API_URL`` holds a string that starts
    with the value of ``PREFECT_CLOUD_API_URL``.
    """
    configured_url = PREFECT_API_URL.value()
    if not isinstance(configured_url, str):
        # No usable API URL is set, so there is nothing to compare.
        return False
    cloud_prefix = PREFECT_CLOUD_API_URL.value()
    return configured_url.startswith(cloud_prefix)

42 

43 

def should_emit_events_to_running_server() -> bool:
    """Report whether ``PREFECT_API_URL`` is set to a string value."""
    return isinstance(PREFECT_API_URL.value(), str)

47 

48 

def should_emit_events_to_ephemeral_server() -> bool:
    """Report whether no API key is configured (``PREFECT_API_KEY`` is None)."""
    configured_key = PREFECT_API_KEY.value()
    return configured_key is None

51 

52 

class EventsWorker(QueueService[Event]):
    """Queue service that enriches events with related resources and emits them.

    Each queued event is paired with a snapshot of the contextvars context
    taken in `_prepare_item`; `_handle` later restores that snapshot while
    looking up related resources, then sends the event through the events
    client.
    """

    def __init__(
        self,
        client_type: Type[EventsClient],
        client_options: Tuple[Tuple[str, Any], ...],
    ):
        """Store the client configuration used to build the events client.

        Args:
            client_type: The events client class instantiated in `_lifespan`.
            client_options: Keyword arguments for `client_type`, given as a
                tuple of ``(name, value)`` pairs of any length (possibly
                empty), as produced by ``tuple(kwargs.items())``.
        """
        super().__init__(client_type, client_options)
        self.client_type = client_type
        self.client_options = client_options
        # Declared here for type checkers; both are assigned in `_lifespan`.
        self._client: EventsClient
        self._orchestration_client: "PrefectClient"
        # Context snapshots keyed by event id: filled by `_prepare_item`,
        # consumed (popped) by `_handle`.
        self._context_cache: Dict[UUID, Context] = {}

    @asynccontextmanager
    async def _lifespan(self):
        """Create the events and orchestration clients and keep them open.

        Both clients are entered as async context managers for the duration
        of the ``yield`` and exited afterwards.
        """
        self._client = self.client_type(**dict(self.client_options))
        # Function-scope import, as in the original — presumably to avoid a
        # circular import at module load; confirm before hoisting.
        from prefect.client.orchestration import get_client

        self._orchestration_client = get_client()
        async with self._client, self._orchestration_client:
            yield

    def _prepare_item(self, event: Event) -> Event:
        """Snapshot the current contextvars context for `event` and return it.

        The snapshot is stored under the event's id so `_handle` can restore
        it when the event is processed.
        """
        self._context_cache[event.id] = copy_context()
        return event

    async def _handle(self, event: Event):
        """Attach related resources to `event`, then emit it.

        Raises:
            KeyError: If no context snapshot was stored for this event id.
        """
        context = self._context_cache.pop(event.id)
        # Related-resource discovery runs under the snapshot taken in
        # `_prepare_item`, not under whatever context is current here.
        with temporary_context(context=context):
            await self.attach_related_resources_from_context(event)

        await self._client.emit(event)

    async def attach_related_resources_from_context(self, event: Event) -> None:
        """Extend ``event.related`` with resources found in the run context.

        Resources already involved in the event are excluded from the lookup.
        Events whose resource carries the ``prefect.resource.lineage-group``
        key are left untouched.
        """
        if "prefect.resource.lineage-group" in event.resource:
            # We attach related resources to lineage events in `emit_lineage_event`,
            # instead of the worker, because not all run-related resources are
            # upstream from every lineage event (they might be downstream).
            # The "related" field in the event schema tracks upstream resources
            # only.
            return

        exclude = {resource.id for resource in event.involved_resources}
        event.related += await related_resources_from_run_context(
            client=self._orchestration_client, exclude=exclude
        )

    @classmethod
    def instance(
        cls: Type[Self], client_type: Optional[Type[EventsClient]] = None
    ) -> Self:
        """Return a worker for `client_type`, choosing one from settings if None.

        When `client_type` is None the client is selected as follows:

        * API URL points at Prefect Cloud: `PrefectCloudEventsClient`, built
          with the configured API URL and API key.
        * API URL is set to any other string: `PrefectEventsClient` with no
          extra options.
        * No API key is configured: a `SubprocessASGIServer` is started and
          `PrefectEventsClient` is pointed at its API URL.
        * Otherwise: `NullEventsClient`.

        An explicitly supplied `client_type` is used as-is with no options.
        """
        client_kwargs: Dict[str, Any] = {}

        # Select a client type for this worker based on settings
        if client_type is None:
            if emit_events_to_cloud():
                client_type = PrefectCloudEventsClient
                client_kwargs = {
                    "api_url": PREFECT_API_URL.value(),
                    "api_key": PREFECT_API_KEY.value(),
                }
            elif should_emit_events_to_running_server():
                client_type = PrefectEventsClient
            elif should_emit_events_to_ephemeral_server():
                # create an ephemeral API if none was provided
                from prefect.server.api.server import SubprocessASGIServer

                server = SubprocessASGIServer()
                server.start()
                assert server.server_process is not None, "Server process did not start"

                client_kwargs = {"api_url": server.api_url}
                client_type = PrefectEventsClient
            else:
                client_type = NullEventsClient

        # The base class will take care of returning an existing worker with these
        # options if available
        return super().instance(client_type, tuple(client_kwargs.items()))