Coverage for /usr/local/lib/python3.12/site-packages/prefect/blocks/webhook.py: 58%

30 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from __future__ import annotations 1a

2 

3from typing import Any 1a

4 

5from httpx import AsyncClient, AsyncHTTPTransport, Response 1a

6from pydantic import Field, HttpUrl, SecretStr 1a

7from typing_extensions import Literal 1a

8 

9from prefect.blocks.core import Block 1a

10from prefect.types import SecretDict 1a

11from prefect.utilities.urls import validate_restricted_url 1a

12 

13# Use a global HTTP transport to maintain a process-wide connection pool for 

14# interservice requests 

15_http_transport = AsyncHTTPTransport() 1a

16_insecure_http_transport = AsyncHTTPTransport(verify=False) 1a

17 

18 

19class Webhook(Block): 1a

20 """ 

21 Block that enables calling webhooks. 

22 """ 

23 

24 _block_type_name = "Webhook" 1a

25 _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/c7247cb359eb6cf276734d4b1fbf00fb8930e89e-250x250.png" # type: ignore 1a

26 _documentation_url = HttpUrl( 1a

27 "https://docs.prefect.io/latest/automate/events/webhook-triggers" 

28 ) 

29 

30 method: Literal["GET", "POST", "PUT", "PATCH", "DELETE"] = Field( 1a

31 default="POST", description="The webhook request method. Defaults to `POST`." 

32 ) 

33 

34 url: SecretStr = Field( 1a

35 default=..., 

36 title="Webhook URL", 

37 description="The webhook URL.", 

38 examples=["https://hooks.slack.com/XXX"], 

39 ) 

40 

41 headers: SecretDict = Field( 1a

42 default_factory=lambda: SecretDict(dict()), 

43 title="Webhook Headers", 

44 description="A dictionary of headers to send with the webhook request.", 

45 ) 

46 allow_private_urls: bool = Field( 1a

47 default=True, 

48 description="Whether to allow notifications to private URLs. Defaults to True.", 

49 ) 

50 verify: bool = Field( 1a

51 default=True, 

52 description="Whether or not to enforce a secure connection to the webhook.", 

53 ) 

54 

55 def block_initialization(self) -> None: 1a

56 if self.verify: 

57 self._client = AsyncClient(transport=_http_transport) 

58 else: 

59 self._client = AsyncClient(transport=_insecure_http_transport) 

60 

61 async def call(self, payload: dict[str, Any] | str | None = None) -> Response: 1a

62 """ 

63 Call the webhook. 

64 

65 Args: 

66 payload: an optional payload to send when calling the webhook. 

67 """ 

68 if not self.allow_private_urls: 

69 validate_restricted_url(self.url.get_secret_value()) 

70 

71 async with self._client: 

72 if isinstance(payload, str): 

73 return await self._client.request( 

74 method=self.method, 

75 url=self.url.get_secret_value(), 

76 headers=self.headers.get_secret_value(), 

77 content=payload, 

78 ) 

79 else: 

80 return await self._client.request( 

81 method=self.method, 

82 url=self.url.get_secret_value(), 

83 headers=self.headers.get_secret_value(), 

84 json=payload, 

85 )