Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/jinja_filters.py: 36%

32 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 

2 

3import urllib.parse 

4from typing import Any, Callable, Mapping, Optional 

5 

6from jinja2 import pass_context 

7 

8from prefect.server.schemas.core import ( 

9 Deployment, 

10 Flow, 

11 FlowRun, 

12 TaskRun, 

13 WorkPool, 

14 WorkQueue, 

15) 

16from prefect.server.schemas.responses import WorkQueueWithStatus 

17from prefect.server.utilities.schemas import ORMBaseModel 

18from prefect.settings import PREFECT_UI_URL 

19from prefect.utilities.urls import url_for 

20 

21model_to_kind: dict[type[ORMBaseModel], str] = { 

22 Deployment: "prefect.deployment", 

23 Flow: "prefect.flow", 

24 FlowRun: "prefect.flow-run", 

25 TaskRun: "prefect.task-run", 

26 WorkQueue: "prefect.work-queue", 

27 WorkQueueWithStatus: "prefect.work-queue", 

28 WorkPool: "prefect.work-pool", 

29} 

30 

31 

32@pass_context 

33def ui_url(ctx: Mapping[str, Any], obj: Any) -> Optional[str]: 

34 """Return the UI URL for the given object.""" 

35 return url_for(obj, url_type="ui") 

36 

37 

38@pass_context 

39def ui_resource_events_url(ctx: Mapping[str, Any], obj: Any) -> Optional[str]: 

40 """Given a Resource or Model, return a UI link to the events page 

41 filtered for that resource. If an unsupported object is provided, 

42 return `None`. 

43 

44 Currently supports Automation, Resource, Deployment, Flow, FlowRun, TaskRun, and 

45 WorkQueue objects. Within a Resource, deployment, flow, flow-run, task-run, 

46 and work-queue are supported.""" 

47 from prefect.server.events.schemas.automations import Automation 

48 from prefect.server.events.schemas.events import Resource 

49 

50 url = None 

51 url_format = "events?resource={resource_id}" 

52 

53 if isinstance(obj, Automation): 

54 url = url_format.format(resource_id=f"prefect.automation.{obj.id}") 

55 elif isinstance(obj, Resource): 

56 kind, _, id = obj.id.rpartition(".") 

57 url = url_format.format(resource_id=f"{kind}.{id}") 

58 elif isinstance(obj, ORMBaseModel): 

59 kind = model_to_kind.get(type(obj)) # type: ignore 

60 

61 if kind: 

62 url = url_format.format(resource_id=f"{kind}.{obj.id}") 

63 if url: 

64 return urllib.parse.urljoin(PREFECT_UI_URL.value(), url) 

65 else: 

66 return None 

67 

68 

69all_filters: dict[str, Callable[[Mapping[str, Any], Any], str | None]] = { 

70 "ui_url": ui_url, 

71 "ui_resource_events_url": ui_resource_events_url, 

72}