Coverage for /usr/local/lib/python3.12/site-packages/prefect/blocks/redis.py: 0%
71 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from __future__ import annotations
3from contextlib import asynccontextmanager
4from pathlib import Path
5from typing import AsyncGenerator, Optional
7try:
8 import redis
9 import redis.asyncio as aioredis
10except ImportError:
11 raise ImportError(
12 "`redis-py` must be installed to use the `RedisStorageContainer` block. "
13 "You can install it with `pip install redis>=5.0.1`"
14 )
16from pydantic import Field, HttpUrl
17from pydantic.types import SecretStr
18from typing_extensions import Self
20from prefect._internal.compatibility.async_dispatch import async_dispatch
21from prefect.filesystems import WritableFileSystem
class RedisStorageContainer(WritableFileSystem):
    """
    Block used to interact with Redis as a filesystem

    Attributes:
        host (str): Redis hostname. Required unless `connection_string` is set.
        port (int): Redis port. Defaults to 6379.
        db (int): Redis DB index. Defaults to 0.
        username (str): Redis username, stored as a secret.
        password (str): Redis password, stored as a secret. Required whenever
            `username` is set.
        connection_string (str): Redis connection string, stored as a secret.
            When set, it is used instead of the other connection fields.

    Example:
        Create a new block from hostname, username and password:
        ```python
        from prefect.blocks.redis import RedisStorageContainer

        block = RedisStorageContainer.from_host(
            host="myredishost.com", username="redis", password="SuperSecret")
        block.save("BLOCK_NAME")
        ```

        Create a new block from a connection string
        ```python
        from prefect.blocks.redis import RedisStorageContainer
        block = RedisStorageContainer.from_connection_string(
            "redis://redis:SuperSecret@myredishost.com:6379")
        block.save("BLOCK_NAME")
        ```
    """

    _logo_url = HttpUrl(
        "https://cdn.sanity.io/images/3ugk85nk/production/dfb02cfce09ce3ca88fea097659a83554dd7a850-596x512.png"
    )

    host: Optional[str] = Field(default=None, description="Redis hostname")
    port: int = Field(default=6379, description="Redis port")
    db: int = Field(default=0, description="Redis DB index")
    username: Optional[SecretStr] = Field(default=None, description="Redis username")
    password: Optional[SecretStr] = Field(default=None, description="Redis password")
    connection_string: Optional[SecretStr] = Field(
        default=None, description="Redis connection string"
    )

    def block_initialization(self) -> None:
        """Validate that the block carries enough information to connect.

        A connection string is sufficient on its own. Otherwise a host is
        required, and a username must be accompanied by a password.

        Raises:
            ValueError: If `host` is missing, or if `username` is given
                without `password`.
        """
        if self.connection_string:
            return
        self._require_host()
        if self.username and not self.password:
            raise ValueError(
                "Initialization error: 'username' is provided, but 'password' is missing. Both are required."
            )

    def _require_host(self) -> str:
        """Return the configured host, raising if it is not set.

        An explicit exception is used instead of `assert` so the check is
        not stripped when Python runs with `-O`.

        Raises:
            ValueError: If `host` is missing or empty.
        """
        if not self.host:
            raise ValueError("Initialization error: 'host' is required but missing.")
        return self.host

    def _sync_client(self) -> redis.Redis:
        """Build a synchronous Redis client from the block's settings.

        The connection string is used when present; otherwise the client is
        built from host, port, db, username and password. The caller is
        responsible for closing the returned client.

        Raises:
            ValueError: If neither a connection string nor a host is set.
        """
        if self.connection_string:
            return redis.Redis.from_url(self.connection_string.get_secret_value())
        return redis.Redis(
            host=self._require_host(),
            port=self.port,
            username=self.username.get_secret_value() if self.username else None,
            password=self.password.get_secret_value() if self.password else None,
            db=self.db,
        )

    async def aread_path(self, path: Path | str) -> Optional[bytes]:
        """Read the redis content at `path`

        Args:
            path: Redis key to read from

        Returns:
            Contents at key as bytes, or None if key does not exist
        """
        async with self._client() as client:
            return await client.get(str(path))

    @async_dispatch(aread_path)
    def read_path(self, path: Path | str) -> Optional[bytes]:
        """Read the redis content at `path`

        Args:
            path: Redis key to read from

        Returns:
            Contents at key as bytes, or None if key does not exist
        """
        client = self._sync_client()
        try:
            return client.get(str(path))
        finally:
            # Always release the connection, even if the read fails.
            client.close()

    async def awrite_path(self, path: Path | str, content: bytes) -> bool:
        """Write `content` to the redis at `path`

        Args:
            path: Redis key to write to
            content: Binary object to write

        Returns:
            True if the key was set successfully
        """
        async with self._client() as client:
            return await client.set(str(path), content)

    @async_dispatch(awrite_path)
    def write_path(self, path: Path | str, content: bytes) -> bool:
        """Write `content` to the redis at `path`

        Args:
            path: Redis key to write to
            content: Binary object to write

        Returns:
            True if the key was set successfully
        """
        client = self._sync_client()
        try:
            return client.set(str(path), content)
        finally:
            # Always release the connection, even if the write fails.
            client.close()

    @asynccontextmanager
    async def _client(self) -> AsyncGenerator[aioredis.Redis, None]:
        """Yield an async Redis client and close it when the context exits.

        The connection string is used when present; otherwise the client is
        built from host, port, db, username and password.

        Raises:
            ValueError: If neither a connection string nor a host is set.
        """
        if self.connection_string:
            client = aioredis.Redis.from_url(self.connection_string.get_secret_value())  # pyright: ignore[reportUnknownMemberType] incomplete typing for redis-py
        else:
            client = aioredis.Redis(
                host=self._require_host(),
                port=self.port,
                username=self.username.get_secret_value() if self.username else None,
                password=self.password.get_secret_value() if self.password else None,
                db=self.db,
            )

        try:
            yield client
        finally:
            await client.aclose()

    @classmethod
    def from_host(
        cls,
        host: str,
        port: int = 6379,
        db: int = 0,
        username: None | str | SecretStr = None,
        password: None | str | SecretStr = None,
    ) -> Self:
        """Create block from hostname, username and password

        Args:
            host: Redis hostname
            port: Redis port
            db: Redis DB index
            username: Redis username
            password: Redis password

        Returns:
            `RedisStorageContainer` instance
        """

        # Plain strings are wrapped so credentials are always held as secrets.
        username = SecretStr(username) if isinstance(username, str) else username
        password = SecretStr(password) if isinstance(password, str) else password

        return cls(host=host, port=port, db=db, username=username, password=password)

    @classmethod
    def from_connection_string(cls, connection_string: str | SecretStr) -> Self:
        """Create block from a Redis connection string

        Supports the following URL schemes:
        - `redis://` creates a TCP socket connection
        - `rediss://` creates a SSL wrapped TCP socket connection
        - `unix://` creates a Unix Domain Socket connection

        See [Redis docs](https://redis.readthedocs.io/en/stable/examples/connection_examples.html#Connecting-to-Redis-instances-by-specifying-a-URL-scheme.) for more info.

        Args:
            connection_string: Redis connection string

        Returns:
            `RedisStorageContainer` instance
        """

        # A plain string is wrapped so the connection string is held as a secret.
        connection_string = (
            SecretStr(connection_string)
            if isinstance(connection_string, str)
            else connection_string
        )

        return cls(connection_string=connection_string)