Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/related.py: 15%

71 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import asyncio 1a

2from typing import ( 1a

3 TYPE_CHECKING, 

4 Any, 

5 Awaitable, 

6 Callable, 

7 Dict, 

8 Iterable, 

9 List, 

10 Optional, 

11 Set, 

12 Tuple, 

13 Union, 

14) 

15from uuid import UUID 1a

16 

17import prefect.types._datetime 1a

18 

19from .schemas.events import RelatedResource 1a

20 

21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true1a

22 from prefect._internal.schemas.bases import ObjectBaseModel 

23 from prefect.client.orchestration import PrefectClient 

24 

# One looked-up object as passed around in this module. The keys used here are
# "kind" and "role" (strings) and "object" (the model that was read, or None).
ResourceCacheEntry = Dict[str, Union[str, "ObjectBaseModel", None]]
# Maps a "<kind>.<id>" cache key to a pair of (entry, time the entry was cached).
RelatedResourceCache = Dict[
    str, Tuple[ResourceCacheEntry, prefect.types._datetime.DateTime]
]

# Maximum number of entries kept in a cache before the oldest one is evicted.
MAX_CACHE_SIZE = 100
# Module-level cache shared by every lookup made from this module.
RESOURCE_CACHE: RelatedResourceCache = {}

32 

33 

def tags_as_related_resources(tags: Iterable[str]) -> List[RelatedResource]:
    """Convert tag names into related resources carrying the "tag" role.

    Args:
        tags: The tag names to convert.

    Returns:
        One `RelatedResource` per element of `tags`, in sorted tag order, each
        identified as `prefect.tag.<tag>`.
    """
    resources: List[RelatedResource] = []
    for tag_name in sorted(tags):
        labels = {
            "prefect.resource.id": f"prefect.tag.{tag_name}",
            "prefect.resource.role": "tag",
        }
        resources.append(RelatedResource(labels))
    return resources

44 

45 

def object_as_related_resource(kind: str, role: str, object: Any) -> RelatedResource:
    """Describe `object` as a related resource.

    Args:
        kind: The resource kind, used to build the `prefect.<kind>.<id>` id.
        role: The role to record on the resource.
        object: The object to describe. When it exposes a truthy
            `as_related_resource` attribute, that attribute is called with
            `role=role` and its result is returned unchanged; otherwise the
            object's `id` and `name` attributes are read.

    Returns:
        The related resource describing `object`.
    """
    custom_factory = getattr(object, "as_related_resource", None)
    if custom_factory:
        # The object knows how to describe itself; defer to it entirely.
        return custom_factory(role=role)

    labels = {
        "prefect.resource.id": f"prefect.{kind}.{object.id}",
        "prefect.resource.role": role,
        "prefect.resource.name": object.name,
    }
    return RelatedResource(labels)

58 

59 

async def related_resources_from_run_context(
    client: "PrefectClient",
    exclude: Optional[Set[str]] = None,
) -> List[RelatedResource]:
    """Collect the resources related to the current flow/task run context.

    The flow run is taken from the active `FlowRunContext` when there is one;
    otherwise it is read through `client` using the `flow_run_id` of the active
    task run. When the flow run is a `FlowRun`, its flow, deployment, work
    queue and work pool are looked up concurrently (each only when the flow run
    references it). Lookups go through the module-level `RESOURCE_CACHE`.

    Args:
        client: Client used to read objects that are not available from context.
        exclude: Resource ids to leave out of the result. Defaults to nothing.

    Returns:
        The related resources for every object found, followed by one resource
        per distinct tag found on those objects. Returns an empty list when
        there is no run context or no flow run id can be determined.
    """
    from prefect.client.schemas.objects import FlowRun
    from prefect.context import FlowRunContext, TaskRunContext

    if exclude is None:
        exclude = set()

    flow_run_context = FlowRunContext.get()
    task_run_context = TaskRunContext.get()

    if not flow_run_context and not task_run_context:
        return []

    # Prefer the id from the flow run context; fall back to the task run's
    # parent flow run id.
    flow_run_id: Optional[UUID] = getattr(
        getattr(flow_run_context, "flow_run", None), "id", None
    ) or getattr(getattr(task_run_context, "task_run", None), "flow_run_id", None)
    if flow_run_id is None:
        return []

    related_objects: List[ResourceCacheEntry] = []

    async def dummy_read() -> ResourceCacheEntry:
        # Placeholder awaitable so `asyncio.gather` always receives the same
        # number of awaitables; the empty entry is skipped further down.
        return {}

    if flow_run_context:
        related_objects.append(
            {
                "kind": "flow-run",
                "role": "flow-run",
                "object": flow_run_context.flow_run,
            },
        )
    else:
        related_objects.append(
            await _get_and_cache_related_object(
                kind="flow-run",
                role="flow-run",
                client_method=client.read_flow_run,
                obj_id=flow_run_id,
                cache=RESOURCE_CACHE,
            )
        )

    if task_run_context:
        related_objects.append(
            {
                "kind": "task-run",
                "role": "task-run",
                "object": task_run_context.task_run,
            },
        )

    # The flow run entry is always appended first.
    flow_run = related_objects[0]["object"]

    if isinstance(flow_run, FlowRun):
        related_objects += list(
            await asyncio.gather(
                _get_and_cache_related_object(
                    kind="flow",
                    role="flow",
                    client_method=client.read_flow,
                    obj_id=flow_run.flow_id,
                    cache=RESOURCE_CACHE,
                ),
                (
                    _get_and_cache_related_object(
                        kind="deployment",
                        role="deployment",
                        client_method=client.read_deployment,
                        obj_id=flow_run.deployment_id,
                        cache=RESOURCE_CACHE,
                    )
                    if flow_run.deployment_id
                    else dummy_read()
                ),
                (
                    _get_and_cache_related_object(
                        kind="work-queue",
                        role="work-queue",
                        client_method=client.read_work_queue,
                        obj_id=flow_run.work_queue_id,
                        cache=RESOURCE_CACHE,
                    )
                    if flow_run.work_queue_id
                    else dummy_read()
                ),
                (
                    _get_and_cache_related_object(
                        kind="work-pool",
                        role="work-pool",
                        client_method=client.read_work_pool,
                        obj_id=flow_run.work_pool_name,
                        cache=RESOURCE_CACHE,
                    )
                    if flow_run.work_pool_name
                    else dummy_read()
                ),
            )
        )

    related: List[RelatedResource] = []
    tags: Set[str] = set()

    for entry in related_objects:
        obj_ = entry.get("object")
        if obj_ is None:
            # Covers both the empty placeholder entries and failed lookups.
            continue

        assert isinstance(entry["kind"], str) and isinstance(entry["role"], str)

        # Use the entry's own role rather than reusing its kind as the role.
        resource = object_as_related_resource(
            kind=entry["kind"], role=entry["role"], object=obj_
        )

        if resource.id in exclude:
            continue

        related.append(resource)
        if hasattr(obj_, "tags"):
            tags |= set(obj_.tags)

    related += [
        resource
        for resource in tags_as_related_resources(tags)
        if resource.id not in exclude
    ]

    return related

191 

192 

async def _get_and_cache_related_object(
    kind: str,
    role: str,
    client_method: Callable[[Union[UUID, str]], Awaitable[Optional["ObjectBaseModel"]]],
    obj_id: Union[UUID, str],
    cache: RelatedResourceCache,
) -> ResourceCacheEntry:
    """Return the entry for an object, reading it via `client_method` on a miss.

    Args:
        kind: The kind of object; combined with `obj_id` to form the cache key.
        role: The role to attach to the returned entry.
        client_method: Awaitable callable that reads the object given `obj_id`.
        obj_id: Identifier passed to `client_method`.
        cache: The cache to consult and update.

    Returns:
        A new dict holding the cached "kind" and "object" plus the requested
        "role". The dict stored in `cache` is not modified, so entries returned
        earlier keep the role they were requested with.
    """
    cache_key = f"{kind}.{obj_id}"

    if cache_key in cache:
        cached_entry, _ = cache[cache_key]
    else:
        obj_ = await client_method(obj_id)
        cached_entry: ResourceCacheEntry = {
            "kind": kind,
            "object": obj_,
        }

        cache[cache_key] = (cached_entry, prefect.types._datetime.now("UTC"))

        # In the case of a worker or agent this cache could be long-lived. To keep
        # from running out of memory only keep `MAX_CACHE_SIZE` entries in the
        # cache.
        if len(cache) > MAX_CACHE_SIZE:
            oldest_key = min(cache, key=lambda key: cache[key][1])
            del cache[oldest_key]

    # Because the role is event specific and can change depending on the
    # type of event being emitted, the role from the args is added to a copy
    # of the entry rather than being written into the cached dict.
    return {**cached_entry, "role": role}
230 return entry