Coverage for /usr/local/lib/python3.12/site-packages/prefect/blocks/notifications.py: 53%
251 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from __future__ import annotations 1a
3import logging 1a
4from abc import ABC 1a
5from typing import Any, Optional, cast 1a
7from pydantic import AnyHttpUrl, Field, HttpUrl, SecretStr 1a
8from typing_extensions import Literal 1a
10from prefect.blocks.abstract import NotificationBlock, NotificationError 1a
11from prefect.logging import LogEavesdropper 1a
12from prefect.types import SecretDict 1a
13from prefect.utilities.asyncutils import sync_compatible 1a
14from prefect.utilities.templating import apply_values, find_placeholders 1a
15from prefect.utilities.urls import validate_restricted_url 1a
17PREFECT_NOTIFY_TYPE_DEFAULT = "info" # Use a valid apprise type as default 1a
20class AbstractAppriseNotificationBlock(NotificationBlock, ABC): 1a
21 """
22 An abstract class for sending notifications using Apprise.
23 """
25 notify_type: Literal["info", "success", "warning", "failure"] = Field( 1a
26 default=PREFECT_NOTIFY_TYPE_DEFAULT,
27 description="The type of notification being performed.",
28 )
30 def __init__(self, *args: Any, **kwargs: Any): 1a
31 super().__init__(*args, **kwargs)
33 def _start_apprise_client(self, url: SecretStr): 1a
34 from apprise import Apprise, AppriseAsset
36 # A custom `AppriseAsset` that ensures Prefect Notifications
37 # appear correctly across multiple messaging platforms
38 prefect_app_data = AppriseAsset(
39 app_id="Prefect Notifications",
40 app_desc="Prefect Notifications",
41 app_url="https://prefect.io",
42 )
44 self._apprise_client = Apprise(asset=prefect_app_data)
45 self._apprise_client.add(servers=url.get_secret_value()) # pyright: ignore[reportUnknownMemberType]
47 def block_initialization(self) -> None: 1a
48 self._start_apprise_client(getattr(self, "url"))
50 @sync_compatible 1a
51 async def notify( # pyright: ignore[reportIncompatibleMethodOverride] TODO: update to sync only once base class is updated 1a
52 self,
53 body: str,
54 subject: str | None = None,
55 ) -> None:
56 with LogEavesdropper("apprise", level=logging.DEBUG) as eavesdropper:
57 result = await self._apprise_client.async_notify( # pyright: ignore[reportUnknownMemberType] incomplete type hints in apprise
58 body=body,
59 title=subject or "",
60 notify_type=self.notify_type, # pyright: ignore[reportArgumentType]
61 )
62 if not result and self._raise_on_failure:
63 raise NotificationError(log=eavesdropper.text())
66class AppriseNotificationBlock(AbstractAppriseNotificationBlock, ABC): 1a
67 """
68 A base class for sending notifications using Apprise, through webhook URLs.
69 """
71 _documentation_url = HttpUrl( 1a
72 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations"
73 )
74 url: SecretStr = Field( 1a
75 default=...,
76 title="Webhook URL",
77 description="Incoming webhook URL used to send notifications.",
78 examples=["https://hooks.example.com/XXX"],
79 )
80 allow_private_urls: bool = Field( 1a
81 default=True,
82 description="Whether to allow notifications to private URLs. Defaults to True.",
83 )
85 @sync_compatible 1a
86 async def notify( # pyright: ignore[reportIncompatibleMethodOverride] TODO: update to sync only once base class is updated 1a
87 self,
88 body: str,
89 subject: str | None = None,
90 ):
91 if not self.allow_private_urls:
92 try:
93 validate_restricted_url(self.url.get_secret_value())
94 except ValueError as exc:
95 if self._raise_on_failure:
96 raise NotificationError(str(exc))
97 raise
99 await super().notify(body, subject) # pyright: ignore[reportGeneralTypeIssues] TODO: update to sync only once base class is updated
102# TODO: Move to prefect-slack once collection block auto-registration is
103# available
104class SlackWebhook(AppriseNotificationBlock): 1a
105 """
106 Enables sending notifications via a provided Slack webhook.
108 Examples:
109 Load a saved Slack webhook and send a message:
110 ```python
111 from prefect.blocks.notifications import SlackWebhook
113 slack_webhook_block = SlackWebhook.load("BLOCK_NAME")
114 slack_webhook_block.notify("Hello from Prefect!")
115 ```
116 """
118 _block_type_name = "Slack Webhook" 1a
119 _logo_url = HttpUrl( 1a
120 "https://cdn.sanity.io/images/3ugk85nk/production/c1965ecbf8704ee1ea20d77786de9a41ce1087d1-500x500.png"
121 )
122 _documentation_url = HttpUrl( 1a
123 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations"
124 )
126 url: SecretStr = Field( 1a
127 default=...,
128 title="Webhook URL",
129 description="Slack incoming webhook URL used to send notifications.",
130 examples=["https://hooks.slack.com/XXX"],
131 )
134class MicrosoftTeamsWebhook(AppriseNotificationBlock): 1a
135 """
136 Enables sending notifications via a provided Microsoft Teams webhook.
138 Examples:
139 Load a saved Teams webhook and send a message:
140 ```python
141 from prefect.blocks.notifications import MicrosoftTeamsWebhook
142 teams_webhook_block = MicrosoftTeamsWebhook.load("BLOCK_NAME")
143 teams_webhook_block.notify("Hello from Prefect!")
144 ```
145 """
147 _block_type_name = "Microsoft Teams Webhook" 1a
148 _block_type_slug = "ms-teams-webhook" 1a
149 _logo_url = HttpUrl( 1a
150 "https://cdn.sanity.io/images/3ugk85nk/production/817efe008a57f0a24f3587414714b563e5e23658-250x250.png"
151 )
152 _documentation_url = HttpUrl( 1a
153 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations"
154 )
156 url: SecretStr = Field( 1a
157 default=...,
158 title="Webhook URL",
159 description="The Microsoft Power Automate (Workflows) URL used to send notifications to Teams.",
160 examples=[
161 "https://prod-NO.LOCATION.logic.azure.com:443/workflows/WFID/triggers/manual/paths/invoke?api-version=2016-06-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=SIGNATURE"
162 ],
163 )
165 include_image: bool = Field( 1a
166 default=True,
167 description="Include an image with the notification.",
168 )
170 wrap: bool = Field( 1a
171 default=True,
172 description="Wrap the notification text.",
173 )
175 def block_initialization(self) -> None: 1a
176 """see https://github.com/caronc/apprise/pull/1172"""
177 from apprise.plugins.workflows import NotifyWorkflows
179 if not (
180 parsed_url := cast(
181 dict[str, Any],
182 NotifyWorkflows.parse_native_url(self.url.get_secret_value()), # pyright: ignore[reportUnknownMemberType] incomplete type hints in apprise
183 )
184 ):
185 raise ValueError("Invalid Microsoft Teams Workflow URL provided.")
187 parsed_url |= {"include_image": self.include_image, "wrap": self.wrap}
189 self._start_apprise_client(SecretStr(NotifyWorkflows(**parsed_url).url())) # pyright: ignore[reportUnknownMemberType] incomplete type hints in apprise
192class PagerDutyWebHook(AbstractAppriseNotificationBlock): 1a
193 """
194 Enables sending notifications via a provided PagerDuty webhook.
195 See [Apprise notify_pagerduty docs](https://github.com/caronc/apprise/wiki/Notify_pagerduty)
196 for more info on formatting the URL.
198 Examples:
199 Load a saved PagerDuty webhook and send a message:
200 ```python
201 from prefect.blocks.notifications import PagerDutyWebHook
202 pagerduty_webhook_block = PagerDutyWebHook.load("BLOCK_NAME")
203 pagerduty_webhook_block.notify("Hello from Prefect!")
204 ```
205 """
207 _description = "Enables sending notifications via a provided PagerDuty webhook." 1a
209 _block_type_name = "Pager Duty Webhook" 1a
210 _block_type_slug = "pager-duty-webhook" 1a
211 _logo_url = HttpUrl( 1a
212 "https://cdn.sanity.io/images/3ugk85nk/production/8dbf37d17089c1ce531708eac2e510801f7b3aee-250x250.png"
213 )
214 _documentation_url = HttpUrl( 1a
215 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations"
216 )
218 # PagerDuty requires a valid severity level from its PAGERDUTY_SEVERITY_MAP
219 notify_type: Literal["info", "success", "warning", "failure"] = Field( # pyright: ignore[reportIncompatibleVariableOverride] 1a
220 default="info", description="The severity of the notification."
221 )
223 integration_key: SecretStr = Field( 1a
224 default=...,
225 description=(
226 "This can be found on the Events API V2 "
227 "integration's detail page, and is also referred to as a Routing Key. "
228 "This must be provided alongside `api_key`, but will error if provided "
229 "alongside `url`."
230 ),
231 )
233 api_key: SecretStr = Field( 1a
234 default=...,
235 title="API Key",
236 description=(
237 "This can be found under Integrations. "
238 "This must be provided alongside `integration_key`, but will error if "
239 "provided alongside `url`."
240 ),
241 )
243 source: Optional[str] = Field( 1a
244 default="Prefect", description="The source string as part of the payload."
245 )
247 component: str = Field( 1a
248 default="Notification",
249 description="The component string as part of the payload.",
250 )
252 group: Optional[str] = Field( 1a
253 default=None, description="The group string as part of the payload."
254 )
256 class_id: Optional[str] = Field( 1a
257 default=None,
258 title="Class ID",
259 description="The class string as part of the payload.",
260 )
262 region_name: Literal["us", "eu"] = Field( 1a
263 default="us", description="The region name."
264 )
266 clickable_url: Optional[AnyHttpUrl] = Field( 1a
267 default=None,
268 title="Clickable URL",
269 description="A clickable URL to associate with the notice.",
270 )
272 include_image: bool = Field( 1a
273 default=True,
274 description="Associate the notification status via a represented icon.",
275 )
277 custom_details: Optional[dict[str, str]] = Field( 1a
278 default=None,
279 description="Additional details to include as part of the payload.",
280 examples=['{"disk_space_left": "145GB"}'],
281 )
283 def block_initialization(self) -> None: 1a
284 try:
285 # Try importing for apprise>=1.18.0
286 from apprise.plugins.pagerduty import NotifyPagerDuty
287 except ImportError:
288 # Fallback for versions apprise<1.18.0
289 from apprise.plugins.NotifyPagerDuty import ( # pyright: ignore[reportMissingImports] this is a fallback
290 NotifyPagerDuty, # pyright: ignore[reportUnknownVariableType] incomplete type hints in apprise
291 )
293 url = SecretStr(
294 NotifyPagerDuty(
295 apikey=self.api_key.get_secret_value(),
296 integrationkey=self.integration_key.get_secret_value(),
297 source=self.source,
298 component=self.component,
299 group=self.group,
300 class_id=self.class_id,
301 region_name=self.region_name,
302 click=self.clickable_url,
303 include_image=self.include_image,
304 details=self.custom_details,
305 ).url() # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] incomplete type hints in apprise
306 )
307 self._start_apprise_client(url)
309 @sync_compatible 1a
310 async def notify( # pyright: ignore[reportIncompatibleMethodOverride] TODO: update to sync only once base class is updated 1a
311 self,
312 body: str,
313 subject: str | None = None,
314 ):
315 """
316 Apprise will combine subject and body by default, so we need to move
317 the body into the custom_details field. custom_details is part of the
318 webhook url, so we need to update the url and restart the client.
319 """
320 if subject:
321 self.custom_details = self.custom_details or {}
322 self.custom_details.update(
323 {"Prefect Notification Body": body.replace(" ", "%20")}
324 )
325 body = " "
326 self.block_initialization()
328 await super().notify(body, subject) # pyright: ignore[reportGeneralTypeIssues] TODO: update to sync only once base class is updated
331class TwilioSMS(AbstractAppriseNotificationBlock): 1a
332 """Enables sending notifications via Twilio SMS.
333 Find more on sending Twilio SMS messages in the [docs](https://www.twilio.com/docs/sms).
335 Examples:
336 Load a saved `TwilioSMS` block and send a message:
337 ```python
338 from prefect.blocks.notifications import TwilioSMS
339 twilio_webhook_block = TwilioSMS.load("BLOCK_NAME")
340 twilio_webhook_block.notify("Hello from Prefect!")
341 ```
342 """
344 _description = "Enables sending notifications via Twilio SMS." 1a
345 _block_type_name = "Twilio SMS" 1a
346 _block_type_slug = "twilio-sms" 1a
347 _logo_url = HttpUrl( 1a
348 "https://cdn.sanity.io/images/3ugk85nk/production/8bd8777999f82112c09b9c8d57083ac75a4a0d65-250x250.png"
349 ) # noqa
350 _documentation_url = HttpUrl( 1a
351 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations"
352 )
354 account_sid: str = Field( 1a
355 default=...,
356 description=(
357 "The Twilio Account SID - it can be found on the homepage "
358 "of the Twilio console."
359 ),
360 )
362 auth_token: SecretStr = Field( 1a
363 default=...,
364 description=(
365 "The Twilio Authentication Token - "
366 "it can be found on the homepage of the Twilio console."
367 ),
368 )
370 from_phone_number: str = Field( 1a
371 default=...,
372 description="The valid Twilio phone number to send the message from.",
373 examples=["18001234567"],
374 )
376 to_phone_numbers: list[str] = Field( 1a
377 default=...,
378 description="A list of valid Twilio phone number(s) to send the message to.",
379 # not wrapped in brackets because of the way UI displays examples; in code should be ["18004242424"]
380 examples=["18004242424"],
381 )
383 def block_initialization(self) -> None: 1a
384 try:
385 # Try importing for apprise>=1.18.0
386 from apprise.plugins.twilio import NotifyTwilio
387 except ImportError:
388 # Fallback for versions apprise<1.18.0
389 from apprise.plugins.NotifyTwilio import ( # pyright: ignore[reportMissingImports] this is a fallback
390 NotifyTwilio, # pyright: ignore[reportUnknownVariableType] incomplete type hints in apprise
391 )
393 url = SecretStr(
394 NotifyTwilio(
395 account_sid=self.account_sid,
396 auth_token=self.auth_token.get_secret_value(),
397 source=self.from_phone_number,
398 targets=self.to_phone_numbers,
399 ).url() # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] incomplete type hints in apprise
400 )
401 self._start_apprise_client(url)
404class OpsgenieWebhook(AbstractAppriseNotificationBlock): 1a
405 """
406 Enables sending notifications via a provided Opsgenie webhook.
407 See [Apprise notify_opsgenie docs](https://github.com/caronc/apprise/wiki/Notify_opsgenie)
408 for more info on formatting the URL.
410 Examples:
411 Load a saved Opsgenie webhook and send a message:
412 ```python
413 from prefect.blocks.notifications import OpsgenieWebhook
414 opsgenie_webhook_block = OpsgenieWebhook.load("BLOCK_NAME")
415 opsgenie_webhook_block.notify("Hello from Prefect!")
416 ```
417 """
419 _description = "Enables sending notifications via a provided Opsgenie webhook." 1a
421 _block_type_name = "Opsgenie Webhook" 1a
422 _block_type_slug = "opsgenie-webhook" 1a
423 _logo_url = HttpUrl( 1a
424 "https://cdn.sanity.io/images/3ugk85nk/production/d8b5bc6244ae6cd83b62ec42f10d96e14d6e9113-280x280.png"
425 )
426 _documentation_url = HttpUrl( 1a
427 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations"
428 )
430 apikey: SecretStr = Field( 1a
431 default=...,
432 title="API Key",
433 description="The API Key associated with your Opsgenie account.",
434 )
436 target_user: Optional[list[str]] = Field( 1a
437 default=None, description="The user(s) you wish to notify."
438 )
440 target_team: Optional[list[str]] = Field( 1a
441 default=None, description="The team(s) you wish to notify."
442 )
444 target_schedule: Optional[list[str]] = Field( 1a
445 default=None, description="The schedule(s) you wish to notify."
446 )
448 target_escalation: Optional[list[str]] = Field( 1a
449 default=None, description="The escalation(s) you wish to notify."
450 )
452 region_name: Literal["us", "eu"] = Field( 1a
453 default="us", description="The 2-character region code."
454 )
456 batch: bool = Field( 1a
457 default=False,
458 description="Notify all targets in batches (instead of individually).",
459 )
461 tags: Optional[list[str]] = Field( 1a
462 default=None,
463 description=(
464 "A comma-separated list of tags you can associate with your Opsgenie"
465 " message."
466 ),
467 examples=['["tag1", "tag2"]'],
468 )
470 priority: Optional[int] = Field( 1a
471 default=3,
472 description=(
473 "The priority to associate with the message. It is on a scale between 1"
474 " (LOW) and 5 (EMERGENCY)."
475 ),
476 )
478 alias: Optional[str] = Field( 1a
479 default=None, description="The alias to associate with the message."
480 )
482 entity: Optional[str] = Field( 1a
483 default=None, description="The entity to associate with the message."
484 )
486 details: Optional[dict[str, str]] = Field( 1a
487 default=None,
488 description="Additional details composed of key/values pairs.",
489 examples=['{"key1": "value1", "key2": "value2"}'],
490 )
492 def block_initialization(self) -> None: 1a
493 try:
494 # Try importing for apprise>=1.18.0
495 from apprise.plugins.opsgenie import NotifyOpsgenie
496 except ImportError:
497 # Fallback for versions apprise<1.18.0
498 from apprise.plugins.NotifyOpsgenie import ( # pyright: ignore[reportMissingImports] this is a fallback
499 NotifyOpsgenie, # pyright: ignore[reportUnknownVariableType] incomplete type hints in apprise
500 )
502 targets: list[str] = []
503 if self.target_user:
504 [targets.append(f"@{x}") for x in self.target_user]
505 if self.target_team:
506 [targets.append(f"#{x}") for x in self.target_team]
507 if self.target_schedule:
508 [targets.append(f"*{x}") for x in self.target_schedule]
509 if self.target_escalation:
510 [targets.append(f"^{x}") for x in self.target_escalation]
511 url = SecretStr(
512 NotifyOpsgenie(
513 apikey=self.apikey.get_secret_value(),
514 targets=targets,
515 region_name=self.region_name,
516 details=self.details,
517 priority=self.priority,
518 alias=self.alias,
519 entity=self.entity,
520 batch=self.batch,
521 tags=self.tags,
522 action="new",
523 ).url() # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] incomplete type hints in apprise
524 )
525 self._start_apprise_client(url)
528class MattermostWebhook(AbstractAppriseNotificationBlock): 1a
529 """
530 Enables sending notifications via a provided Mattermost webhook.
531 See [Apprise notify_Mattermost docs](https://github.com/caronc/apprise/wiki/Notify_Mattermost) # noqa
534 Examples:
535 Load a saved Mattermost webhook and send a message:
536 ```python
537 from prefect.blocks.notifications import MattermostWebhook
539 mattermost_webhook_block = MattermostWebhook.load("BLOCK_NAME")
541 mattermost_webhook_block.notify("Hello from Prefect!")
542 ```
543 """
545 _description = "Enables sending notifications via a provided Mattermost webhook." 1a
546 _block_type_name = "Mattermost Webhook" 1a
547 _block_type_slug = "mattermost-webhook" 1a
548 _logo_url = HttpUrl( 1a
549 "https://cdn.sanity.io/images/3ugk85nk/production/1350a147130bf82cbc799a5f868d2c0116207736-250x250.png"
550 )
551 _documentation_url = HttpUrl( 1a
552 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations"
553 )
555 hostname: str = Field( 1a
556 default=...,
557 description="The hostname of your Mattermost server.",
558 examples=["Mattermost.example.com"],
559 )
560 secure: bool = Field( 1a
561 default=False,
562 description="Whether to use secure https connection.",
563 )
565 token: SecretStr = Field( 1a
566 default=...,
567 description="The token associated with your Mattermost webhook.",
568 )
570 botname: Optional[str] = Field( 1a
571 title="Bot name",
572 default=None,
573 description="The name of the bot that will send the message.",
574 )
576 channels: Optional[list[str]] = Field( 1a
577 default=None,
578 description="The channel(s) you wish to notify.",
579 )
581 include_image: bool = Field( 1a
582 default=False,
583 description="Whether to include the Apprise status image in the message.",
584 )
586 path: Optional[str] = Field( 1a
587 default=None,
588 description="An optional sub-path specification to append to the hostname.",
589 )
591 port: int = Field( 1a
592 default=8065,
593 description="The port of your Mattermost server.",
594 )
596 def block_initialization(self) -> None: 1a
597 try:
598 # Try importing for apprise>=1.18.0
599 from apprise.plugins.mattermost import NotifyMattermost
600 except ImportError:
601 # Fallback for versions apprise<1.18.0
602 from apprise.plugins.NotifyMattermost import ( # pyright: ignore[reportMissingImports] this is a fallback
603 NotifyMattermost, # pyright: ignore[reportUnknownVariableType] incomplete type hints in apprise
604 )
606 url = SecretStr(
607 NotifyMattermost(
608 token=self.token.get_secret_value(),
609 fullpath=self.path,
610 host=self.hostname,
611 botname=self.botname,
612 channels=self.channels,
613 include_image=self.include_image,
614 port=self.port,
615 secure=self.secure,
616 ).url() # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] incomplete type hints in apprise
617 )
618 self._start_apprise_client(url)
621class DiscordWebhook(AbstractAppriseNotificationBlock): 1a
622 """
623 Enables sending notifications via a provided Discord webhook.
624 See [Apprise notify_Discord docs](https://github.com/caronc/apprise/wiki/Notify_Discord) # noqa
626 Examples:
627 Load a saved Discord webhook and send a message:
628 ```python
629 from prefect.blocks.notifications import DiscordWebhook
631 discord_webhook_block = DiscordWebhook.load("BLOCK_NAME")
633 discord_webhook_block.notify("Hello from Prefect!")
634 ```
635 """
637 _description = "Enables sending notifications via a provided Discord webhook." 1a
638 _block_type_name = "Discord Webhook" 1a
639 _block_type_slug = "discord-webhook" 1a
640 _logo_url = HttpUrl( 1a
641 "https://cdn.sanity.io/images/3ugk85nk/production/9e94976c80ef925b66d24e5d14f0d47baa6b8f88-250x250.png"
642 )
643 _documentation_url = HttpUrl( 1a
644 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations"
645 )
647 webhook_id: SecretStr = Field( 1a
648 default=...,
649 description=(
650 "The first part of 2 tokens provided to you after creating a"
651 " incoming-webhook."
652 ),
653 )
655 webhook_token: SecretStr = Field( 1a
656 default=...,
657 description=(
658 "The second part of 2 tokens provided to you after creating a"
659 " incoming-webhook."
660 ),
661 )
663 botname: Optional[str] = Field( 1a
664 title="Bot name",
665 default=None,
666 description=(
667 "Identify the name of the bot that should issue the message. If one isn't"
668 " specified then the default is to just use your account (associated with"
669 " the incoming-webhook)."
670 ),
671 )
673 tts: bool = Field( 1a
674 default=False,
675 description="Whether to enable Text-To-Speech.",
676 )
678 include_image: bool = Field( 1a
679 default=False,
680 description=(
681 "Whether to include an image in-line with the message describing the"
682 " notification type."
683 ),
684 )
686 avatar: bool = Field( 1a
687 default=False,
688 description="Whether to override the default discord avatar icon.",
689 )
691 avatar_url: Optional[str] = Field( 1a
692 title="Avatar URL",
693 default=None,
694 description=(
695 "Over-ride the default discord avatar icon URL. By default this is not set"
696 " and Apprise chooses the URL dynamically based on the type of message"
697 " (info, success, warning, or error)."
698 ),
699 )
701 def block_initialization(self) -> None: 1a
702 try:
703 # Try importing for apprise>=1.18.0
704 from apprise.plugins.discord import NotifyDiscord
705 except ImportError:
706 # Fallback for versions apprise<1.18.0
707 from apprise.plugins.NotifyDiscord import ( # pyright: ignore[reportMissingImports] this is a fallback
708 NotifyDiscord, # pyright: ignore[reportUnknownVariableType] incomplete type hints in apprise
709 )
711 url = SecretStr(
712 NotifyDiscord(
713 webhook_id=self.webhook_id.get_secret_value(),
714 webhook_token=self.webhook_token.get_secret_value(),
715 botname=self.botname,
716 tts=self.tts,
717 include_image=self.include_image,
718 avatar=self.avatar,
719 avatar_url=self.avatar_url,
720 ).url() # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] incomplete type hints in apprise
721 )
722 self._start_apprise_client(url)
725class CustomWebhookNotificationBlock(NotificationBlock): 1a
726 """
727 Enables sending notifications via any custom webhook.
729 All nested string param contains `{{key}}` will be substituted with value from context/secrets.
731 Context values include: `subject`, `body` and `name`.
733 Examples:
734 Load a saved custom webhook and send a message:
735 ```python
736 from prefect.blocks.notifications import CustomWebhookNotificationBlock
738 custom_webhook_block = CustomWebhookNotificationBlock.load("BLOCK_NAME")
740 custom_webhook_block.notify("Hello from Prefect!")
741 ```
742 """
744 _block_type_name = "Custom Webhook" 1a
745 _logo_url = HttpUrl( 1a
746 "https://cdn.sanity.io/images/3ugk85nk/production/c7247cb359eb6cf276734d4b1fbf00fb8930e89e-250x250.png"
747 )
748 _documentation_url = HttpUrl( 1a
749 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations"
750 )
752 name: str = Field(title="Name", description="Name of the webhook.") 1a
754 url: str = Field( 1a
755 title="Webhook URL",
756 description="The webhook URL.",
757 examples=["https://hooks.slack.com/XXX"],
758 )
760 method: Literal["GET", "POST", "PUT", "PATCH", "DELETE"] = Field( 1a
761 default="POST", description="The webhook request method. Defaults to `POST`."
762 )
764 params: Optional[dict[str, str]] = Field( 1a
765 default=None, title="Query Params", description="Custom query params."
766 )
767 json_data: Optional[dict[str, Any]] = Field( 1a
768 default=None,
769 title="JSON Data",
770 description="Send json data as payload.",
771 examples=[
772 '{"text": "{{subject}}\\n{{body}}", "title": "{{name}}", "token":'
773 ' "{{tokenFromSecrets}}"}'
774 ],
775 )
776 form_data: Optional[dict[str, str]] = Field( 1a
777 default=None,
778 title="Form Data",
779 description=(
780 "Send form data as payload. Should not be used together with _JSON Data_."
781 ),
782 examples=[
783 '{"text": "{{subject}}\\n{{body}}", "title": "{{name}}", "token":'
784 ' "{{tokenFromSecrets}}"}'
785 ],
786 )
788 headers: Optional[dict[str, str]] = Field(None, description="Custom headers.") 1a
789 cookies: Optional[dict[str, str]] = Field(None, description="Custom cookies.") 1a
791 timeout: float = Field( 1a
792 default=10, description="Request timeout in seconds. Defaults to 10."
793 )
795 secrets: SecretDict = Field( 1a
796 default_factory=lambda: SecretDict(dict()),
797 title="Custom Secret Values",
798 description="A dictionary of secret values to be substituted in other configs.",
799 examples=['{"tokenFromSecrets":"SomeSecretToken"}'],
800 )
802 def _build_request_args(self, body: str, subject: str | None) -> dict[str, Any]: 1a
803 """Build kwargs for httpx.AsyncClient.request"""
804 # prepare values
805 values = self.secrets.get_secret_value()
806 # use 'null' when subject is None
807 values.update(
808 {
809 "subject": "null" if subject is None else subject,
810 "body": body,
811 "name": self.name,
812 }
813 )
814 # do substution
815 return apply_values(
816 {
817 "method": self.method,
818 "url": self.url,
819 "params": self.params,
820 "data": self.form_data,
821 "json": self.json_data,
822 "headers": self.headers,
823 "cookies": self.cookies,
824 "timeout": self.timeout,
825 },
826 values,
827 )
829 def block_initialization(self) -> None: 1a
830 # check form_data and json_data
831 if self.form_data is not None and self.json_data is not None:
832 raise ValueError("both `Form Data` and `JSON Data` provided")
833 allowed_keys = {"subject", "body", "name"}.union(
834 self.secrets.get_secret_value().keys()
835 )
836 # test template to raise a error early
837 for name in ["url", "params", "form_data", "json_data", "headers", "cookies"]:
838 template = getattr(self, name)
839 if template is None:
840 continue
841 # check for placeholders not in predefined keys and secrets
842 placeholders = find_placeholders(template)
843 for placeholder in placeholders:
844 if placeholder.name not in allowed_keys:
845 raise KeyError(f"{name}/{placeholder}")
847 @sync_compatible 1a
848 async def notify(self, body: str, subject: str | None = None) -> None: # pyright: ignore[reportIncompatibleMethodOverride] 1a
849 import httpx
851 request_args = self._build_request_args(body, subject)
852 cookies = request_args.pop("cookies", dict())
853 # make request with httpx
854 client = httpx.AsyncClient(
855 headers={"user-agent": "Prefect Notifications"}, cookies=cookies
856 )
857 async with client:
858 resp = await client.request(**request_args)
859 resp.raise_for_status()
862class SendgridEmail(AbstractAppriseNotificationBlock): 1a
863 """
864 Enables sending notifications via any sendgrid account.
865 See [Apprise Notify_sendgrid docs](https://github.com/caronc/apprise/wiki/Notify_Sendgrid)
867 Examples:
868 Load a saved Sendgrid and send a email message:
869 ```python
870 from prefect.blocks.notifications import SendgridEmail
872 sendgrid_block = SendgridEmail.load("BLOCK_NAME")
874 sendgrid_block.notify("Hello from Prefect!")
875 ```
876 """
878 _description = "Enables sending notifications via Sendgrid email service." 1a
879 _block_type_name = "Sendgrid Email" 1a
880 _block_type_slug = "sendgrid-email" 1a
881 _logo_url = HttpUrl( 1a
882 "https://cdn.sanity.io/images/3ugk85nk/production/82bc6ed16ca42a2252a5512c72233a253b8a58eb-250x250.png"
883 )
884 _documentation_url = HttpUrl( 1a
885 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations"
886 )
888 api_key: SecretStr = Field( 1a
889 default=...,
890 title="API Key",
891 description="The API Key associated with your sendgrid account.",
892 )
894 sender_email: str = Field( 1a
895 title="Sender email id",
896 description="The sender email id.",
897 examples=["test-support@gmail.com"],
898 )
900 to_emails: list[str] = Field( 1a
901 default=...,
902 title="Recipient emails",
903 description="Email ids of all recipients.",
904 examples=['"recipient1@gmail.com"'],
905 )
907 def block_initialization(self) -> None: 1a
908 try:
909 # Try importing for apprise>=1.18.0
910 from apprise.plugins.sendgrid import NotifySendGrid
911 except ImportError:
912 # Fallback for versions apprise<1.18.0
913 from apprise.plugins.NotifySendGrid import ( # pyright: ignore[reportMissingImports] this is a fallback
914 NotifySendGrid, # pyright: ignore[reportUnknownVariableType] incomplete type hints in apprise
915 )
917 self._NotifySendGrid = NotifySendGrid # Cache the working import
918 url = self._build_sendgrid_url()
919 self._start_apprise_client(url)
921 def _build_sendgrid_url(self) -> SecretStr: 1a
922 """Build the SendGrid URL with current to_emails."""
923 return SecretStr(
924 self._NotifySendGrid(
925 apikey=self.api_key.get_secret_value(),
926 from_email=self.sender_email,
927 targets=self.to_emails,
928 ).url() # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] incomplete type hints in apprise
929 )
931 @sync_compatible 1a
932 async def notify( 1a
933 self,
934 body: str,
935 subject: str | None = None,
936 ):
937 # Update apprise client with current to_emails before sending
938 if hasattr(self, "_apprise_client") and self._apprise_client:
939 self._apprise_client.clear()
940 self._apprise_client.add(
941 servers=self._build_sendgrid_url().get_secret_value()
942 )
944 await super().notify(body, subject)