Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/related.py: 15%
71 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1import asyncio 1a
2from typing import ( 1a
3 TYPE_CHECKING,
4 Any,
5 Awaitable,
6 Callable,
7 Dict,
8 Iterable,
9 List,
10 Optional,
11 Set,
12 Tuple,
13 Union,
14)
15from uuid import UUID 1a
17import prefect.types._datetime 1a
19from .schemas.events import RelatedResource 1a
21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true1a
22 from prefect._internal.schemas.bases import ObjectBaseModel
23 from prefect.client.orchestration import PrefectClient
# One looked-up object as a plain dict. The code in this module stores the
# keys "kind" and "object" in it and adds "role" before handing it to callers.
ResourceCacheEntry = Dict[str, Union[str, "ObjectBaseModel", None]]
# Maps a "<kind>.<obj_id>" cache key to a tuple of (entry, time it was stored).
RelatedResourceCache = Dict[
    str, Tuple[ResourceCacheEntry, prefect.types._datetime.DateTime]
]

# Size threshold for a cache: once a cache holds more than this many entries,
# _get_and_cache_related_object drops the entry with the oldest timestamp.
MAX_CACHE_SIZE = 100
# Module-level cache passed to _get_and_cache_related_object by
# related_resources_from_run_context.
RESOURCE_CACHE: RelatedResourceCache = {}
def tags_as_related_resources(tags: Iterable[str]) -> List[RelatedResource]:
    """Build one related resource per tag, in sorted tag order.

    Each resource gets the id ``prefect.tag.<tag>`` and the role ``tag``.

    Args:
        tags: The tag names to convert.

    Returns:
        A list of related resources, one for each tag yielded by
        ``sorted(tags)``.
    """
    resources: List[RelatedResource] = []
    for tag_name in sorted(tags):
        labels = {
            "prefect.resource.id": f"prefect.tag.{tag_name}",
            "prefect.resource.role": "tag",
        }
        resources.append(RelatedResource(labels))
    return resources
def object_as_related_resource(kind: str, role: str, object: Any) -> RelatedResource:
    """Convert an arbitrary object into a related resource.

    If the object exposes a truthy ``as_related_resource`` attribute, that
    attribute is called with ``role=role`` and its result is returned as-is.
    Otherwise the resource is assembled from the object's ``id`` and ``name``
    attributes.

    Args:
        kind: Used to form the resource id ``prefect.<kind>.<object.id>``.
        role: The role recorded on the resource.
        object: The object to describe.

    Returns:
        The related resource describing ``object``.
    """
    converter = getattr(object, "as_related_resource", None)
    if converter:
        # The object knows how to describe itself; defer to it.
        return converter(role=role)

    labels = {
        "prefect.resource.id": f"prefect.{kind}.{object.id}",
        "prefect.resource.role": role,
        "prefect.resource.name": object.name,
    }
    return RelatedResource(labels)
async def related_resources_from_run_context(
    client: "PrefectClient",
    exclude: Optional[Set[str]] = None,
) -> List[RelatedResource]:
    """Collect the resources related to the current flow/task run context.

    The flow run is taken from the active ``FlowRunContext`` when there is
    one; otherwise it is read through ``client`` using the ``flow_run_id`` of
    the active task run. When the flow run is a ``FlowRun`` instance, its
    flow, deployment, work queue and work pool are read as well (through the
    module-level cache), skipping any whose identifier is falsy. Tags found
    on the included objects are appended as tag resources.

    Args:
        client: Client whose ``read_*`` methods are used to fetch objects
            that are not available from the run contexts.
        exclude: Resource ids to leave out of the result. Tags of an
            excluded object are not collected either.

    Returns:
        The related resources, objects first and then tags in sorted order.
        Empty when there is no flow or task run context, or when no flow run
        id can be determined.
    """
    from prefect.client.schemas.objects import FlowRun
    from prefect.context import FlowRunContext, TaskRunContext

    if exclude is None:
        exclude = set()

    flow_run_context = FlowRunContext.get()
    task_run_context = TaskRunContext.get()

    if not flow_run_context and not task_run_context:
        return []

    # Prefer the id of the flow run in context; fall back to the id the task
    # run points at.
    flow_run_id: Optional[UUID] = getattr(
        getattr(flow_run_context, "flow_run", None), "id", None
    ) or getattr(getattr(task_run_context, "task_run", None), "flow_run_id", None)
    if flow_run_id is None:
        return []

    related_objects: List[ResourceCacheEntry] = []

    async def dummy_read() -> ResourceCacheEntry:
        # Placeholder awaitable for optional objects the flow run does not
        # reference; the empty entry is skipped below (it has no "object").
        return {}

    if flow_run_context:
        related_objects.append(
            {
                "kind": "flow-run",
                "role": "flow-run",
                "object": flow_run_context.flow_run,
            },
        )
    else:
        related_objects.append(
            await _get_and_cache_related_object(
                kind="flow-run",
                role="flow-run",
                client_method=client.read_flow_run,
                obj_id=flow_run_id,
                cache=RESOURCE_CACHE,
            )
        )

    if task_run_context:
        related_objects.append(
            {
                "kind": "task-run",
                "role": "task-run",
                "object": task_run_context.task_run,
            },
        )

    # The flow run entry is always appended first, so it sits at index 0.
    flow_run = related_objects[0]["object"]

    if isinstance(flow_run, FlowRun):
        related_objects += list(
            await asyncio.gather(
                _get_and_cache_related_object(
                    kind="flow",
                    role="flow",
                    client_method=client.read_flow,
                    obj_id=flow_run.flow_id,
                    cache=RESOURCE_CACHE,
                ),
                (
                    _get_and_cache_related_object(
                        kind="deployment",
                        role="deployment",
                        client_method=client.read_deployment,
                        obj_id=flow_run.deployment_id,
                        cache=RESOURCE_CACHE,
                    )
                    if flow_run.deployment_id
                    else dummy_read()
                ),
                (
                    _get_and_cache_related_object(
                        kind="work-queue",
                        role="work-queue",
                        client_method=client.read_work_queue,
                        obj_id=flow_run.work_queue_id,
                        cache=RESOURCE_CACHE,
                    )
                    if flow_run.work_queue_id
                    else dummy_read()
                ),
                (
                    _get_and_cache_related_object(
                        kind="work-pool",
                        role="work-pool",
                        client_method=client.read_work_pool,
                        obj_id=flow_run.work_pool_name,
                        cache=RESOURCE_CACHE,
                    )
                    if flow_run.work_pool_name
                    else dummy_read()
                ),
            )
        )

    related = []
    tags = set()

    for entry in related_objects:
        obj_ = entry.get("object")
        if obj_ is None:
            # Placeholder entries and lookups that returned nothing.
            continue

        # Narrows the entry's value types for the call below.
        assert isinstance(entry["kind"], str) and isinstance(entry["role"], str)

        # Use the entry's own role rather than reusing its kind; the two are
        # equal for every entry built above, but the role is the value that
        # is meant to be reported.
        resource = object_as_related_resource(
            kind=entry["kind"], role=entry["role"], object=obj_
        )

        if resource.id in exclude:
            continue

        related.append(resource)
        if hasattr(obj_, "tags"):
            tags |= set(obj_.tags)

    related += [
        resource
        for resource in tags_as_related_resources(tags)
        if resource.id not in exclude
    ]

    return related
async def _get_and_cache_related_object(
    kind: str,
    role: str,
    client_method: Callable[[Union[UUID, str]], Awaitable[Optional["ObjectBaseModel"]]],
    obj_id: Union[UUID, str],
    cache: RelatedResourceCache,
) -> ResourceCacheEntry:
    """Return the entry for an object, reading it via ``client_method`` on a cache miss.

    Entries are cached under the key ``"<kind>.<obj_id>"`` together with the
    time they were stored. After an insertion pushes the cache past
    ``MAX_CACHE_SIZE`` entries, the entry with the oldest timestamp is
    removed.

    Args:
        kind: Kind of the object; stored in the entry and used in the cache key.
        role: Role to report on the returned entry. It is not cached.
        client_method: Awaitable callable that fetches the object by id.
        obj_id: Identifier passed to ``client_method`` and used in the cache key.
        cache: The cache to consult and update in place.

    Returns:
        A new dict holding the cached ``"kind"`` and ``"object"`` plus the
        requested ``"role"``. The dict is a shallow copy, so changing it does
        not affect the cache.
    """
    cache_key = f"{kind}.{obj_id}"
    entry: ResourceCacheEntry

    if cache_key in cache:
        entry, _ = cache[cache_key]
    else:
        obj_ = await client_method(obj_id)
        entry = {
            "kind": kind,
            "object": obj_,
        }

        cache[cache_key] = (entry, prefect.types._datetime.now("UTC"))

        # In the case of a worker or agent this cache could be long-lived. To
        # keep from running out of memory only keep `MAX_CACHE_SIZE` entries
        # in the cache. Only an insertion can grow the cache, so the check
        # lives in this branch.
        if len(cache) > MAX_CACHE_SIZE:
            oldest_key = min(cache, key=lambda key: cache[key][1])
            del cache[oldest_key]

    # Because the role is event specific and can change depending on the
    # type of event being emitted, the role from the args is added to a copy
    # of the entry; writing it into `entry` itself would store it in the
    # cache, since the cache holds that same dict object.
    return {**entry, "role": role}