Coverage for /usr/local/lib/python3.12/site-packages/prefect/blocks/redis.py: 0%

71 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 

2 

3from contextlib import asynccontextmanager 

4from pathlib import Path 

5from typing import AsyncGenerator, Optional 

6 

7try: 

8 import redis 

9 import redis.asyncio as aioredis 

10except ImportError: 

11 raise ImportError( 

12 "`redis-py` must be installed to use the `RedisStorageContainer` block. " 

13 "You can install it with `pip install redis>=5.0.1`" 

14 ) 

15 

16from pydantic import Field, HttpUrl 

17from pydantic.types import SecretStr 

18from typing_extensions import Self 

19 

20from prefect._internal.compatibility.async_dispatch import async_dispatch 

21from prefect.filesystems import WritableFileSystem 

22 

23 

24class RedisStorageContainer(WritableFileSystem): 

25 """ 

26 Block used to interact with Redis as a filesystem 

27 

28 Attributes: 

29 host (str): The value to store. 

30 port (int): The value to store. 

31 db (int): The value to store. 

32 username (str): The value to store. 

33 password (str): The value to store. 

34 connection_string (str): The value to store. 

35 

36 Example: 

37 Create a new block from hostname, username and password: 

38 ```python 

39 from prefect.blocks.redis import RedisStorageContainer 

40 

41 block = RedisStorageContainer.from_host( 

42 host="myredishost.com", username="redis", password="SuperSecret") 

43 block.save("BLOCK_NAME") 

44 ``` 

45 

46 Create a new block from a connection string 

47 ```python 

48 from prefect.blocks.redis import RedisStorageContainer 

49 block = RedisStorageContainer.from_url(""redis://redis:SuperSecret@myredishost.com:6379") 

50 block.save("BLOCK_NAME") 

51 ``` 

52 """ 

53 

54 _logo_url = HttpUrl( 

55 "https://cdn.sanity.io/images/3ugk85nk/production/dfb02cfce09ce3ca88fea097659a83554dd7a850-596x512.png" 

56 ) 

57 

58 host: Optional[str] = Field(default=None, description="Redis hostname") 

59 port: int = Field(default=6379, description="Redis port") 

60 db: int = Field(default=0, description="Redis DB index") 

61 username: Optional[SecretStr] = Field(default=None, description="Redis username") 

62 password: Optional[SecretStr] = Field(default=None, description="Redis password") 

63 connection_string: Optional[SecretStr] = Field( 

64 default=None, description="Redis connection string" 

65 ) 

66 

67 def block_initialization(self) -> None: 

68 if self.connection_string: 

69 return 

70 if not self.host: 

71 raise ValueError("Initialization error: 'host' is required but missing.") 

72 if self.username and not self.password: 

73 raise ValueError( 

74 "Initialization error: 'username' is provided, but 'password' is missing. Both are required." 

75 ) 

76 

77 async def aread_path(self, path: Path | str) -> Optional[bytes]: 

78 """Read the redis content at `path` 

79 

80 Args: 

81 path: Redis key to read from 

82 

83 Returns: 

84 Contents at key as bytes, or None if key does not exist 

85 """ 

86 async with self._client() as client: 

87 return await client.get(str(path)) 

88 

89 @async_dispatch(aread_path) 

90 def read_path(self, path: Path | str) -> Optional[bytes]: 

91 """Read the redis content at `path` 

92 

93 Args: 

94 path: Redis key to read from 

95 

96 Returns: 

97 Contents at key as bytes, or None if key does not exist 

98 """ 

99 if self.connection_string: 

100 client = redis.Redis.from_url(self.connection_string.get_secret_value()) 

101 else: 

102 assert self.host 

103 client = redis.Redis( 

104 host=self.host, 

105 port=self.port, 

106 username=self.username.get_secret_value() if self.username else None, 

107 password=self.password.get_secret_value() if self.password else None, 

108 db=self.db, 

109 ) 

110 

111 try: 

112 return client.get(str(path)) 

113 finally: 

114 client.close() 

115 

116 async def awrite_path(self, path: Path | str, content: bytes) -> bool: 

117 """Write `content` to the redis at `path` 

118 

119 Args: 

120 path: Redis key to write to 

121 content: Binary object to write 

122 

123 Returns: 

124 True if the key was set successfully 

125 """ 

126 

127 async with self._client() as client: 

128 return await client.set(str(path), content) 

129 

130 @async_dispatch(awrite_path) 

131 def write_path(self, path: Path | str, content: bytes) -> bool: 

132 """Write `content` to the redis at `path` 

133 

134 Args: 

135 path: Redis key to write to 

136 content: Binary object to write 

137 

138 Returns: 

139 True if the key was set successfully 

140 """ 

141 if self.connection_string: 

142 client = redis.Redis.from_url(self.connection_string.get_secret_value()) 

143 else: 

144 assert self.host 

145 client = redis.Redis( 

146 host=self.host, 

147 port=self.port, 

148 username=self.username.get_secret_value() if self.username else None, 

149 password=self.password.get_secret_value() if self.password else None, 

150 db=self.db, 

151 ) 

152 

153 try: 

154 return client.set(str(path), content) 

155 finally: 

156 client.close() 

157 

158 @asynccontextmanager 

159 async def _client(self) -> AsyncGenerator[aioredis.Redis, None]: 

160 if self.connection_string: 

161 client = aioredis.Redis.from_url(self.connection_string.get_secret_value()) # pyright: ignore[reportUnknownMemberType] incomplete typing for redis-py 

162 else: 

163 assert self.host 

164 client = aioredis.Redis( 

165 host=self.host, 

166 port=self.port, 

167 username=self.username.get_secret_value() if self.username else None, 

168 password=self.password.get_secret_value() if self.password else None, 

169 db=self.db, 

170 ) 

171 

172 try: 

173 yield client 

174 finally: 

175 await client.aclose() 

176 

177 @classmethod 

178 def from_host( 

179 cls, 

180 host: str, 

181 port: int = 6379, 

182 db: int = 0, 

183 username: None | str | SecretStr = None, 

184 password: None | str | SecretStr = None, 

185 ) -> Self: 

186 """Create block from hostname, username and password 

187 

188 Args: 

189 host: Redis hostname 

190 username: Redis username 

191 password: Redis password 

192 port: Redis port 

193 

194 Returns: 

195 `RedisStorageContainer` instance 

196 """ 

197 

198 username = SecretStr(username) if isinstance(username, str) else username 

199 password = SecretStr(password) if isinstance(password, str) else password 

200 

201 return cls(host=host, port=port, db=db, username=username, password=password) 

202 

203 @classmethod 

204 def from_connection_string(cls, connection_string: str | SecretStr) -> Self: 

205 """Create block from a Redis connection string 

206 

207 Supports the following URL schemes: 

208 - `redis://` creates a TCP socket connection 

209 - `rediss://` creates a SSL wrapped TCP socket connection 

210 - `unix://` creates a Unix Domain Socket connection 

211 

212 See [Redis docs](https://redis.readthedocs.io/en/stable/examples 

213 /connection_examples.html#Connecting-to-Redis-instances-by-specifying-a-URL 

214 -scheme.) for more info. 

215 

216 Args: 

217 connection_string: Redis connection string 

218 

219 Returns: 

220 `RedisStorageContainer` instance 

221 """ 

222 

223 connection_string = ( 

224 SecretStr(connection_string) 

225 if isinstance(connection_string, str) 

226 else connection_string 

227 ) 

228 

229 return cls(connection_string=connection_string)