Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/cloud/webhook.py: 30%

82 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Command line interface for working with webhooks 

3""" 

4 

5from typing import Dict, List 1a

6from uuid import UUID 1a

7 

8import typer 1a

9from rich.table import Table 1a

10 

11from prefect.cli._types import PrefectTyper 1a

12from prefect.cli._utilities import exit_with_error 1a

13from prefect.cli.cloud import cloud_app, confirm_logged_in 1a

14from prefect.cli.root import app, is_interactive 1a

15from prefect.client.cloud import get_cloud_client 1a

16from prefect.exceptions import ObjectNotFound 1a

17from prefect.settings import get_current_settings 1a

18 

# Typer command group that collects every `webhook` subcommand declared below.
webhook_app: PrefectTyper = PrefectTyper(
    help="Manage Prefect Cloud Webhooks",
    name="webhook",
)
# Register the group on the cloud CLI app; "webhooks" is accepted as an alias.
cloud_app.add_typer(webhook_app, aliases=["webhooks"])

23 

24 

def _render_webhooks_into_table(webhooks: List[Dict[str, str]]) -> Table:
    """
    Build a rich ``Table`` with one row per webhook.

    Each webhook mapping must provide the keys ``id``, ``slug``, ``name``,
    ``enabled`` and ``template``; ``enabled`` is converted with ``str()``
    before being placed in its cell.
    """
    column_titles = ("webhook id", "url slug", "name", "enabled?", "template")

    table = Table(show_lines=True)
    for title in column_titles:
        # overflow="fold" wraps long values instead of truncating them, so the
        # whole value stays visible even when it exceeds the computed column width
        # https://rich.readthedocs.io/en/stable/reference/table.html#rich.table.Column.overflow
        table.add_column(title, overflow="fold")

    for entry in webhooks:
        cells = (
            entry["id"],
            entry["slug"],
            entry["name"],
            str(entry["enabled"]),
            entry["template"],
        )
        table.add_row(*cells)

    return table

42 

43 

@webhook_app.command()
async def ls():
    """
    Fetch and list all webhooks in your workspace
    """
    confirm_logged_in()

    # The /webhooks API lives inside the /accounts/{id}/workspaces/{id} routing tree
    api_url = get_current_settings().api.url
    async with get_cloud_client(host=api_url) as client:
        # Listing is done through the filter endpoint, called here without a body
        webhooks = await client.request("POST", "/webhooks/filter")
        app.console.print(_render_webhooks_into_table(webhooks))

56 

57 

@webhook_app.command()
async def get(webhook_id: UUID):
    """
    Retrieve a webhook by ID.
    """
    confirm_logged_in()

    # The /webhooks API lives inside the /accounts/{id}/workspaces/{id} routing tree
    api_url = get_current_settings().api.url
    async with get_cloud_client(host=api_url) as client:
        fetched = await client.request("GET", f"/webhooks/{webhook_id}")
        # The renderer expects a list, so wrap the single result
        app.console.print(_render_webhooks_into_table([fetched]))

70 

71 

@webhook_app.command()
async def create(
    webhook_name: str,
    description: str = typer.Option(
        "", "--description", "-d", help="Description of the webhook"
    ),
    template: str = typer.Option(
        None, "--template", "-t", help="Jinja2 template expression"
    ),
):
    """
    Create a new Cloud webhook
    """
    # Validate the template before the login check so a missing flag is
    # reported first; both None and "" count as missing.
    if not template:
        missing_template_message = (
            "Please provide a Jinja2 template expression in the --template flag \n"
            "which should define (at minimum) the following attributes: \n"
            '{ "event": "your.event.name", "resource": { "prefect.resource.id":'
            ' "your.resource.id" } } \n'
            "https://docs.prefect.io/latest/automate/events/webhook-triggers#webhook-templates"
        )
        exit_with_error(missing_template_message)

    confirm_logged_in()

    new_webhook = {
        "name": webhook_name,
        "description": description,
        "template": template,
    }

    # The /webhooks API lives inside the /accounts/{id}/workspaces/{id} routing tree
    api_url = get_current_settings().api.url
    async with get_cloud_client(host=api_url) as client:
        created = await client.request("POST", "/webhooks/", json=new_webhook)
        app.console.print(f"Successfully created webhook {created['name']}")

108 

109 

@webhook_app.command()
async def rotate(webhook_id: UUID):
    """
    Rotate url for an existing Cloud webhook, in case it has been compromised
    """
    confirm_logged_in()

    # Rotation invalidates the current URL, so ask for explicit consent and
    # return without contacting the API if the user declines.
    if not typer.confirm(
        "Are you sure you want to rotate? This will invalidate the old URL."
    ):
        return

    # The /webhooks API lives inside the /accounts/{id}/workspaces/{id} routing tree
    api_url = get_current_settings().api.url
    async with get_cloud_client(host=api_url) as client:
        rotated = await client.request("POST", f"/webhooks/{webhook_id}/rotate")
        app.console.print(f"Successfully rotated webhook URL to {rotated['slug']}")

128 

129 

@webhook_app.command()
async def toggle(
    webhook_id: UUID,
):
    """
    Toggle the enabled status of an existing Cloud webhook
    """
    confirm_logged_in()

    api_url = get_current_settings().api.url
    async with get_cloud_client(host=api_url) as client:
        webhook_path = f"/webhooks/{webhook_id}"

        # Read the current flag first, then write back its negation
        existing = await client.request("GET", webhook_path)
        enabled = not existing["enabled"]

        await client.request("PATCH", webhook_path, json={"enabled": enabled})
        app.console.print(f"Webhook is now {'enabled' if enabled else 'disabled'}")

150 

151 

@webhook_app.command()
async def update(
    webhook_id: UUID,
    webhook_name: str = typer.Option(None, "--name", "-n", help="Webhook name"),
    description: str = typer.Option(
        None, "--description", "-d", help="Description of the webhook"
    ),
    template: str = typer.Option(
        None, "--template", "-t", help="Jinja2 template expression"
    ),
):
    """
    Partially update an existing Cloud webhook
    """
    confirm_logged_in()

    # The /webhooks API lives inside the /accounts/{id}/workspaces/{id} routing tree
    async with get_cloud_client(host=get_current_settings().api.url) as client:
        # The update is sent as a PUT carrying all three fields, so any field
        # the user did not supply is backfilled from the webhook's current state.
        response = await client.request("GET", f"/webhooks/{webhook_id}")
        update_payload = {
            # An empty name is treated as "not provided" (unchanged behaviour).
            "name": webhook_name or response["name"],
            # None means --description was not passed. An empty string is a
            # legitimate value (it is the default used by `create`), so it
            # must be honoured to let users clear an existing description.
            "description": (
                description if description is not None else response["description"]
            ),
            # `create` rejects an empty template, so an empty value here is
            # likewise treated as "not provided".
            "template": template or response["template"],
        }

        await client.request("PUT", f"/webhooks/{webhook_id}", json=update_payload)
        app.console.print(f"Successfully updated webhook {webhook_id}")

179 

180 

@webhook_app.command()
async def delete(webhook_id: UUID):
    """
    Delete an existing Cloud webhook
    """
    confirm_logged_in()

    # Only prompt when the session is interactive; otherwise proceed without
    # asking. The prompt defaults to "no".
    if is_interactive():
        confirmed = typer.confirm(
            f"Are you sure you want to delete webhook with id '{webhook_id!s}'?",
            default=False,
        )
        if not confirmed:
            exit_with_error("Deletion aborted.")

    webhook_path = f"/webhooks/{webhook_id}"

    # The /webhooks API lives inside the /accounts/{id}/workspaces/{id} routing tree
    api_url = get_current_settings().api.url
    async with get_cloud_client(host=api_url) as client:
        try:
            await client.request("DELETE", webhook_path)
            app.console.print(f"Successfully deleted webhook {webhook_id}")
        except ObjectNotFound:
            # Report a missing webhook with a dedicated message
            exit_with_error(f"Webhook with id '{webhook_id!s}' not found.")
        except Exception as exc:
            # Any other failure is reported with the exception text
            exit_with_error(f"Error deleting webhook: {exc}")