Coverage for /usr/local/lib/python3.12/site-packages/prefect/blocks/abstract.py: 66%

124 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3import logging 1a

4import sys 1a

5from abc import ABC, abstractmethod 1a

6from contextlib import contextmanager 1a

7from logging import Logger 1a

8from pathlib import Path 1a

9from typing import ( 1a

10 Any, 

11 BinaryIO, 

12 Generator, 

13 Generic, 

14 TypeVar, 

15 Union, 

16) 

17 

18from typing_extensions import TYPE_CHECKING, Self, TypeAlias 1a

19 

20from prefect.blocks.core import Block 1a

21from prefect.exceptions import MissingContextError 1a

22from prefect.logging.loggers import get_logger, get_run_logger 1a

23 

# Type variable for the result type of a job run; it parameterizes `JobRun`
# and `JobBlock` below.
T = TypeVar("T")

# `LoggingAdapter` is the adapter type used in the logger annotations below.
# On Python 3.12+ it is `logging.LoggerAdapter` subscripted with
# `logging.Logger`. On older interpreters the subscripted form is only given
# to static type checkers, while the runtime name is the bare class.
# NOTE(review): presumably the bare class is used because subscripting
# `LoggerAdapter` at runtime is not supported on every older version — confirm.
if sys.version_info >= (3, 12):
    LoggingAdapter = logging.LoggerAdapter[logging.Logger]
else:
    if TYPE_CHECKING:
        LoggingAdapter = logging.LoggerAdapter[logging.Logger]
    else:
        LoggingAdapter = logging.LoggerAdapter

# Return type of every `logger` property in this module: either a plain
# `Logger` or a logger adapter.
LoggerOrAdapter: TypeAlias = Union[Logger, LoggingAdapter]

35 

36 

class CredentialsBlock(Block, ABC):
    """
    Holds the credentials for an external system and hands out a client for
    talking to that system. It may also carry configuration that is tightly
    coupled to the credentials (domain, endpoint, account ID, etc.) and will
    often be composed with other blocks. A parent block should use the client
    supplied by its credentials block whenever it interacts with the
    corresponding external system.
    """

    @property
    def logger(self) -> LoggerOrAdapter:
        """
        Logger appropriate for where the CredentialsBlock is being used.

        Inside a flow or task run context this is the run logger; outside of
        one it is a default logger named after the concrete class.

        Returns:
            The run logger, or a default logger carrying the class's name.
        """
        try:
            run_logger = get_run_logger()
        except MissingContextError:
            # No run context is available: fall back to a class-named logger.
            return get_logger(self.__class__.__name__)
        return run_logger

    @abstractmethod
    def get_client(self, *args: Any, **kwargs: Any) -> Any:
        """
        Build and return a client for the external system.

        Services that expose more than one client may accept a `client_type`
        keyword argument here to select which client is returned.
        """

71 

72 

class NotificationError(Exception):
    """Raised if a notification block fails to send a notification.

    Args:
        log: The log output describing the failed notification attempt. It is
            stored on the exception as the `log` attribute.
    """

    def __init__(self, log: str) -> None:
        # Hand the log to Exception so `args` and `str(exc)` carry it no matter
        # whether `log` was passed positionally or by keyword. Without this a
        # keyword call leaves `args` empty, which also breaks copying/pickling
        # because reconstruction re-calls the class with `args`.
        super().__init__(log)
        self.log = log

78 

79 

class NotificationBlock(Block, ABC):
    """
    Block that represents a resource in an external system that is able to send notifications.
    """

    _block_schema_capabilities = ["notify"]
    _events_excluded_methods = Block._events_excluded_methods + ["notify"]

    @property
    def logger(self) -> LoggerOrAdapter:
        """
        Returns a logger based on whether the NotificationBlock
        is called from within a flow or task run context.
        If a run context is present, the logger property returns a run logger.
        Else, it returns a default logger labeled with the class's name.

        Returns:
            The run logger or a default logger with the class's name.
        """
        try:
            return get_run_logger()
        except MissingContextError:
            return get_logger(self.__class__.__name__)

    @abstractmethod
    async def notify(self, body: str, subject: str | None = None) -> None:
        """
        Send a notification.

        Args:
            body: The body of the notification.
            subject: The subject of the notification.
        """

    # Flag toggled by `raise_on_failure()`; off by default.
    _raise_on_failure: bool = False

    @contextmanager
    def raise_on_failure(self) -> Generator[None, None, None]:
        """
        Context manager that, while active, causes the block to raise errors if it
        encounters a failure sending notifications.

        The flag's previous value is restored on exit, so nested uses on the same
        block keep the flag set until the outermost context has exited.
        """
        # Remember the incoming state rather than assuming it was False, so that
        # leaving an inner context does not switch the flag off for an outer one.
        previous = self._raise_on_failure
        self._raise_on_failure = True
        try:
            yield
        finally:
            self._raise_on_failure = previous

127 

128 

class JobRun(ABC, Generic[T]):  # not a block
    """
    A single run of a job in an external system. Provides a way to wait
    until the run has completed and a way to fetch its results.
    """

    @property
    def logger(self) -> LoggerOrAdapter:
        """
        Logger appropriate for where the JobRun is being used.

        Inside a flow or task run context this is the run logger; outside of
        one it is a default logger named after the concrete class.

        Returns:
            The run logger, or a default logger carrying the class's name.
        """
        try:
            run_logger = get_run_logger()
        except MissingContextError:
            # No run context is available: fall back to a class-named logger.
            return get_logger(self.__class__.__name__)
        return run_logger

    @abstractmethod
    async def wait_for_completion(self) -> Logger:
        """
        Wait until the job run has completed.
        """
        # NOTE(review): the `Logger` return annotation looks unintended for a
        # "wait" method — confirm against implementations before changing it.

    @abstractmethod
    async def fetch_result(self) -> T:
        """
        Retrieve and return the results of the job run.
        """

162 

163 

class JobBlock(Block, ABC, Generic[T]):
    """
    Block for an entity in an external service that is able to start a
    long running execution.
    """

    @property
    def logger(self) -> LoggerOrAdapter:
        """
        Logger appropriate for where the JobBlock is being used.

        Inside a flow or task run context this is the run logger; outside of
        one it is a default logger named after the concrete class.

        Returns:
            The run logger, or a default logger carrying the class's name.
        """
        try:
            run_logger = get_run_logger()
        except MissingContextError:
            # No run context is available: fall back to a class-named logger.
            return get_logger(self.__class__.__name__)
        return run_logger

    @abstractmethod
    async def trigger(self) -> JobRun[T]:
        """
        Start a job run in the external service.

        Returns:
            A JobRun object that tracks the execution of the run.
        """

192 

193 

194# TODO: This interface is heavily influenced by 

195# [PEP 249](https://peps.python.org/pep-0249/) 

196# Primarily intended for use with relational databases. 

197# A separate interface may be necessary for 

198# non relational databases. 

class DatabaseBlock(Block, ABC):
    """
    Abstract block type for a database, defining the interface used to
    interact with it.

    Implementations may accept credentials directly through attributes or
    through a nested `CredentialsBlock`.

    A nested credentials block is the recommended choice unless the
    credentials are tightly coupled to the database connection configuration.

    Implementations are encouraged to provide either sync or async context
    management.
    """

    @property
    def logger(self) -> LoggerOrAdapter:
        """
        Logger appropriate for where the DatabaseBlock is being used.

        Inside a flow or task run context this is the run logger; outside of
        one it is a default logger named after the concrete class.

        Returns:
            The run logger, or a default logger carrying the class's name.
        """
        try:
            run_logger = get_run_logger()
        except MissingContextError:
            # No run context is available: fall back to a class-named logger.
            return get_logger(self.__class__.__name__)
        return run_logger

    @abstractmethod
    async def fetch_one(
        self,
        operation: str,
        parameters: dict[str, Any] | None = None,
        **execution_kwargs: Any,
    ) -> tuple[Any, ...]:
        """
        Fetch a single result from the database.

        Args:
            operation: The SQL query or other operation to be executed.
            parameters: The parameters for the operation.
            **execution_kwargs: Additional keyword arguments to pass to execute.

        Returns:
            A tuple holding the data of the fetched row, where each column
            is a value in the tuple.
        """

    @abstractmethod
    async def fetch_many(
        self,
        operation: str,
        parameters: dict[str, Any] | None = None,
        size: int | None = None,
        **execution_kwargs: Any,
    ) -> list[tuple[Any, ...]]:
        """
        Fetch a limited number of results from the database.

        Args:
            operation: The SQL query or other operation to be executed.
            parameters: The parameters for the operation.
            size: The number of results to return.
            **execution_kwargs: Additional keyword arguments to pass to execute.

        Returns:
            A list of rows returned by the database; each row is a tuple and
            each column is a value in that tuple.
        """

    @abstractmethod
    async def fetch_all(
        self,
        operation: str,
        parameters: dict[str, Any] | None = None,
        **execution_kwargs: Any,
    ) -> list[tuple[Any, ...]]:
        """
        Fetch all results from the database.

        Args:
            operation: The SQL query or other operation to be executed.
            parameters: The parameters for the operation.
            **execution_kwargs: Additional keyword arguments to pass to execute.

        Returns:
            A list of rows returned by the database; each row is a tuple and
            each column is a value in that tuple.
        """

    @abstractmethod
    async def execute(
        self,
        operation: str,
        parameters: dict[str, Any] | None = None,
        **execution_kwargs: Any,
    ) -> None:
        """
        Execute an operation on the database. Intended for operations that do
        not return data, such as INSERT, UPDATE, or DELETE.

        Args:
            operation: The SQL query or other operation to be executed.
            parameters: The parameters for the operation.
            **execution_kwargs: Additional keyword arguments to pass to execute.
        """

    @abstractmethod
    async def execute_many(
        self,
        operation: str,
        seq_of_parameters: list[dict[str, Any]],
        **execution_kwargs: Any,
    ) -> None:
        """
        Execute multiple operations on the database. Intended for operations
        that do not return data, such as INSERT, UPDATE, or DELETE.

        Args:
            operation: The SQL query or other operation to be executed.
            seq_of_parameters: The sequence of parameters for the operation.
            **execution_kwargs: Additional keyword arguments to pass to execute.
        """

    # The context management methods below are deliberately concrete rather
    # than abstract, because not every database driver supports them.
    async def __aenter__(self) -> Self:
        """
        Async context management entry point.

        Raises:
            NotImplementedError: Always, unless overridden by an implementation.
        """
        message = (
            f"{self.__class__.__name__} does not support async context management."
        )
        raise NotImplementedError(message)

    async def __aexit__(self, *args: Any) -> None:
        """
        Async context management exit point.

        Raises:
            NotImplementedError: Always, unless overridden by an implementation.
        """
        message = (
            f"{self.__class__.__name__} does not support async context management."
        )
        raise NotImplementedError(message)

    def __enter__(self) -> Self:
        """
        Sync context management entry point.

        Raises:
            NotImplementedError: Always, unless overridden by an implementation.
        """
        message = f"{self.__class__.__name__} does not support context management."
        raise NotImplementedError(message)

    def __exit__(self, *args: Any) -> None:
        """
        Sync context management exit point.

        Raises:
            NotImplementedError: Always, unless overridden by an implementation.
        """
        message = f"{self.__class__.__name__} does not support context management."
        raise NotImplementedError(message)

359 

360 

class ObjectStorageBlock(Block, ABC):
    """
    Block for a resource in an external service that is able to store
    objects.
    """

    @property
    def logger(self) -> LoggerOrAdapter:
        """
        Logger appropriate for where the ObjectStorageBlock is being used.

        Inside a flow or task run context this is the run logger; outside of
        one it is a default logger named after the concrete class.

        Returns:
            The run logger, or a default logger carrying the class's name.
        """
        try:
            run_logger = get_run_logger()
        except MissingContextError:
            # No run context is available: fall back to a class-named logger.
            return get_logger(self.__class__.__name__)
        return run_logger

    @abstractmethod
    async def download_object_to_path(
        self,
        from_path: str,
        to_path: str | Path,
        **download_kwargs: Any,
    ) -> Path:
        """
        Download an object from the object storage service to a path.

        Args:
            from_path: The path to download from.
            to_path: The path to download to.
            **download_kwargs: Additional keyword arguments to pass to download.

        Returns:
            The path the object was downloaded to.
        """

    @abstractmethod
    async def download_object_to_file_object(
        self,
        from_path: str,
        to_file_object: BinaryIO,
        **download_kwargs: Any,
    ) -> BinaryIO:
        """
        Download an object from the object storage service into a file-like
        object, which can be a BytesIO object or a BufferedWriter.

        Args:
            from_path: The path to download from.
            to_file_object: The file-like object to download to.
            **download_kwargs: Additional keyword arguments to pass to download.

        Returns:
            The file-like object the object was downloaded to.
        """

    @abstractmethod
    async def download_folder_to_path(
        self,
        from_folder: str,
        to_folder: str | Path,
        **download_kwargs: Any,
    ) -> Path:
        """
        Download a folder from the object storage service to a path.

        Args:
            from_folder: The path to the folder to download from.
            to_folder: The path to download the folder to.
            **download_kwargs: Additional keyword arguments to pass to download.

        Returns:
            The path the folder was downloaded to.
        """

    @abstractmethod
    async def upload_from_path(
        self, from_path: str | Path, to_path: str, **upload_kwargs: Any
    ) -> str:
        """
        Upload an object from a path to the object storage service.

        Args:
            from_path: The path to the file to upload from.
            to_path: The path to upload the file to.
            **upload_kwargs: Additional keyword arguments to pass to upload.

        Returns:
            The path the object was uploaded to.
        """

    @abstractmethod
    async def upload_from_file_object(
        self, from_file_object: BinaryIO, to_path: str, **upload_kwargs: Any
    ) -> str:
        """
        Upload an object to the object storage service from a file-like
        object, which can be a BytesIO object or a BufferedReader.

        Args:
            from_file_object: The file-like object to upload from.
            to_path: The path to upload the object to.
            **upload_kwargs: Additional keyword arguments to pass to upload.

        Returns:
            The path the object was uploaded to.
        """

    @abstractmethod
    async def upload_from_folder(
        self,
        from_folder: str | Path,
        to_folder: str,
        **upload_kwargs: Any,
    ) -> str:
        """
        Upload a folder to the object storage service from a path.

        Args:
            from_folder: The path to the folder to upload from.
            to_folder: The path to upload the folder to.
            **upload_kwargs: Additional keyword arguments to pass to upload.

        Returns:
            The path the folder was uploaded to.
        """

492 

493 

class SecretBlock(Block, ABC):
    """
    Block for a resource that is able to store and retrieve secrets.
    """

    @property
    def logger(self) -> LoggerOrAdapter:
        """
        Logger appropriate for where the SecretBlock is being used.

        Inside a flow or task run context this is the run logger; outside of
        one it is a default logger named after the concrete class.

        Returns:
            The run logger, or a default logger carrying the class's name.
        """
        try:
            run_logger = get_run_logger()
        except MissingContextError:
            # No run context is available: fall back to a class-named logger.
            return get_logger(self.__class__.__name__)
        return run_logger

    @abstractmethod
    async def read_secret(self) -> bytes:
        """
        Read the configured secret from the secret storage service.

        Returns:
            The secret data.
        """

    @abstractmethod
    async def write_secret(self, secret_data: bytes) -> str:
        """
        Write secret data to the configured secret in the secret storage service.

        Args:
            secret_data: The secret data to write.

        Returns:
            The key of the secret that can be used for retrieval.
        """

534 """