Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/jinja_filters.py: 0%
32 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations
3import urllib.parse
4from typing import Any, Callable, Mapping, Optional
6from jinja2 import pass_context
8from prefect.server.schemas.core import (
9 Deployment,
10 Flow,
11 FlowRun,
12 TaskRun,
13 WorkPool,
14 WorkQueue,
15)
16from prefect.server.schemas.responses import WorkQueueWithStatus
17from prefect.server.utilities.schemas import ORMBaseModel
18from prefect.settings import PREFECT_UI_URL
19from prefect.utilities.urls import url_for
21model_to_kind: dict[type[ORMBaseModel], str] = {
22 Deployment: "prefect.deployment",
23 Flow: "prefect.flow",
24 FlowRun: "prefect.flow-run",
25 TaskRun: "prefect.task-run",
26 WorkQueue: "prefect.work-queue",
27 WorkQueueWithStatus: "prefect.work-queue",
28 WorkPool: "prefect.work-pool",
29}
32@pass_context
33def ui_url(ctx: Mapping[str, Any], obj: Any) -> Optional[str]:
34 """Return the UI URL for the given object."""
35 return url_for(obj, url_type="ui")
38@pass_context
39def ui_resource_events_url(ctx: Mapping[str, Any], obj: Any) -> Optional[str]:
40 """Given a Resource or Model, return a UI link to the events page
41 filtered for that resource. If an unsupported object is provided,
42 return `None`.
44 Currently supports Automation, Resource, Deployment, Flow, FlowRun, TaskRun, and
45 WorkQueue objects. Within a Resource, deployment, flow, flow-run, task-run,
46 and work-queue are supported."""
47 from prefect.server.events.schemas.automations import Automation
48 from prefect.server.events.schemas.events import Resource
50 url = None
51 url_format = "events?resource={resource_id}"
53 if isinstance(obj, Automation):
54 url = url_format.format(resource_id=f"prefect.automation.{obj.id}")
55 elif isinstance(obj, Resource):
56 kind, _, id = obj.id.rpartition(".")
57 url = url_format.format(resource_id=f"{kind}.{id}")
58 elif isinstance(obj, ORMBaseModel):
59 kind = model_to_kind.get(type(obj)) # type: ignore
61 if kind:
62 url = url_format.format(resource_id=f"{kind}.{obj.id}")
63 if url:
64 return urllib.parse.urljoin(PREFECT_UI_URL.value(), url)
65 else:
66 return None
69all_filters: dict[str, Callable[[Mapping[str, Any], Any], str | None]] = {
70 "ui_url": ui_url,
71 "ui_resource_events_url": ui_resource_events_url,
72}