Coverage for /usr/local/lib/python3.12/site-packages/prefect/blocks/abstract.py: 66%
124 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3import logging 1a
4import sys 1a
5from abc import ABC, abstractmethod 1a
6from contextlib import contextmanager 1a
7from logging import Logger 1a
8from pathlib import Path 1a
9from typing import ( 1a
10 Any,
11 BinaryIO,
12 Generator,
13 Generic,
14 TypeVar,
15 Union,
16)
18from typing_extensions import TYPE_CHECKING, Self, TypeAlias 1a
20from prefect.blocks.core import Block 1a
21from prefect.exceptions import MissingContextError 1a
22from prefect.logging.loggers import get_logger, get_run_logger 1a
24T = TypeVar("T") 1a
26if sys.version_info >= (3, 12): 26 ↛ 29line 26 didn't jump to line 29 because the condition on line 26 was always true1a
27 LoggingAdapter = logging.LoggerAdapter[logging.Logger] 1a
28else:
29 if TYPE_CHECKING:
30 LoggingAdapter = logging.LoggerAdapter[logging.Logger]
31 else:
32 LoggingAdapter = logging.LoggerAdapter
34LoggerOrAdapter: TypeAlias = Union[Logger, LoggingAdapter] 1a
class CredentialsBlock(Block, ABC):
    """
    Holds the credentials for an external system and hands out a client for
    talking to that system.

    Configuration that is tightly coupled to the credentials (domain,
    endpoint, account ID, etc.) may live here as well. This block is often
    composed into other blocks; a parent block should rely on the client
    provided by its credentials block when interacting with the external
    system.
    """

    @property
    def logger(self) -> LoggerOrAdapter:
        """
        Logger appropriate for the current execution context.

        Returns:
            The run logger when called from within a flow or task run
            context; otherwise a default logger named after the concrete
            class.
        """
        try:
            active_logger = get_run_logger()
        except MissingContextError:
            # No run context available: fall back to a class-named logger.
            active_logger = get_logger(self.__class__.__name__)
        return active_logger

    @abstractmethod
    def get_client(self, *args: Any, **kwargs: Any) -> Any:
        """
        Return a client for interacting with the external system.

        When a service offers several clients, implementations can accept a
        `client_type` keyword argument to select the desired one.
        """
class NotificationError(Exception):
    """
    Raised if a notification block fails to send a notification.

    Attributes are documented inline in `__init__`.
    """

    def __init__(self, log: str) -> None:
        """
        Args:
            log: Text describing the failed notification attempt. It is kept
                on the instance as `log` and also used as the exception
                message.
        """
        # Forward the text to Exception so that `str(exc)` and `exc.args`
        # carry it even when the error is built with `log=` as a keyword
        # (in which case the base class would otherwise see no arguments).
        super().__init__(log)
        self.log = log
class NotificationBlock(Block, ABC):
    """
    Block that represents a resource in an external system that is able to
    send notifications.
    """

    _block_schema_capabilities = ["notify"]
    # Extends the exclusion list inherited from `Block` with `notify`.
    _events_excluded_methods = Block._events_excluded_methods + ["notify"]

    @property
    def logger(self) -> LoggerOrAdapter:
        """
        Logger appropriate for the current execution context.

        Returns:
            The run logger when called from within a flow or task run
            context; otherwise a default logger named after the concrete
            class.
        """
        try:
            active_logger = get_run_logger()
        except MissingContextError:
            # No run context available: fall back to a class-named logger.
            active_logger = get_logger(self.__class__.__name__)
        return active_logger

    @abstractmethod
    async def notify(self, body: str, subject: str | None = None) -> None:
        """
        Send a notification.

        Args:
            body: The body of the notification.
            subject: The subject of the notification.
        """

    # Flag toggled by `raise_on_failure`; off by default.
    _raise_on_failure: bool = False

    @contextmanager
    def raise_on_failure(self) -> Generator[None, None, None]:
        """
        Context manager that, while active, causes the block to raise errors
        if it encounters a failure sending notifications.

        The flag is restored to the value it had on entry, so nested uses do
        not switch the behaviour off while an outer context is still active.
        """
        previous = self._raise_on_failure
        self._raise_on_failure = True
        try:
            yield
        finally:
            # Restore rather than hard-reset to False so that an enclosing
            # `raise_on_failure()` context keeps its effect.
            self._raise_on_failure = previous
class JobRun(ABC, Generic[T]):  # not a block
    """
    Handle for a job run in an external system.

    Lets callers wait for the run to complete and fetch its results.
    """

    @property
    def logger(self) -> LoggerOrAdapter:
        """
        Logger appropriate for the current execution context.

        Returns:
            The run logger when called from within a flow or task run
            context; otherwise a default logger named after the concrete
            class.
        """
        try:
            active_logger = get_run_logger()
        except MissingContextError:
            # No run context available: fall back to a class-named logger.
            active_logger = get_logger(self.__class__.__name__)
        return active_logger

    # NOTE(review): the return annotation is `Logger`, yet the docstring does
    # not describe any return value -- confirm whether `None` was intended.
    @abstractmethod
    async def wait_for_completion(self) -> Logger:
        """
        Wait for the job run to complete.
        """

    @abstractmethod
    async def fetch_result(self) -> T:
        """
        Retrieve the results of the job run and return them.
        """
class JobBlock(Block, ABC, Generic[T]):
    """
    Block for an entity in an external service that can trigger a
    long-running execution.
    """

    @property
    def logger(self) -> LoggerOrAdapter:
        """
        Logger appropriate for the current execution context.

        Returns:
            The run logger when called from within a flow or task run
            context; otherwise a default logger named after the concrete
            class.
        """
        try:
            active_logger = get_run_logger()
        except MissingContextError:
            # No run context available: fall back to a class-named logger.
            active_logger = get_logger(self.__class__.__name__)
        return active_logger

    @abstractmethod
    async def trigger(self) -> JobRun[T]:
        """
        Trigger a job run in the external service.

        Returns:
            A `JobRun` object for tracking the execution of the run.
        """
194# TODO: This interface is heavily influenced by
195# [PEP 249](https://peps.python.org/pep-0249/)
196# Primarily intended for use with relational databases.
197# A separate interface may be necessary for
198# non relational databases.
class DatabaseBlock(Block, ABC):
    """
    An abstract block type that represents a database and
    provides an interface for interacting with it.

    Blocks that implement this interface have the option to accept
    credentials directly via attributes or via a nested `CredentialsBlock`.

    Use of a nested credentials block is recommended unless credentials
    are tightly coupled to database connection configuration.

    Implementing either sync or async context management on `DatabaseBlock`
    implementations is recommended.
    """

    @property
    def logger(self) -> LoggerOrAdapter:
        """
        Logger appropriate for the current execution context.

        Returns:
            The run logger when called from within a flow or task run
            context; otherwise a default logger named after the concrete
            class.
        """
        try:
            active_logger = get_run_logger()
        except MissingContextError:
            # No run context available: fall back to a class-named logger.
            active_logger = get_logger(self.__class__.__name__)
        return active_logger

    @abstractmethod
    async def fetch_one(
        self,
        operation: str,
        parameters: dict[str, Any] | None = None,
        **execution_kwargs: Any,
    ) -> tuple[Any, ...]:
        """
        Fetch a single result from the database.

        Args:
            operation: The SQL query or other operation to be executed.
            parameters: The parameters for the operation.
            **execution_kwargs: Additional keyword arguments to pass to execute.

        Returns:
            A single row returned by the database, as a tuple in which each
            column is one value.
        """

    @abstractmethod
    async def fetch_many(
        self,
        operation: str,
        parameters: dict[str, Any] | None = None,
        size: int | None = None,
        **execution_kwargs: Any,
    ) -> list[tuple[Any, ...]]:
        """
        Fetch a limited number of results from the database.

        Args:
            operation: The SQL query or other operation to be executed.
            parameters: The parameters for the operation.
            size: The number of results to return.
            **execution_kwargs: Additional keyword arguments to pass to execute.

        Returns:
            A list of tuples containing the data returned by the database,
            where each row is a tuple and each column is a value in the tuple.
        """

    @abstractmethod
    async def fetch_all(
        self,
        operation: str,
        parameters: dict[str, Any] | None = None,
        **execution_kwargs: Any,
    ) -> list[tuple[Any, ...]]:
        """
        Fetch all results from the database.

        Args:
            operation: The SQL query or other operation to be executed.
            parameters: The parameters for the operation.
            **execution_kwargs: Additional keyword arguments to pass to execute.

        Returns:
            A list of tuples containing the data returned by the database,
            where each row is a tuple and each column is a value in the tuple.
        """

    @abstractmethod
    async def execute(
        self,
        operation: str,
        parameters: dict[str, Any] | None = None,
        **execution_kwargs: Any,
    ) -> None:
        """
        Executes an operation on the database. This method is intended to be used
        for operations that do not return data, such as INSERT, UPDATE, or DELETE.

        Args:
            operation: The SQL query or other operation to be executed.
            parameters: The parameters for the operation.
            **execution_kwargs: Additional keyword arguments to pass to execute.
        """

    @abstractmethod
    async def execute_many(
        self,
        operation: str,
        seq_of_parameters: list[dict[str, Any]],
        **execution_kwargs: Any,
    ) -> None:
        """
        Executes multiple operations on the database. This method is intended to be used
        for operations that do not return data, such as INSERT, UPDATE, or DELETE.

        Args:
            operation: The SQL query or other operation to be executed.
            seq_of_parameters: The sequence of parameters for the operation.
            **execution_kwargs: Additional keyword arguments to pass to execute.
        """

    # context management methods are not abstract methods because
    # they are not supported by all database drivers
    async def __aenter__(self) -> Self:
        """
        Context management method for async databases.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        message = (
            f"{self.__class__.__name__} does not support async context management."
        )
        raise NotImplementedError(message)

    async def __aexit__(self, *args: Any) -> None:
        """
        Context management method for async databases.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        message = (
            f"{self.__class__.__name__} does not support async context management."
        )
        raise NotImplementedError(message)

    def __enter__(self) -> Self:
        """
        Context management method for databases.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        message = f"{self.__class__.__name__} does not support context management."
        raise NotImplementedError(message)

    def __exit__(self, *args: Any) -> None:
        """
        Context management method for databases.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        message = f"{self.__class__.__name__} does not support context management."
        raise NotImplementedError(message)
class ObjectStorageBlock(Block, ABC):
    """
    Block for a resource in an external service that can store objects.
    """

    @property
    def logger(self) -> LoggerOrAdapter:
        """
        Logger appropriate for the current execution context.

        Returns:
            The run logger when called from within a flow or task run
            context; otherwise a default logger named after the concrete
            class.
        """
        try:
            active_logger = get_run_logger()
        except MissingContextError:
            # No run context available: fall back to a class-named logger.
            active_logger = get_logger(self.__class__.__name__)
        return active_logger

    @abstractmethod
    async def download_object_to_path(
        self,
        from_path: str,
        to_path: str | Path,
        **download_kwargs: Any,
    ) -> Path:
        """
        Download an object from the object storage service to a path.

        Args:
            from_path: The path to download from.
            to_path: The path to download to.
            **download_kwargs: Additional keyword arguments to pass to download.

        Returns:
            The path that the object was downloaded to.
        """

    @abstractmethod
    async def download_object_to_file_object(
        self,
        from_path: str,
        to_file_object: BinaryIO,
        **download_kwargs: Any,
    ) -> BinaryIO:
        """
        Download an object from the object storage service to a file-like
        object, which can be a BytesIO object or a BufferedWriter.

        Args:
            from_path: The path to download from.
            to_file_object: The file-like object to download to.
            **download_kwargs: Additional keyword arguments to pass to download.

        Returns:
            The file-like object that the object was downloaded to.
        """

    @abstractmethod
    async def download_folder_to_path(
        self,
        from_folder: str,
        to_folder: str | Path,
        **download_kwargs: Any,
    ) -> Path:
        """
        Download a folder from the object storage service to a path.

        Args:
            from_folder: The path to the folder to download from.
            to_folder: The path to download the folder to.
            **download_kwargs: Additional keyword arguments to pass to download.

        Returns:
            The path that the folder was downloaded to.
        """

    @abstractmethod
    async def upload_from_path(
        self, from_path: str | Path, to_path: str, **upload_kwargs: Any
    ) -> str:
        """
        Upload an object from a path to the object storage service.

        Args:
            from_path: The path to the file to upload from.
            to_path: The path to upload the file to.
            **upload_kwargs: Additional keyword arguments to pass to upload.

        Returns:
            The path that the object was uploaded to.
        """

    @abstractmethod
    async def upload_from_file_object(
        self, from_file_object: BinaryIO, to_path: str, **upload_kwargs: Any
    ) -> str:
        """
        Upload an object to the object storage service from a file-like
        object, which can be a BytesIO object or a BufferedReader.

        Args:
            from_file_object: The file-like object to upload from.
            to_path: The path to upload the object to.
            **upload_kwargs: Additional keyword arguments to pass to upload.

        Returns:
            The path that the object was uploaded to.
        """

    @abstractmethod
    async def upload_from_folder(
        self,
        from_folder: str | Path,
        to_folder: str,
        **upload_kwargs: Any,
    ) -> str:
        """
        Upload a folder to the object storage service from a path.

        Args:
            from_folder: The path to the folder to upload from.
            to_folder: The path to upload the folder to.
            **upload_kwargs: Additional keyword arguments to pass to upload.

        Returns:
            The path that the folder was uploaded to.
        """
class SecretBlock(Block, ABC):
    """
    Block for a resource that can store and retrieve secrets.
    """

    @property
    def logger(self) -> LoggerOrAdapter:
        """
        Logger appropriate for the current execution context.

        Returns:
            The run logger when called from within a flow or task run
            context; otherwise a default logger named after the concrete
            class.
        """
        try:
            active_logger = get_run_logger()
        except MissingContextError:
            # No run context available: fall back to a class-named logger.
            active_logger = get_logger(self.__class__.__name__)
        return active_logger

    @abstractmethod
    async def read_secret(self) -> bytes:
        """
        Read the configured secret from the secret storage service.

        Returns:
            The secret data.
        """

    @abstractmethod
    async def write_secret(self, secret_data: bytes) -> str:
        """
        Write secret data to the configured secret in the secret storage
        service.

        Args:
            secret_data: The secret data to write.

        Returns:
            The key of the secret that can be used for retrieval.
        """