Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/events.py: 18%

92 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1import asyncio 1a

2import json 1a

3from enum import Enum 1a

4from typing import Optional 1a

5 

6import orjson 1a

7import typer 1a

8import websockets 1a

9from anyio import open_file 1a

10 

11from prefect.cli._types import PrefectTyper 1a

12from prefect.cli._utilities import exit_with_error 1a

13from prefect.cli.root import app 1a

14from prefect.events import Event 1a

15from prefect.events.clients import ( 1a

16 PrefectCloudAccountEventSubscriber, 

17 get_events_client, 

18 get_events_subscriber, 

19) 

20 

# Command group holding the event-related CLI commands defined in this module.
events_app: PrefectTyper = PrefectTyper(
    name="events",
    help="Stream events.",
)
# Attach the group to the root CLI app; "event" is registered as an alias.
app.add_typer(events_app, aliases=["event"])

23 

24 

class StreamFormat(str, Enum):
    """Output formats selectable through the `--format` option of `stream`.

    Members are also plain strings (the `str` mixin), so they compare equal
    to their literal values.
    """

    # Each event is emitted as one serialized JSON document.
    json = "json"
    # Each event is emitted as one short, space-separated text line.
    text = "text"

28 

29 

# NOTE: the docstring of `stream` doubles as the command's `--help` text, so
# maintainer-facing notes are kept in comments rather than in the docstring.
#
# Options:
#   format      - how each event is rendered (see `StreamFormat`).
#   output_file - when set, events are appended to this file instead of
#                 being printed.
#   account     - subscribe with `PrefectCloudAccountEventSubscriber` instead
#                 of the default subscriber from `get_events_subscriber()`.
#   run_once    - stop after the first event has been handled.
#
# Any `Exception` raised while subscribing or handling events is routed to
# `handle_error`.
@events_app.command()
async def stream(
    format: StreamFormat = typer.Option(
        StreamFormat.json, "--format", help="Output format (json or text)"
    ),
    output_file: str = typer.Option(
        None, "--output-file", help="File to write events to"
    ),
    account: bool = typer.Option(
        False,
        "--account",
        help="Stream events for entire account, including audit logs",
    ),
    run_once: bool = typer.Option(False, "--run-once", help="Stream only one event"),
):
    """Subscribes to the event stream of a workspace, printing each event
    as it is received. By default, events are printed as JSON, but can be
    printed as text by passing `--format text`.
    """

    try:
        if account:
            events_subscriber = PrefectCloudAccountEventSubscriber()
        else:
            events_subscriber = get_events_subscriber()

        app.console.print("Subscribing to event stream...")
        async with events_subscriber as subscriber:
            async for event in subscriber:
                await handle_event(event, format, output_file)
                if run_once:
                    # The previous code built `typer.Exit(0)` without raising
                    # it, so the loop never stopped. Raising it here is not an
                    # option either: click's Exit is a RuntimeError and would
                    # be caught below and reported as an unexpected error.
                    # Leaving the loop closes the subscriber and lets the
                    # command return normally.
                    break
    except Exception as exc:
        handle_error(exc)

64 

65 

async def handle_event(
    event: Event, format: StreamFormat, output_file: Optional[str]
) -> None:
    """Render a single event and write it to a file or to standard output.

    Args:
        event: The event to render.
        format: `StreamFormat.json` serializes `event.model_dump()` with
            orjson (values orjson cannot encode natively are passed through
            `str`); `StreamFormat.text` renders
            "<occurred ISO timestamp> <event name> <resource id>".
        output_file: Path of a file to append the rendered line to. When it
            is `None` or empty, the line is printed instead.

    Raises:
        ValueError: If `format` is neither of the known `StreamFormat` members.
    """
    if format == StreamFormat.json:
        event_data = orjson.dumps(event.model_dump(), default=str).decode()
    elif format == StreamFormat.text:
        event_data = f"{event.occurred.isoformat()} {event.event} {event.resource.id}"
    else:
        raise ValueError(f"Unknown format: {format}")

    if output_file:
        # `anyio.open_file` is a coroutine function: it has to be awaited to
        # obtain the async file object before that object can be used as an
        # async context manager. Without the `await`, `async with` fails
        # because a coroutine does not implement `__aenter__`/`__aexit__`.
        async with await open_file(output_file, "a") as f:
            await f.write(event_data + "\n")
    else:
        print(event_data)

78 

79 

def handle_error(exc: Exception) -> None:
    """Report an exception raised while streaming events through `exit_with_error`.

    The message handed to `exit_with_error` depends on the exception type:

    - `websockets.exceptions.ConnectionClosedError`: connection-closed notice
      that includes the exception text.
    - `KeyboardInterrupt` / `asyncio.exceptions.CancelledError`: a plain
      "Exiting..." notice.
    - `PermissionError`: file-write error that includes the exception text.
    - anything else: a generic "unexpected error" message with the exception
      text.

    Args:
        exc: The exception to report.
    """
    # Pick the message first; the checks run in the same order as before, so
    # an exception matching several branches still gets the first match.
    if isinstance(exc, websockets.exceptions.ConnectionClosedError):
        message = f"Connection closed, retrying... ({exc})"
    elif isinstance(exc, (KeyboardInterrupt, asyncio.exceptions.CancelledError)):
        message = "Exiting..."
    elif isinstance(exc, PermissionError):
        message = f"Error writing to file: {exc}"
    else:
        message = f"An unexpected error occurred: {exc}"

    exit_with_error(message)

89 

90 

# NOTE: the docstring of `emit` doubles as the command's `--help` text, so
# maintainer-facing notes are kept in comments rather than in the docstring.
#
# Flow: build the resource mapping from `--resource` (JSON object, or a single
# 'key=value' pair when the text is not valid JSON) and `--resource-id`, parse
# the optional `--related` and `--payload` JSON, construct the `Event`, and
# send it through the client returned by `get_events_client()`.
#
# NOTE(review): the `--resource` help says it "Can be used multiple times",
# but the parameter is a single string, so only one value reaches this
# function - confirm the intended behavior before changing the type.
@events_app.command()
async def emit(
    event: str = typer.Argument(help="The name of the event"),
    resource: Optional[str] = typer.Option(
        None,
        "--resource",
        "-r",
        help="Resource specification as 'key=value' or JSON. Can be used multiple times.",
    ),
    resource_id: Optional[str] = typer.Option(
        None,
        "--resource-id",
        help="The resource ID (shorthand for --resource prefect.resource.id=<id>)",
    ),
    related: Optional[str] = typer.Option(
        None,
        "--related",
        help="Related resources as JSON string",
    ),
    payload: Optional[str] = typer.Option(
        None,
        "--payload",
        "-p",
        help="Event payload as JSON string",
    ),
):
    """Emit a single event to Prefect.

    Examples:
        # Simple event with resource ID
        prefect event emit user.logged_in --resource-id user-123

        # Event with payload
        prefect event emit order.shipped --resource-id order-456 --payload '{"tracking": "ABC123"}'

        # Event with full resource specification
        prefect event emit customer.subscribed --resource '{"prefect.resource.id": "customer-789", "prefect.resource.name": "ACME Corp"}'
    """
    resource_dict = {}

    if resource:
        try:
            parsed = json.loads(resource)
            if not isinstance(parsed, dict):
                exit_with_error(
                    "Resource must be a JSON object, not an array or string"
                )
            resource_dict = parsed
        except json.JSONDecodeError:
            # Not JSON: fall back to a single 'key=value' pair. Only the first
            # '=' separates key from value, so values may contain '='.
            if "=" in resource:
                key, value = resource.split("=", 1)
                resource_dict[key] = value
            else:
                exit_with_error("Resource must be JSON or 'key=value' format")

    # `--resource-id` wins over an id supplied through `--resource`.
    if resource_id:
        resource_dict["prefect.resource.id"] = resource_id

    if "prefect.resource.id" not in resource_dict:
        exit_with_error("Resource must include 'prefect.resource.id'")

    related_list = None
    if related:
        try:
            parsed_related = json.loads(related)
            if isinstance(parsed_related, dict):
                # A single object is accepted as shorthand for a one-item list.
                related_list = [parsed_related]
            elif isinstance(parsed_related, list):
                related_list = parsed_related
            else:
                exit_with_error("Related resources must be a JSON object or array")
        except json.JSONDecodeError:
            exit_with_error("Related resources must be valid JSON")

    payload_dict = None
    if payload:
        try:
            parsed_payload = json.loads(payload)
            if not isinstance(parsed_payload, dict):
                exit_with_error("Payload must be a JSON object")
            payload_dict = parsed_payload
        except json.JSONDecodeError:
            exit_with_error("Payload must be valid JSON")

    try:
        event_obj = Event(
            event=event,
            resource=resource_dict,
            related=related_list or [],
            payload=payload_dict or {},
        )
    except ValueError as exc:
        # The JSON above is only checked for its top-level shape. Model
        # validation failures (pydantic's ValidationError is a ValueError
        # subclass) are reported as a CLI error instead of a raw traceback.
        exit_with_error(f"Invalid event: {exc}")

    async with get_events_client() as events_client:
        await events_client.emit(event_obj)

    app.console.print(f"Successfully emitted event '{event}' with ID {event_obj.id}")