Coverage for /usr/local/lib/python3.12/site-packages/prefect/blocks/notifications.py: 53%

251 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from __future__ import annotations 1a

2 

3import logging 1a

4from abc import ABC 1a

5from typing import Any, Optional, cast 1a

6 

7from pydantic import AnyHttpUrl, Field, HttpUrl, SecretStr 1a

8from typing_extensions import Literal 1a

9 

10from prefect.blocks.abstract import NotificationBlock, NotificationError 1a

11from prefect.logging import LogEavesdropper 1a

12from prefect.types import SecretDict 1a

13from prefect.utilities.asyncutils import sync_compatible 1a

14from prefect.utilities.templating import apply_values, find_placeholders 1a

15from prefect.utilities.urls import validate_restricted_url 1a

16 

17PREFECT_NOTIFY_TYPE_DEFAULT = "info" # Use a valid apprise type as default 1a

18 

19 

20class AbstractAppriseNotificationBlock(NotificationBlock, ABC): 1a

21 """ 

22 An abstract class for sending notifications using Apprise. 

23 """ 

24 

25 notify_type: Literal["info", "success", "warning", "failure"] = Field( 1a

26 default=PREFECT_NOTIFY_TYPE_DEFAULT, 

27 description="The type of notification being performed.", 

28 ) 

29 

30 def __init__(self, *args: Any, **kwargs: Any): 1a

31 super().__init__(*args, **kwargs) 

32 

33 def _start_apprise_client(self, url: SecretStr): 1a

34 from apprise import Apprise, AppriseAsset 

35 

36 # A custom `AppriseAsset` that ensures Prefect Notifications 

37 # appear correctly across multiple messaging platforms 

38 prefect_app_data = AppriseAsset( 

39 app_id="Prefect Notifications", 

40 app_desc="Prefect Notifications", 

41 app_url="https://prefect.io", 

42 ) 

43 

44 self._apprise_client = Apprise(asset=prefect_app_data) 

45 self._apprise_client.add(servers=url.get_secret_value()) # pyright: ignore[reportUnknownMemberType] 

46 

47 def block_initialization(self) -> None: 1a

48 self._start_apprise_client(getattr(self, "url")) 

49 

50 @sync_compatible 1a

51 async def notify( # pyright: ignore[reportIncompatibleMethodOverride] TODO: update to sync only once base class is updated 1a

52 self, 

53 body: str, 

54 subject: str | None = None, 

55 ) -> None: 

56 with LogEavesdropper("apprise", level=logging.DEBUG) as eavesdropper: 

57 result = await self._apprise_client.async_notify( # pyright: ignore[reportUnknownMemberType] incomplete type hints in apprise 

58 body=body, 

59 title=subject or "", 

60 notify_type=self.notify_type, # pyright: ignore[reportArgumentType] 

61 ) 

62 if not result and self._raise_on_failure: 

63 raise NotificationError(log=eavesdropper.text()) 

64 

65 

66class AppriseNotificationBlock(AbstractAppriseNotificationBlock, ABC): 1a

67 """ 

68 A base class for sending notifications using Apprise, through webhook URLs. 

69 """ 

70 

71 _documentation_url = HttpUrl( 1a

72 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations" 

73 ) 

74 url: SecretStr = Field( 1a

75 default=..., 

76 title="Webhook URL", 

77 description="Incoming webhook URL used to send notifications.", 

78 examples=["https://hooks.example.com/XXX"], 

79 ) 

80 allow_private_urls: bool = Field( 1a

81 default=True, 

82 description="Whether to allow notifications to private URLs. Defaults to True.", 

83 ) 

84 

85 @sync_compatible 1a

86 async def notify( # pyright: ignore[reportIncompatibleMethodOverride] TODO: update to sync only once base class is updated 1a

87 self, 

88 body: str, 

89 subject: str | None = None, 

90 ): 

91 if not self.allow_private_urls: 

92 try: 

93 validate_restricted_url(self.url.get_secret_value()) 

94 except ValueError as exc: 

95 if self._raise_on_failure: 

96 raise NotificationError(str(exc)) 

97 raise 

98 

99 await super().notify(body, subject) # pyright: ignore[reportGeneralTypeIssues] TODO: update to sync only once base class is updated 

100 

101 

102# TODO: Move to prefect-slack once collection block auto-registration is 

103# available 

104class SlackWebhook(AppriseNotificationBlock): 1a

105 """ 

106 Enables sending notifications via a provided Slack webhook. 

107 

108 Examples: 

109 Load a saved Slack webhook and send a message: 

110 ```python 

111 from prefect.blocks.notifications import SlackWebhook 

112 

113 slack_webhook_block = SlackWebhook.load("BLOCK_NAME") 

114 slack_webhook_block.notify("Hello from Prefect!") 

115 ``` 

116 """ 

117 

118 _block_type_name = "Slack Webhook" 1a

119 _logo_url = HttpUrl( 1a

120 "https://cdn.sanity.io/images/3ugk85nk/production/c1965ecbf8704ee1ea20d77786de9a41ce1087d1-500x500.png" 

121 ) 

122 _documentation_url = HttpUrl( 1a

123 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations" 

124 ) 

125 

126 url: SecretStr = Field( 1a

127 default=..., 

128 title="Webhook URL", 

129 description="Slack incoming webhook URL used to send notifications.", 

130 examples=["https://hooks.slack.com/XXX"], 

131 ) 

132 

133 

134class MicrosoftTeamsWebhook(AppriseNotificationBlock): 1a

135 """ 

136 Enables sending notifications via a provided Microsoft Teams webhook. 

137 

138 Examples: 

139 Load a saved Teams webhook and send a message: 

140 ```python 

141 from prefect.blocks.notifications import MicrosoftTeamsWebhook 

142 teams_webhook_block = MicrosoftTeamsWebhook.load("BLOCK_NAME") 

143 teams_webhook_block.notify("Hello from Prefect!") 

144 ``` 

145 """ 

146 

147 _block_type_name = "Microsoft Teams Webhook" 1a

148 _block_type_slug = "ms-teams-webhook" 1a

149 _logo_url = HttpUrl( 1a

150 "https://cdn.sanity.io/images/3ugk85nk/production/817efe008a57f0a24f3587414714b563e5e23658-250x250.png" 

151 ) 

152 _documentation_url = HttpUrl( 1a

153 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations" 

154 ) 

155 

156 url: SecretStr = Field( 1a

157 default=..., 

158 title="Webhook URL", 

159 description="The Microsoft Power Automate (Workflows) URL used to send notifications to Teams.", 

160 examples=[ 

161 "https://prod-NO.LOCATION.logic.azure.com:443/workflows/WFID/triggers/manual/paths/invoke?api-version=2016-06-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=SIGNATURE" 

162 ], 

163 ) 

164 

165 include_image: bool = Field( 1a

166 default=True, 

167 description="Include an image with the notification.", 

168 ) 

169 

170 wrap: bool = Field( 1a

171 default=True, 

172 description="Wrap the notification text.", 

173 ) 

174 

175 def block_initialization(self) -> None: 1a

176 """see https://github.com/caronc/apprise/pull/1172""" 

177 from apprise.plugins.workflows import NotifyWorkflows 

178 

179 if not ( 

180 parsed_url := cast( 

181 dict[str, Any], 

182 NotifyWorkflows.parse_native_url(self.url.get_secret_value()), # pyright: ignore[reportUnknownMemberType] incomplete type hints in apprise 

183 ) 

184 ): 

185 raise ValueError("Invalid Microsoft Teams Workflow URL provided.") 

186 

187 parsed_url |= {"include_image": self.include_image, "wrap": self.wrap} 

188 

189 self._start_apprise_client(SecretStr(NotifyWorkflows(**parsed_url).url())) # pyright: ignore[reportUnknownMemberType] incomplete type hints in apprise 

190 

191 

192class PagerDutyWebHook(AbstractAppriseNotificationBlock): 1a

193 """ 

194 Enables sending notifications via a provided PagerDuty webhook. 

195 See [Apprise notify_pagerduty docs](https://github.com/caronc/apprise/wiki/Notify_pagerduty) 

196 for more info on formatting the URL. 

197 

198 Examples: 

199 Load a saved PagerDuty webhook and send a message: 

200 ```python 

201 from prefect.blocks.notifications import PagerDutyWebHook 

202 pagerduty_webhook_block = PagerDutyWebHook.load("BLOCK_NAME") 

203 pagerduty_webhook_block.notify("Hello from Prefect!") 

204 ``` 

205 """ 

206 

207 _description = "Enables sending notifications via a provided PagerDuty webhook." 1a

208 

209 _block_type_name = "Pager Duty Webhook" 1a

210 _block_type_slug = "pager-duty-webhook" 1a

211 _logo_url = HttpUrl( 1a

212 "https://cdn.sanity.io/images/3ugk85nk/production/8dbf37d17089c1ce531708eac2e510801f7b3aee-250x250.png" 

213 ) 

214 _documentation_url = HttpUrl( 1a

215 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations" 

216 ) 

217 

218 # PagerDuty requires a valid severity level from its PAGERDUTY_SEVERITY_MAP 

219 notify_type: Literal["info", "success", "warning", "failure"] = Field( # pyright: ignore[reportIncompatibleVariableOverride] 1a

220 default="info", description="The severity of the notification." 

221 ) 

222 

223 integration_key: SecretStr = Field( 1a

224 default=..., 

225 description=( 

226 "This can be found on the Events API V2 " 

227 "integration's detail page, and is also referred to as a Routing Key. " 

228 "This must be provided alongside `api_key`, but will error if provided " 

229 "alongside `url`." 

230 ), 

231 ) 

232 

233 api_key: SecretStr = Field( 1a

234 default=..., 

235 title="API Key", 

236 description=( 

237 "This can be found under Integrations. " 

238 "This must be provided alongside `integration_key`, but will error if " 

239 "provided alongside `url`." 

240 ), 

241 ) 

242 

243 source: Optional[str] = Field( 1a

244 default="Prefect", description="The source string as part of the payload." 

245 ) 

246 

247 component: str = Field( 1a

248 default="Notification", 

249 description="The component string as part of the payload.", 

250 ) 

251 

252 group: Optional[str] = Field( 1a

253 default=None, description="The group string as part of the payload." 

254 ) 

255 

256 class_id: Optional[str] = Field( 1a

257 default=None, 

258 title="Class ID", 

259 description="The class string as part of the payload.", 

260 ) 

261 

262 region_name: Literal["us", "eu"] = Field( 1a

263 default="us", description="The region name." 

264 ) 

265 

266 clickable_url: Optional[AnyHttpUrl] = Field( 1a

267 default=None, 

268 title="Clickable URL", 

269 description="A clickable URL to associate with the notice.", 

270 ) 

271 

272 include_image: bool = Field( 1a

273 default=True, 

274 description="Associate the notification status via a represented icon.", 

275 ) 

276 

277 custom_details: Optional[dict[str, str]] = Field( 1a

278 default=None, 

279 description="Additional details to include as part of the payload.", 

280 examples=['{"disk_space_left": "145GB"}'], 

281 ) 

282 

283 def block_initialization(self) -> None: 1a

284 try: 

285 # Try importing for apprise>=1.18.0 

286 from apprise.plugins.pagerduty import NotifyPagerDuty 

287 except ImportError: 

288 # Fallback for versions apprise<1.18.0 

289 from apprise.plugins.NotifyPagerDuty import ( # pyright: ignore[reportMissingImports] this is a fallback 

290 NotifyPagerDuty, # pyright: ignore[reportUnknownVariableType] incomplete type hints in apprise 

291 ) 

292 

293 url = SecretStr( 

294 NotifyPagerDuty( 

295 apikey=self.api_key.get_secret_value(), 

296 integrationkey=self.integration_key.get_secret_value(), 

297 source=self.source, 

298 component=self.component, 

299 group=self.group, 

300 class_id=self.class_id, 

301 region_name=self.region_name, 

302 click=self.clickable_url, 

303 include_image=self.include_image, 

304 details=self.custom_details, 

305 ).url() # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] incomplete type hints in apprise 

306 ) 

307 self._start_apprise_client(url) 

308 

309 @sync_compatible 1a

310 async def notify( # pyright: ignore[reportIncompatibleMethodOverride] TODO: update to sync only once base class is updated 1a

311 self, 

312 body: str, 

313 subject: str | None = None, 

314 ): 

315 """ 

316 Apprise will combine subject and body by default, so we need to move 

317 the body into the custom_details field. custom_details is part of the 

318 webhook url, so we need to update the url and restart the client. 

319 """ 

320 if subject: 

321 self.custom_details = self.custom_details or {} 

322 self.custom_details.update( 

323 {"Prefect Notification Body": body.replace(" ", "%20")} 

324 ) 

325 body = " " 

326 self.block_initialization() 

327 

328 await super().notify(body, subject) # pyright: ignore[reportGeneralTypeIssues] TODO: update to sync only once base class is updated 

329 

330 

331class TwilioSMS(AbstractAppriseNotificationBlock): 1a

332 """Enables sending notifications via Twilio SMS. 

333 Find more on sending Twilio SMS messages in the [docs](https://www.twilio.com/docs/sms). 

334 

335 Examples: 

336 Load a saved `TwilioSMS` block and send a message: 

337 ```python 

338 from prefect.blocks.notifications import TwilioSMS 

339 twilio_webhook_block = TwilioSMS.load("BLOCK_NAME") 

340 twilio_webhook_block.notify("Hello from Prefect!") 

341 ``` 

342 """ 

343 

344 _description = "Enables sending notifications via Twilio SMS." 1a

345 _block_type_name = "Twilio SMS" 1a

346 _block_type_slug = "twilio-sms" 1a

347 _logo_url = HttpUrl( 1a

348 "https://cdn.sanity.io/images/3ugk85nk/production/8bd8777999f82112c09b9c8d57083ac75a4a0d65-250x250.png" 

349 ) # noqa 

350 _documentation_url = HttpUrl( 1a

351 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations" 

352 ) 

353 

354 account_sid: str = Field( 1a

355 default=..., 

356 description=( 

357 "The Twilio Account SID - it can be found on the homepage " 

358 "of the Twilio console." 

359 ), 

360 ) 

361 

362 auth_token: SecretStr = Field( 1a

363 default=..., 

364 description=( 

365 "The Twilio Authentication Token - " 

366 "it can be found on the homepage of the Twilio console." 

367 ), 

368 ) 

369 

370 from_phone_number: str = Field( 1a

371 default=..., 

372 description="The valid Twilio phone number to send the message from.", 

373 examples=["18001234567"], 

374 ) 

375 

376 to_phone_numbers: list[str] = Field( 1a

377 default=..., 

378 description="A list of valid Twilio phone number(s) to send the message to.", 

379 # not wrapped in brackets because of the way UI displays examples; in code should be ["18004242424"] 

380 examples=["18004242424"], 

381 ) 

382 

383 def block_initialization(self) -> None: 1a

384 try: 

385 # Try importing for apprise>=1.18.0 

386 from apprise.plugins.twilio import NotifyTwilio 

387 except ImportError: 

388 # Fallback for versions apprise<1.18.0 

389 from apprise.plugins.NotifyTwilio import ( # pyright: ignore[reportMissingImports] this is a fallback 

390 NotifyTwilio, # pyright: ignore[reportUnknownVariableType] incomplete type hints in apprise 

391 ) 

392 

393 url = SecretStr( 

394 NotifyTwilio( 

395 account_sid=self.account_sid, 

396 auth_token=self.auth_token.get_secret_value(), 

397 source=self.from_phone_number, 

398 targets=self.to_phone_numbers, 

399 ).url() # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] incomplete type hints in apprise 

400 ) 

401 self._start_apprise_client(url) 

402 

403 

404class OpsgenieWebhook(AbstractAppriseNotificationBlock): 1a

405 """ 

406 Enables sending notifications via a provided Opsgenie webhook. 

407 See [Apprise notify_opsgenie docs](https://github.com/caronc/apprise/wiki/Notify_opsgenie) 

408 for more info on formatting the URL. 

409 

410 Examples: 

411 Load a saved Opsgenie webhook and send a message: 

412 ```python 

413 from prefect.blocks.notifications import OpsgenieWebhook 

414 opsgenie_webhook_block = OpsgenieWebhook.load("BLOCK_NAME") 

415 opsgenie_webhook_block.notify("Hello from Prefect!") 

416 ``` 

417 """ 

418 

419 _description = "Enables sending notifications via a provided Opsgenie webhook." 1a

420 

421 _block_type_name = "Opsgenie Webhook" 1a

422 _block_type_slug = "opsgenie-webhook" 1a

423 _logo_url = HttpUrl( 1a

424 "https://cdn.sanity.io/images/3ugk85nk/production/d8b5bc6244ae6cd83b62ec42f10d96e14d6e9113-280x280.png" 

425 ) 

426 _documentation_url = HttpUrl( 1a

427 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations" 

428 ) 

429 

430 apikey: SecretStr = Field( 1a

431 default=..., 

432 title="API Key", 

433 description="The API Key associated with your Opsgenie account.", 

434 ) 

435 

436 target_user: Optional[list[str]] = Field( 1a

437 default=None, description="The user(s) you wish to notify." 

438 ) 

439 

440 target_team: Optional[list[str]] = Field( 1a

441 default=None, description="The team(s) you wish to notify." 

442 ) 

443 

444 target_schedule: Optional[list[str]] = Field( 1a

445 default=None, description="The schedule(s) you wish to notify." 

446 ) 

447 

448 target_escalation: Optional[list[str]] = Field( 1a

449 default=None, description="The escalation(s) you wish to notify." 

450 ) 

451 

452 region_name: Literal["us", "eu"] = Field( 1a

453 default="us", description="The 2-character region code." 

454 ) 

455 

456 batch: bool = Field( 1a

457 default=False, 

458 description="Notify all targets in batches (instead of individually).", 

459 ) 

460 

461 tags: Optional[list[str]] = Field( 1a

462 default=None, 

463 description=( 

464 "A comma-separated list of tags you can associate with your Opsgenie" 

465 " message." 

466 ), 

467 examples=['["tag1", "tag2"]'], 

468 ) 

469 

470 priority: Optional[int] = Field( 1a

471 default=3, 

472 description=( 

473 "The priority to associate with the message. It is on a scale between 1" 

474 " (LOW) and 5 (EMERGENCY)." 

475 ), 

476 ) 

477 

478 alias: Optional[str] = Field( 1a

479 default=None, description="The alias to associate with the message." 

480 ) 

481 

482 entity: Optional[str] = Field( 1a

483 default=None, description="The entity to associate with the message." 

484 ) 

485 

486 details: Optional[dict[str, str]] = Field( 1a

487 default=None, 

488 description="Additional details composed of key/values pairs.", 

489 examples=['{"key1": "value1", "key2": "value2"}'], 

490 ) 

491 

492 def block_initialization(self) -> None: 1a

493 try: 

494 # Try importing for apprise>=1.18.0 

495 from apprise.plugins.opsgenie import NotifyOpsgenie 

496 except ImportError: 

497 # Fallback for versions apprise<1.18.0 

498 from apprise.plugins.NotifyOpsgenie import ( # pyright: ignore[reportMissingImports] this is a fallback 

499 NotifyOpsgenie, # pyright: ignore[reportUnknownVariableType] incomplete type hints in apprise 

500 ) 

501 

502 targets: list[str] = [] 

503 if self.target_user: 

504 [targets.append(f"@{x}") for x in self.target_user] 

505 if self.target_team: 

506 [targets.append(f"#{x}") for x in self.target_team] 

507 if self.target_schedule: 

508 [targets.append(f"*{x}") for x in self.target_schedule] 

509 if self.target_escalation: 

510 [targets.append(f"^{x}") for x in self.target_escalation] 

511 url = SecretStr( 

512 NotifyOpsgenie( 

513 apikey=self.apikey.get_secret_value(), 

514 targets=targets, 

515 region_name=self.region_name, 

516 details=self.details, 

517 priority=self.priority, 

518 alias=self.alias, 

519 entity=self.entity, 

520 batch=self.batch, 

521 tags=self.tags, 

522 action="new", 

523 ).url() # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] incomplete type hints in apprise 

524 ) 

525 self._start_apprise_client(url) 

526 

527 

528class MattermostWebhook(AbstractAppriseNotificationBlock): 1a

529 """ 

530 Enables sending notifications via a provided Mattermost webhook. 

531 See [Apprise notify_Mattermost docs](https://github.com/caronc/apprise/wiki/Notify_Mattermost) # noqa 

532 

533 

534 Examples: 

535 Load a saved Mattermost webhook and send a message: 

536 ```python 

537 from prefect.blocks.notifications import MattermostWebhook 

538 

539 mattermost_webhook_block = MattermostWebhook.load("BLOCK_NAME") 

540 

541 mattermost_webhook_block.notify("Hello from Prefect!") 

542 ``` 

543 """ 

544 

545 _description = "Enables sending notifications via a provided Mattermost webhook." 1a

546 _block_type_name = "Mattermost Webhook" 1a

547 _block_type_slug = "mattermost-webhook" 1a

548 _logo_url = HttpUrl( 1a

549 "https://cdn.sanity.io/images/3ugk85nk/production/1350a147130bf82cbc799a5f868d2c0116207736-250x250.png" 

550 ) 

551 _documentation_url = HttpUrl( 1a

552 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations" 

553 ) 

554 

555 hostname: str = Field( 1a

556 default=..., 

557 description="The hostname of your Mattermost server.", 

558 examples=["Mattermost.example.com"], 

559 ) 

560 secure: bool = Field( 1a

561 default=False, 

562 description="Whether to use secure https connection.", 

563 ) 

564 

565 token: SecretStr = Field( 1a

566 default=..., 

567 description="The token associated with your Mattermost webhook.", 

568 ) 

569 

570 botname: Optional[str] = Field( 1a

571 title="Bot name", 

572 default=None, 

573 description="The name of the bot that will send the message.", 

574 ) 

575 

576 channels: Optional[list[str]] = Field( 1a

577 default=None, 

578 description="The channel(s) you wish to notify.", 

579 ) 

580 

581 include_image: bool = Field( 1a

582 default=False, 

583 description="Whether to include the Apprise status image in the message.", 

584 ) 

585 

586 path: Optional[str] = Field( 1a

587 default=None, 

588 description="An optional sub-path specification to append to the hostname.", 

589 ) 

590 

591 port: int = Field( 1a

592 default=8065, 

593 description="The port of your Mattermost server.", 

594 ) 

595 

596 def block_initialization(self) -> None: 1a

597 try: 

598 # Try importing for apprise>=1.18.0 

599 from apprise.plugins.mattermost import NotifyMattermost 

600 except ImportError: 

601 # Fallback for versions apprise<1.18.0 

602 from apprise.plugins.NotifyMattermost import ( # pyright: ignore[reportMissingImports] this is a fallback 

603 NotifyMattermost, # pyright: ignore[reportUnknownVariableType] incomplete type hints in apprise 

604 ) 

605 

606 url = SecretStr( 

607 NotifyMattermost( 

608 token=self.token.get_secret_value(), 

609 fullpath=self.path, 

610 host=self.hostname, 

611 botname=self.botname, 

612 channels=self.channels, 

613 include_image=self.include_image, 

614 port=self.port, 

615 secure=self.secure, 

616 ).url() # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] incomplete type hints in apprise 

617 ) 

618 self._start_apprise_client(url) 

619 

620 

621class DiscordWebhook(AbstractAppriseNotificationBlock): 1a

622 """ 

623 Enables sending notifications via a provided Discord webhook. 

624 See [Apprise notify_Discord docs](https://github.com/caronc/apprise/wiki/Notify_Discord) # noqa 

625 

626 Examples: 

627 Load a saved Discord webhook and send a message: 

628 ```python 

629 from prefect.blocks.notifications import DiscordWebhook 

630 

631 discord_webhook_block = DiscordWebhook.load("BLOCK_NAME") 

632 

633 discord_webhook_block.notify("Hello from Prefect!") 

634 ``` 

635 """ 

636 

637 _description = "Enables sending notifications via a provided Discord webhook." 1a

638 _block_type_name = "Discord Webhook" 1a

639 _block_type_slug = "discord-webhook" 1a

640 _logo_url = HttpUrl( 1a

641 "https://cdn.sanity.io/images/3ugk85nk/production/9e94976c80ef925b66d24e5d14f0d47baa6b8f88-250x250.png" 

642 ) 

643 _documentation_url = HttpUrl( 1a

644 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations" 

645 ) 

646 

647 webhook_id: SecretStr = Field( 1a

648 default=..., 

649 description=( 

650 "The first part of 2 tokens provided to you after creating a" 

651 " incoming-webhook." 

652 ), 

653 ) 

654 

655 webhook_token: SecretStr = Field( 1a

656 default=..., 

657 description=( 

658 "The second part of 2 tokens provided to you after creating a" 

659 " incoming-webhook." 

660 ), 

661 ) 

662 

663 botname: Optional[str] = Field( 1a

664 title="Bot name", 

665 default=None, 

666 description=( 

667 "Identify the name of the bot that should issue the message. If one isn't" 

668 " specified then the default is to just use your account (associated with" 

669 " the incoming-webhook)." 

670 ), 

671 ) 

672 

673 tts: bool = Field( 1a

674 default=False, 

675 description="Whether to enable Text-To-Speech.", 

676 ) 

677 

678 include_image: bool = Field( 1a

679 default=False, 

680 description=( 

681 "Whether to include an image in-line with the message describing the" 

682 " notification type." 

683 ), 

684 ) 

685 

686 avatar: bool = Field( 1a

687 default=False, 

688 description="Whether to override the default discord avatar icon.", 

689 ) 

690 

691 avatar_url: Optional[str] = Field( 1a

692 title="Avatar URL", 

693 default=None, 

694 description=( 

695 "Over-ride the default discord avatar icon URL. By default this is not set" 

696 " and Apprise chooses the URL dynamically based on the type of message" 

697 " (info, success, warning, or error)." 

698 ), 

699 ) 

700 

701 def block_initialization(self) -> None: 1a

702 try: 

703 # Try importing for apprise>=1.18.0 

704 from apprise.plugins.discord import NotifyDiscord 

705 except ImportError: 

706 # Fallback for versions apprise<1.18.0 

707 from apprise.plugins.NotifyDiscord import ( # pyright: ignore[reportMissingImports] this is a fallback 

708 NotifyDiscord, # pyright: ignore[reportUnknownVariableType] incomplete type hints in apprise 

709 ) 

710 

711 url = SecretStr( 

712 NotifyDiscord( 

713 webhook_id=self.webhook_id.get_secret_value(), 

714 webhook_token=self.webhook_token.get_secret_value(), 

715 botname=self.botname, 

716 tts=self.tts, 

717 include_image=self.include_image, 

718 avatar=self.avatar, 

719 avatar_url=self.avatar_url, 

720 ).url() # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] incomplete type hints in apprise 

721 ) 

722 self._start_apprise_client(url) 

723 

724 

725class CustomWebhookNotificationBlock(NotificationBlock): 1a

726 """ 

727 Enables sending notifications via any custom webhook. 

728 

729 All nested string param contains `{{key}}` will be substituted with value from context/secrets. 

730 

731 Context values include: `subject`, `body` and `name`. 

732 

733 Examples: 

734 Load a saved custom webhook and send a message: 

735 ```python 

736 from prefect.blocks.notifications import CustomWebhookNotificationBlock 

737 

738 custom_webhook_block = CustomWebhookNotificationBlock.load("BLOCK_NAME") 

739 

740 custom_webhook_block.notify("Hello from Prefect!") 

741 ``` 

742 """ 

743 

744 _block_type_name = "Custom Webhook" 1a

745 _logo_url = HttpUrl( 1a

746 "https://cdn.sanity.io/images/3ugk85nk/production/c7247cb359eb6cf276734d4b1fbf00fb8930e89e-250x250.png" 

747 ) 

748 _documentation_url = HttpUrl( 1a

749 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations" 

750 ) 

751 

752 name: str = Field(title="Name", description="Name of the webhook.") 1a

753 

754 url: str = Field( 1a

755 title="Webhook URL", 

756 description="The webhook URL.", 

757 examples=["https://hooks.slack.com/XXX"], 

758 ) 

759 

760 method: Literal["GET", "POST", "PUT", "PATCH", "DELETE"] = Field( 1a

761 default="POST", description="The webhook request method. Defaults to `POST`." 

762 ) 

763 

764 params: Optional[dict[str, str]] = Field( 1a

765 default=None, title="Query Params", description="Custom query params." 

766 ) 

767 json_data: Optional[dict[str, Any]] = Field( 1a

768 default=None, 

769 title="JSON Data", 

770 description="Send json data as payload.", 

771 examples=[ 

772 '{"text": "{{subject}}\\n{{body}}", "title": "{{name}}", "token":' 

773 ' "{{tokenFromSecrets}}"}' 

774 ], 

775 ) 

776 form_data: Optional[dict[str, str]] = Field( 1a

777 default=None, 

778 title="Form Data", 

779 description=( 

780 "Send form data as payload. Should not be used together with _JSON Data_." 

781 ), 

782 examples=[ 

783 '{"text": "{{subject}}\\n{{body}}", "title": "{{name}}", "token":' 

784 ' "{{tokenFromSecrets}}"}' 

785 ], 

786 ) 

787 

788 headers: Optional[dict[str, str]] = Field(None, description="Custom headers.") 1a

789 cookies: Optional[dict[str, str]] = Field(None, description="Custom cookies.") 1a

790 

791 timeout: float = Field( 1a

792 default=10, description="Request timeout in seconds. Defaults to 10." 

793 ) 

794 

795 secrets: SecretDict = Field( 1a

796 default_factory=lambda: SecretDict(dict()), 

797 title="Custom Secret Values", 

798 description="A dictionary of secret values to be substituted in other configs.", 

799 examples=['{"tokenFromSecrets":"SomeSecretToken"}'], 

800 ) 

801 

802 def _build_request_args(self, body: str, subject: str | None) -> dict[str, Any]: 1a

803 """Build kwargs for httpx.AsyncClient.request""" 

804 # prepare values 

805 values = self.secrets.get_secret_value() 

806 # use 'null' when subject is None 

807 values.update( 

808 { 

809 "subject": "null" if subject is None else subject, 

810 "body": body, 

811 "name": self.name, 

812 } 

813 ) 

814 # do substution 

815 return apply_values( 

816 { 

817 "method": self.method, 

818 "url": self.url, 

819 "params": self.params, 

820 "data": self.form_data, 

821 "json": self.json_data, 

822 "headers": self.headers, 

823 "cookies": self.cookies, 

824 "timeout": self.timeout, 

825 }, 

826 values, 

827 ) 

828 

829 def block_initialization(self) -> None: 1a

830 # check form_data and json_data 

831 if self.form_data is not None and self.json_data is not None: 

832 raise ValueError("both `Form Data` and `JSON Data` provided") 

833 allowed_keys = {"subject", "body", "name"}.union( 

834 self.secrets.get_secret_value().keys() 

835 ) 

836 # test template to raise a error early 

837 for name in ["url", "params", "form_data", "json_data", "headers", "cookies"]: 

838 template = getattr(self, name) 

839 if template is None: 

840 continue 

841 # check for placeholders not in predefined keys and secrets 

842 placeholders = find_placeholders(template) 

843 for placeholder in placeholders: 

844 if placeholder.name not in allowed_keys: 

845 raise KeyError(f"{name}/{placeholder}") 

846 

847 @sync_compatible 1a

848 async def notify(self, body: str, subject: str | None = None) -> None: # pyright: ignore[reportIncompatibleMethodOverride] 1a

849 import httpx 

850 

851 request_args = self._build_request_args(body, subject) 

852 cookies = request_args.pop("cookies", dict()) 

853 # make request with httpx 

854 client = httpx.AsyncClient( 

855 headers={"user-agent": "Prefect Notifications"}, cookies=cookies 

856 ) 

857 async with client: 

858 resp = await client.request(**request_args) 

859 resp.raise_for_status() 

860 

861 

862class SendgridEmail(AbstractAppriseNotificationBlock): 1a

863 """ 

864 Enables sending notifications via any sendgrid account. 

865 See [Apprise Notify_sendgrid docs](https://github.com/caronc/apprise/wiki/Notify_Sendgrid) 

866 

867 Examples: 

868 Load a saved Sendgrid and send a email message: 

869 ```python 

870 from prefect.blocks.notifications import SendgridEmail 

871 

872 sendgrid_block = SendgridEmail.load("BLOCK_NAME") 

873 

874 sendgrid_block.notify("Hello from Prefect!") 

875 ``` 

876 """ 

877 

878 _description = "Enables sending notifications via Sendgrid email service." 1a

879 _block_type_name = "Sendgrid Email" 1a

880 _block_type_slug = "sendgrid-email" 1a

881 _logo_url = HttpUrl( 1a

882 "https://cdn.sanity.io/images/3ugk85nk/production/82bc6ed16ca42a2252a5512c72233a253b8a58eb-250x250.png" 

883 ) 

884 _documentation_url = HttpUrl( 1a

885 "https://docs.prefect.io/latest/automate/events/automations-triggers#sending-notifications-with-automations" 

886 ) 

887 

888 api_key: SecretStr = Field( 1a

889 default=..., 

890 title="API Key", 

891 description="The API Key associated with your sendgrid account.", 

892 ) 

893 

894 sender_email: str = Field( 1a

895 title="Sender email id", 

896 description="The sender email id.", 

897 examples=["test-support@gmail.com"], 

898 ) 

899 

900 to_emails: list[str] = Field( 1a

901 default=..., 

902 title="Recipient emails", 

903 description="Email ids of all recipients.", 

904 examples=['"recipient1@gmail.com"'], 

905 ) 

906 

907 def block_initialization(self) -> None: 1a

908 try: 

909 # Try importing for apprise>=1.18.0 

910 from apprise.plugins.sendgrid import NotifySendGrid 

911 except ImportError: 

912 # Fallback for versions apprise<1.18.0 

913 from apprise.plugins.NotifySendGrid import ( # pyright: ignore[reportMissingImports] this is a fallback 

914 NotifySendGrid, # pyright: ignore[reportUnknownVariableType] incomplete type hints in apprise 

915 ) 

916 

917 self._NotifySendGrid = NotifySendGrid # Cache the working import 

918 url = self._build_sendgrid_url() 

919 self._start_apprise_client(url) 

920 

921 def _build_sendgrid_url(self) -> SecretStr: 1a

922 """Build the SendGrid URL with current to_emails.""" 

923 return SecretStr( 

924 self._NotifySendGrid( 

925 apikey=self.api_key.get_secret_value(), 

926 from_email=self.sender_email, 

927 targets=self.to_emails, 

928 ).url() # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType] incomplete type hints in apprise 

929 ) 

930 

931 @sync_compatible 1a

932 async def notify( 1a

933 self, 

934 body: str, 

935 subject: str | None = None, 

936 ): 

937 # Update apprise client with current to_emails before sending 

938 if hasattr(self, "_apprise_client") and self._apprise_client: 

939 self._apprise_client.clear() 

940 self._apprise_client.add( 

941 servers=self._build_sendgrid_url().get_secret_value() 

942 ) 

943 

944 await super().notify(body, subject)