Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/cloud/webhook.py: 30%
82 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Command line interface for working with webhooks
3"""
5from typing import Dict, List 1a
6from uuid import UUID 1a
8import typer 1a
9from rich.table import Table 1a
11from prefect.cli._types import PrefectTyper 1a
12from prefect.cli._utilities import exit_with_error 1a
13from prefect.cli.cloud import cloud_app, confirm_logged_in 1a
14from prefect.cli.root import app, is_interactive 1a
15from prefect.client.cloud import get_cloud_client 1a
16from prefect.exceptions import ObjectNotFound 1a
17from prefect.settings import get_current_settings 1a
# Typer sub-application that groups every webhook command defined in this module.
webhook_app: PrefectTyper = PrefectTyper(
    help="Manage Prefect Cloud Webhooks",
    name="webhook",
)
# Mount the group on the cloud CLI app; "webhooks" is registered as an alias.
cloud_app.add_typer(webhook_app, aliases=["webhooks"])
def _render_webhooks_into_table(webhooks: List[Dict[str, str]]) -> Table:
    """Build a rich ``Table`` containing one row per webhook.

    Args:
        webhooks: Webhook mappings. Each one is read through its ``id``,
            ``slug``, ``name``, ``enabled`` and ``template`` keys.

    Returns:
        A table with row separator lines enabled and the columns
        "webhook id", "url slug", "name", "enabled?" and "template".

    Raises:
        KeyError: If a webhook mapping lacks one of the keys listed above.
    """
    headings = ("webhook id", "url slug", "name", "enabled?", "template")

    table = Table(show_lines=True)
    for heading in headings:
        # overflow="fold" wraps a value that is wider than the computed column
        # width so the whole value stays visible in the terminal.
        # https://rich.readthedocs.io/en/stable/reference/table.html#rich.table.Column.overflow
        table.add_column(heading, overflow="fold")

    for entry in webhooks:
        cells = (
            entry["id"],
            entry["slug"],
            entry["name"],
            # The enabled flag is converted to text before being rendered.
            str(entry["enabled"]),
            entry["template"],
        )
        table.add_row(*cells)

    return table
@webhook_app.command()
async def ls():
    """
    Fetch and list all webhooks in your workspace
    """
    confirm_logged_in()

    # The /webhooks API lives inside the /accounts/{id}/workspaces/{id} routing tree,
    # so the client is pointed at the API URL from the current settings.
    api_url = get_current_settings().api.url
    async with get_cloud_client(host=api_url) as cloud:
        webhooks = await cloud.request("POST", "/webhooks/filter")
        # Render the response as a table and write it to the CLI console.
        app.console.print(_render_webhooks_into_table(webhooks))
@webhook_app.command()
async def get(webhook_id: UUID):
    """
    Retrieve a webhook by ID.
    """
    # NOTE: the docstring above is kept short on purpose; it is the command's
    # CLI help text.
    #
    # webhook_id: UUID of the webhook to fetch.
    # Exits with an error message when the API reports the webhook as missing.
    confirm_logged_in()

    # The /webhooks API lives inside the /accounts/{id}/workspaces/{id} routing tree
    async with get_cloud_client(host=get_current_settings().api.url) as client:
        try:
            webhook = await client.request("GET", f"/webhooks/{webhook_id}")
        except ObjectNotFound:
            # Report an unknown ID with the same message `delete` uses instead
            # of letting the exception surface as a traceback.
            exit_with_error(f"Webhook with id '{webhook_id!s}' not found.")
        else:
            # The table renderer takes a list, so wrap the single webhook.
            display_table = _render_webhooks_into_table([webhook])
            app.console.print(display_table)
@webhook_app.command()
async def create(
    webhook_name: str,
    description: str = typer.Option(
        "", "--description", "-d", help="Description of the webhook"
    ),
    template: str = typer.Option(
        None, "--template", "-t", help="Jinja2 template expression"
    ),
):
    """
    Create a new Cloud webhook
    """
    # Validate the template before the login check: a missing or empty
    # template aborts the command with usage guidance.
    if not template:
        usage_hint = (
            "Please provide a Jinja2 template expression in the --template flag \nwhich"
            ' should define (at minimum) the following attributes: \n{ "event":'
            ' "your.event.name", "resource": { "prefect.resource.id":'
            ' "your.resource.id" } }'
            " \nhttps://docs.prefect.io/latest/automate/events/webhook-triggers#webhook-templates"
        )
        exit_with_error(usage_hint)

    confirm_logged_in()

    # Request body for the new webhook.
    payload = {
        "name": webhook_name,
        "description": description,
        "template": template,
    }

    # The /webhooks API lives inside the /accounts/{id}/workspaces/{id} routing tree
    api_url = get_current_settings().api.url
    async with get_cloud_client(host=api_url) as cloud:
        response = await cloud.request("POST", "/webhooks/", json=payload)
        # Echo the name found in the API response.
        app.console.print(f"Successfully created webhook {response['name']}")
@webhook_app.command()
async def rotate(webhook_id: UUID):
    """
    Rotate url for an existing Cloud webhook, in case it has been compromised
    """
    # webhook_id: UUID of the webhook whose URL slug should be rotated.
    # Returns without contacting the API when the confirmation is declined.
    # Exits with an error message when the API reports the webhook as missing.
    confirm_logged_in()

    confirm_rotate = typer.confirm(
        "Are you sure you want to rotate? This will invalidate the old URL."
    )

    if not confirm_rotate:
        return

    # The /webhooks API lives inside the /accounts/{id}/workspaces/{id} routing tree
    async with get_cloud_client(host=get_current_settings().api.url) as client:
        try:
            response = await client.request("POST", f"/webhooks/{webhook_id}/rotate")
        except ObjectNotFound:
            # Report an unknown ID with the same message `delete` uses instead
            # of letting the exception surface as a traceback.
            exit_with_error(f"Webhook with id '{webhook_id!s}' not found.")
        else:
            # The response carries the webhook's new slug.
            app.console.print(
                f"Successfully rotated webhook URL to {response['slug']}"
            )
@webhook_app.command()
async def toggle(
    webhook_id: UUID,
):
    """
    Toggle the enabled status of an existing Cloud webhook
    """
    # webhook_id: UUID of the webhook to enable or disable.
    # Exits with an error message when the API reports the webhook as missing.
    confirm_logged_in()

    # Human-readable label for each boolean state.
    status_lookup = {True: "enabled", False: "disabled"}

    # The /webhooks API lives inside the /accounts/{id}/workspaces/{id} routing tree
    async with get_cloud_client(host=get_current_settings().api.url) as client:
        try:
            # Read the current state first; the PATCH sends its negation.
            response = await client.request("GET", f"/webhooks/{webhook_id}")
            current_status = response["enabled"]
            new_status = not current_status

            await client.request(
                "PATCH", f"/webhooks/{webhook_id}", json={"enabled": new_status}
            )
        except ObjectNotFound:
            # Report an unknown ID with the same message `delete` uses instead
            # of letting the exception surface as a traceback.
            exit_with_error(f"Webhook with id '{webhook_id!s}' not found.")
        else:
            app.console.print(f"Webhook is now {status_lookup[new_status]}")
@webhook_app.command()
async def update(
    webhook_id: UUID,
    webhook_name: str = typer.Option(None, "--name", "-n", help="Webhook name"),
    description: str = typer.Option(
        None, "--description", "-d", help="Description of the webhook"
    ),
    template: str = typer.Option(
        None, "--template", "-t", help="Jinja2 template expression"
    ),
):
    """
    Partially update an existing Cloud webhook
    """
    # webhook_id: UUID of the webhook to update.
    # webhook_name / description / template: new values; an option that is not
    #   supplied keeps the value currently stored for the webhook.
    # Exits with an error message when the API reports the webhook as missing.
    confirm_logged_in()

    # The /webhooks API lives inside the /accounts/{id}/workspaces/{id} routing tree
    async with get_cloud_client(host=get_current_settings().api.url) as client:
        try:
            # The PUT sends all three fields, so fetch the current webhook and
            # fill in whichever ones were not supplied.
            response = await client.request("GET", f"/webhooks/{webhook_id}")
            update_payload = {
                "name": webhook_name or response["name"],
                # Compare against None rather than truthiness so that an
                # explicit `--description ""` clears the description instead
                # of silently keeping the old one.
                "description": (
                    response["description"] if description is None else description
                ),
                "template": template or response["template"],
            }

            await client.request(
                "PUT", f"/webhooks/{webhook_id}", json=update_payload
            )
        except ObjectNotFound:
            # Report an unknown ID with the same message `delete` uses instead
            # of letting the exception surface as a traceback.
            exit_with_error(f"Webhook with id '{webhook_id!s}' not found.")
        else:
            app.console.print(f"Successfully updated webhook {webhook_id}")
@webhook_app.command()
async def delete(webhook_id: UUID):
    """
    Delete an existing Cloud webhook
    """
    confirm_logged_in()

    # Ask for confirmation only when is_interactive() reports true; a declined
    # prompt (the default answer) aborts the command with an error.
    if is_interactive():
        proceed = typer.confirm(
            (f"Are you sure you want to delete webhook with id '{webhook_id!s}'?"),
            default=False,
        )
        if not proceed:
            exit_with_error("Deletion aborted.")

    # The /webhooks API lives inside the /accounts/{id}/workspaces/{id} routing tree
    api_url = get_current_settings().api.url
    async with get_cloud_client(host=api_url) as cloud:
        try:
            await cloud.request("DELETE", f"/webhooks/{webhook_id}")
            app.console.print(f"Successfully deleted webhook {webhook_id}")
        except ObjectNotFound:
            # Unknown ID: give a targeted message.
            exit_with_error(f"Webhook with id '{webhook_id!s}' not found.")
        except Exception as exc:
            # Any other failure is reported with the exception text.
            exit_with_error(f"Error deleting webhook: {exc}")