Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/events.py: 18%
92 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1import asyncio 1a
2import json 1a
3from enum import Enum 1a
4from typing import Optional 1a
6import orjson 1a
7import typer 1a
8import websockets 1a
9from anyio import open_file 1a
11from prefect.cli._types import PrefectTyper 1a
12from prefect.cli._utilities import exit_with_error 1a
13from prefect.cli.root import app 1a
14from prefect.events import Event 1a
15from prefect.events.clients import ( 1a
16 PrefectCloudAccountEventSubscriber,
17 get_events_client,
18 get_events_subscriber,
19)
21events_app: PrefectTyper = PrefectTyper(name="events", help="Stream events.") 1a
22app.add_typer(events_app, aliases=["event"]) 1a
25class StreamFormat(str, Enum): 1a
26 json = "json" 1a
27 text = "text" 1a
30@events_app.command() 1a
31async def stream( 1a
32 format: StreamFormat = typer.Option(
33 StreamFormat.json, "--format", help="Output format (json or text)"
34 ),
35 output_file: str = typer.Option(
36 None, "--output-file", help="File to write events to"
37 ),
38 account: bool = typer.Option(
39 False,
40 "--account",
41 help="Stream events for entire account, including audit logs",
42 ),
43 run_once: bool = typer.Option(False, "--run-once", help="Stream only one event"),
44):
45 """Subscribes to the event stream of a workspace, printing each event
46 as it is received. By default, events are printed as JSON, but can be
47 printed as text by passing `--format text`.
48 """
50 try:
51 if account:
52 events_subscriber = PrefectCloudAccountEventSubscriber()
53 else:
54 events_subscriber = get_events_subscriber()
56 app.console.print("Subscribing to event stream...")
57 async with events_subscriber as subscriber:
58 async for event in subscriber:
59 await handle_event(event, format, output_file)
60 if run_once:
61 typer.Exit(0)
62 except Exception as exc:
63 handle_error(exc)
66async def handle_event(event: Event, format: StreamFormat, output_file: str) -> None: 1a
67 if format == StreamFormat.json:
68 event_data = orjson.dumps(event.model_dump(), default=str).decode()
69 elif format == StreamFormat.text:
70 event_data = f"{event.occurred.isoformat()} {event.event} {event.resource.id}"
71 else:
72 raise ValueError(f"Unknown format: {format}")
73 if output_file:
74 async with open_file(output_file, "a") as f: # type: ignore
75 await f.write(event_data + "\n")
76 else:
77 print(event_data)
80def handle_error(exc: Exception) -> None: 1a
81 if isinstance(exc, websockets.exceptions.ConnectionClosedError):
82 exit_with_error(f"Connection closed, retrying... ({exc})")
83 elif isinstance(exc, (KeyboardInterrupt, asyncio.exceptions.CancelledError)):
84 exit_with_error("Exiting...")
85 elif isinstance(exc, (PermissionError)):
86 exit_with_error(f"Error writing to file: {exc}")
87 else:
88 exit_with_error(f"An unexpected error occurred: {exc}")
91@events_app.command() 1a
92async def emit( 1a
93 event: str = typer.Argument(help="The name of the event"),
94 resource: str = typer.Option(
95 None,
96 "--resource",
97 "-r",
98 help="Resource specification as 'key=value' or JSON. Can be used multiple times.",
99 ),
100 resource_id: str = typer.Option(
101 None,
102 "--resource-id",
103 help="The resource ID (shorthand for --resource prefect.resource.id=<id>)",
104 ),
105 related: Optional[str] = typer.Option(
106 None,
107 "--related",
108 help="Related resources as JSON string",
109 ),
110 payload: Optional[str] = typer.Option(
111 None,
112 "--payload",
113 "-p",
114 help="Event payload as JSON string",
115 ),
116):
117 """Emit a single event to Prefect.
119 Examples:
120 # Simple event with resource ID
121 prefect event emit user.logged_in --resource-id user-123
123 # Event with payload
124 prefect event emit order.shipped --resource-id order-456 --payload '{"tracking": "ABC123"}'
126 # Event with full resource specification
127 prefect event emit customer.subscribed --resource '{"prefect.resource.id": "customer-789", "prefect.resource.name": "ACME Corp"}'
128 """
129 resource_dict = {}
131 if resource:
132 try:
133 parsed = json.loads(resource)
134 if not isinstance(parsed, dict):
135 exit_with_error(
136 "Resource must be a JSON object, not an array or string"
137 )
138 resource_dict = parsed
139 except json.JSONDecodeError:
140 if "=" in resource:
141 key, value = resource.split("=", 1)
142 resource_dict[key] = value
143 else:
144 exit_with_error("Resource must be JSON or 'key=value' format")
146 if resource_id:
147 resource_dict["prefect.resource.id"] = resource_id
149 if "prefect.resource.id" not in resource_dict:
150 exit_with_error("Resource must include 'prefect.resource.id'")
152 related_list = None
153 if related:
154 try:
155 parsed_related = json.loads(related)
156 if isinstance(parsed_related, dict):
157 related_list = [parsed_related]
158 elif isinstance(parsed_related, list):
159 related_list = parsed_related
160 else:
161 exit_with_error("Related resources must be a JSON object or array")
162 except json.JSONDecodeError:
163 exit_with_error("Related resources must be valid JSON")
165 payload_dict = None
166 if payload:
167 try:
168 parsed_payload = json.loads(payload)
169 if not isinstance(parsed_payload, dict):
170 exit_with_error("Payload must be a JSON object")
171 payload_dict = parsed_payload
172 except json.JSONDecodeError:
173 exit_with_error("Payload must be valid JSON")
175 event_obj = Event(
176 event=event,
177 resource=resource_dict,
178 related=related_list or [],
179 payload=payload_dict or {},
180 )
182 async with get_events_client() as events_client:
183 await events_client.emit(event_obj)
185 app.console.print(f"Successfully emitted event '{event}' with ID {event_obj.id}")