Coverage for /usr/local/lib/python3.12/site-packages/prefect/blocks/webhook.py: 58%
30 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3from typing import Any 1a
5from httpx import AsyncClient, AsyncHTTPTransport, Response 1a
6from pydantic import Field, HttpUrl, SecretStr 1a
7from typing_extensions import Literal 1a
9from prefect.blocks.core import Block 1a
10from prefect.types import SecretDict 1a
11from prefect.utilities.urls import validate_restricted_url 1a
13# Use a global HTTP transport to maintain a process-wide connection pool for
14# interservice requests
15_http_transport = AsyncHTTPTransport() 1a
# Counterpart transport constructed with verify=False. Webhook.block_initialization
# binds its client to this one when the block's `verify` field is False.
16_insecure_http_transport = AsyncHTTPTransport(verify=False) 1a
class Webhook(Block):
    """
    Block that enables calling webhooks.
    """

    _block_type_name = "Webhook"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/c7247cb359eb6cf276734d4b1fbf00fb8930e89e-250x250.png"  # type: ignore
    _documentation_url = HttpUrl(
        "https://docs.prefect.io/latest/automate/events/webhook-triggers"
    )

    # HTTP verb used by `call`; restricted to the five listed methods.
    method: Literal["GET", "POST", "PUT", "PATCH", "DELETE"] = Field(
        default="POST", description="The webhook request method. Defaults to `POST`."
    )

    # Required field (default=...). Held as a SecretStr and unwrapped with
    # `get_secret_value()` only at request time.
    url: SecretStr = Field(
        default=...,
        title="Webhook URL",
        description="The webhook URL.",
        examples=["https://hooks.slack.com/XXX"],
    )

    # Request headers, held as a SecretDict; defaults to an empty mapping.
    headers: SecretDict = Field(
        default_factory=lambda: SecretDict(dict()),
        title="Webhook Headers",
        description="A dictionary of headers to send with the webhook request.",
    )
    # When False, `call` passes the URL through `validate_restricted_url`
    # before sending anything.
    allow_private_urls: bool = Field(
        default=True,
        description="Whether to allow notifications to private URLs. Defaults to True.",
    )
    # Chooses which module-level transport the client is bound to.
    verify: bool = Field(
        default=True,
        description="Whether or not to enforce a secure connection to the webhook.",
    )

    def block_initialization(self) -> None:
        """Bind a new `AsyncClient` to the shared transport matching `verify`."""
        if self.verify:
            self._client = AsyncClient(transport=_http_transport)
        else:
            self._client = AsyncClient(transport=_insecure_http_transport)

    async def call(self, payload: dict[str, Any] | str | None = None) -> Response:
        """
        Call the webhook.

        Args:
            payload: an optional payload to send when calling the webhook. A
                `str` is sent as the raw request content; any other value
                (including `None`) is passed as the request's `json` argument.

        Returns:
            The `Response` produced by the underlying client request.

        Raises:
            Whatever `validate_restricted_url` raises when private URLs are
            disallowed and the URL is rejected (its behavior is defined
            outside this module).
        """
        target = self.url.get_secret_value()
        if not self.allow_private_urls:
            validate_restricted_url(target)

        # The `async with` below closes the client once the request finishes,
        # and httpx refuses to re-enter a closed client. Rebuild it so the
        # same block instance can be called more than once.
        if self._client.is_closed:
            self.block_initialization()

        # The two payload shapes differ only in which keyword carries the body.
        body: dict[str, Any] = (
            {"content": payload} if isinstance(payload, str) else {"json": payload}
        )

        async with self._client:
            return await self._client.request(
                method=self.method,
                url=target,
                headers=self.headers.get_secret_value(),
                **body,
            )