Coverage for /usr/local/lib/python3.12/site-packages/prefect/locking/filesystem.py: 0%

136 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1import datetime 

2import time 

3from datetime import timedelta 

4from logging import Logger 

5from pathlib import Path 

6from typing import Optional 

7 

8import anyio 

9import pydantic_core 

10from typing_extensions import TypedDict 

11 

12from prefect.logging.loggers import get_logger 

13from prefect.types._datetime import now, parse_datetime 

14 

15from .protocol import LockManager 

16 

17logger: Logger = get_logger(__name__) 

18 

19 

class _LockInfo(TypedDict):
    """
    Typed mapping describing a single lock tracked by a lock manager.

    Attributes:
        holder: Identifier of the actor that currently holds the lock.
        expiration: Moment at which the lock expires, or ``None`` when the
            lock has no expiration.
        path: Location of the lock file on disk.
    """

    # Who owns the lock.
    holder: str
    # When the lock lapses; ``None`` means it never expires on its own.
    expiration: Optional[datetime.datetime]
    # Where the backing lock file lives.
    path: Path

33 

34 

class FileSystemLockManager(LockManager):
    """
    A lock manager that implements locking using local files.

    Each lock is backed by one JSON file inside ``lock_files_directory``
    holding the lock's ``holder`` and optional ``expiration``. Lock info that
    this instance reads or writes is also cached in memory, keyed by lock key.

    Attributes:
        lock_files_directory: the directory where lock files are stored
    """

    def __init__(self, lock_files_directory: Path) -> None:
        """
        Args:
            lock_files_directory: Directory for lock files; ``~`` is expanded
                and the path is resolved to an absolute path.
        """
        self.lock_files_directory: Path = lock_files_directory.expanduser().resolve()
        # In-memory cache of lock info, keyed by lock key.
        self._locks: dict[str, _LockInfo] = {}

    def _ensure_lock_files_directory_exists(self) -> None:
        """Create the lock directory (and any missing parents) if needed."""
        self.lock_files_directory.mkdir(parents=True, exist_ok=True)

    def _lock_path_for_key(self, key: str) -> Path:
        """
        Return the lock file path for ``key``.

        A cached path is preferred; otherwise the path is derived from the
        key inside ``lock_files_directory`` with a ``.lock`` suffix.
        """
        if (lock_info := self._locks.get(key)) is not None:
            return lock_info["path"]
        # NOTE(review): ``with_suffix`` replaces an existing suffix, so keys
        # that differ only after their last "." (e.g. "a.b" / "a.c") map to the
        # same file. Kept as-is for compatibility with existing lock files.
        return self.lock_files_directory.joinpath(key).with_suffix(".lock")

    @staticmethod
    def _expiration_for(hold_timeout: Optional[float]) -> Optional[datetime.datetime]:
        """Return the UTC expiration for ``hold_timeout`` seconds, or ``None``."""
        if hold_timeout is None:
            return None
        return now("UTC") + timedelta(seconds=hold_timeout)

    @staticmethod
    def _serialize_lock_info(
        holder: str, expiration: Optional[datetime.datetime]
    ) -> bytes:
        """Encode the on-disk JSON payload for a lock file."""
        return pydantic_core.to_json(
            {
                "holder": holder,
                "expiration": str(expiration) if expiration is not None else None,
            },
        )

    @staticmethod
    def _parse_lock_info(raw: bytes, lock_path: Path) -> _LockInfo:
        """Decode a lock file payload and attach the path it was read from."""
        lock_info = pydantic_core.from_json(raw)
        lock_info["path"] = lock_path
        expiration = lock_info.get("expiration")
        lock_info["expiration"] = (
            parse_datetime(expiration) if expiration is not None else None
        )
        return lock_info

    def _get_lock_info(self, key: str, use_cache: bool = True) -> Optional[_LockInfo]:
        """
        Return lock info for ``key``, or ``None`` if no lock file exists.

        Args:
            key: The lock key.
            use_cache: When true, return the cached entry if there is one
                instead of reading the lock file.
        """
        if use_cache and (cached := self._locks.get(key)) is not None:
            return cached

        lock_path = self._lock_path_for_key(key)
        try:
            raw = Path(lock_path).read_bytes()
        except FileNotFoundError:
            # The file is gone, so any cached entry for this key is stale.
            self._locks.pop(key, None)
            return None

        lock_info = self._parse_lock_info(raw, lock_path)
        self._locks[key] = lock_info
        return lock_info

    async def _aget_lock_info(
        self, key: str, use_cache: bool = True
    ) -> Optional[_LockInfo]:
        """
        Async counterpart of ``_get_lock_info``; reads the file via ``anyio``.

        Args:
            key: The lock key.
            use_cache: When true, return the cached entry if there is one
                instead of reading the lock file.
        """
        if use_cache and (cached := self._locks.get(key)) is not None:
            return cached

        lock_path = self._lock_path_for_key(key)
        try:
            raw = await anyio.Path(lock_path).read_bytes()
        except FileNotFoundError:
            # The file is gone, so any cached entry for this key is stale.
            self._locks.pop(key, None)
            return None

        lock_info = self._parse_lock_info(raw, lock_path)
        self._locks[key] = lock_info
        return lock_info

    def acquire_lock(
        self,
        key: str,
        holder: str,
        acquire_timeout: Optional[float] = None,
        hold_timeout: Optional[float] = None,
    ) -> bool:
        """
        Acquire the lock for ``key`` on behalf of ``holder``.

        Args:
            key: The lock key.
            holder: Identifier of the actor acquiring the lock.
            acquire_timeout: Seconds to wait for another holder to release the
                lock; passed through to ``wait_for_lock``.
            hold_timeout: Seconds until the lock expires; ``None`` means the
                lock never expires.

        Returns:
            ``True`` once the lock is held by ``holder``; ``False`` if waiting
            for another holder timed out.
        """
        self._ensure_lock_files_directory_exists()
        lock_path = self._lock_path_for_key(key)

        if self.is_locked(key) and not self.is_lock_holder(key, holder):
            if not self.wait_for_lock(key, acquire_timeout):
                return False

        try:
            # Exclusive creation: fails if the lock file already exists.
            Path(lock_path).touch(exist_ok=False)
        except FileExistsError:
            if not self.is_lock_holder(key, holder):
                logger.debug(
                    "Another actor acquired the lock for record with key %s. Trying again.",
                    key,
                )
                return self.acquire_lock(key, holder, acquire_timeout, hold_timeout)
            # Otherwise the holder already owns the lock: fall through and
            # rewrite the file, which refreshes the expiration.

        expiration = self._expiration_for(hold_timeout)
        Path(lock_path).write_bytes(self._serialize_lock_info(holder, expiration))

        self._locks[key] = {
            "holder": holder,
            "expiration": expiration,
            "path": lock_path,
        }
        return True

    async def aacquire_lock(
        self,
        key: str,
        holder: str,
        acquire_timeout: Optional[float] = None,
        hold_timeout: Optional[float] = None,
    ) -> bool:
        """
        Async counterpart of ``acquire_lock``.

        Args:
            key: The lock key.
            holder: Identifier of the actor acquiring the lock.
            acquire_timeout: Seconds to wait for another holder to release the
                lock; passed through to ``await_for_lock``.
            hold_timeout: Seconds until the lock expires; ``None`` means the
                lock never expires.

        Returns:
            ``True`` once the lock is held by ``holder``; ``False`` if waiting
            for another holder timed out.
        """
        await anyio.Path(self.lock_files_directory).mkdir(parents=True, exist_ok=True)
        lock_path = self._lock_path_for_key(key)

        if self.is_locked(key) and not self.is_lock_holder(key, holder):
            if not await self.await_for_lock(key, acquire_timeout):
                return False

        try:
            # Exclusive creation: fails if the lock file already exists.
            await anyio.Path(lock_path).touch(exist_ok=False)
        except FileExistsError:
            if not self.is_lock_holder(key, holder):
                logger.debug(
                    "Another actor acquired the lock for record with key %s. Trying again.",
                    key,
                )
                # Retry through the async path; the sync variant would block
                # the event loop while it sleeps waiting for the lock.
                return await self.aacquire_lock(
                    key, holder, acquire_timeout, hold_timeout
                )
            # Otherwise the holder already owns the lock: fall through and
            # rewrite the file, which refreshes the expiration.

        expiration = self._expiration_for(hold_timeout)
        async with await anyio.Path(lock_path).open("wb") as lock_file:
            await lock_file.write(self._serialize_lock_info(holder, expiration))

        self._locks[key] = {
            "holder": holder,
            "expiration": expiration,
            "path": lock_path,
        }
        return True

    def release_lock(self, key: str, holder: str) -> None:
        """
        Release the lock for ``key`` held by ``holder``.

        Raises:
            ValueError: If ``key`` is not locked, or it is locked by a
                different holder.
        """
        lock_path = self._lock_path_for_key(key)
        if not self.is_locked(key):
            raise ValueError(f"No lock for transaction with key {key}")
        if not self.is_lock_holder(key, holder):
            raise ValueError(f"No lock held by {holder} for transaction with key {key}")
        Path(lock_path).unlink(missing_ok=True)
        self._locks.pop(key, None)

    def is_locked(self, key: str, use_cache: bool = False) -> bool:
        """
        Report whether ``key`` is currently locked.

        An expired lock is cleaned up as a side effect: its file is deleted
        and its cache entry dropped.

        Args:
            key: The lock key.
            use_cache: When true, trust a cached entry instead of re-reading
                the lock file.
        """
        if (lock_info := self._get_lock_info(key, use_cache=use_cache)) is None:
            return False

        if (expiration := lock_info.get("expiration")) is None:
            return True

        if expiration < now("UTC"):
            # Another actor may already have removed the expired file, so a
            # missing file must not raise here.
            Path(lock_info["path"]).unlink(missing_ok=True)
            self._locks.pop(key, None)
            return False
        return True

    def is_lock_holder(self, key: str, holder: str) -> bool:
        """Return whether ``key`` is locked and ``holder`` is its holder."""
        # ``is_locked`` re-reads the lock file by default, refreshing the
        # cache entry that the lookup below then uses.
        if not self.is_locked(key):
            return False
        if (lock_info := self._get_lock_info(key)) is None:
            return False
        return lock_info["holder"] == holder

    def wait_for_lock(self, key: str, timeout: Optional[float] = None) -> bool:
        """
        Block until ``key`` is unlocked, polling the lock file every 0.1s.

        Args:
            key: The lock key.
            timeout: Approximate maximum seconds to wait. A falsy value
                (``None`` or ``0``) waits indefinitely.

        Returns:
            ``True`` when the lock became free, ``False`` on timeout.
        """
        seconds_waited = 0
        while self.is_locked(key, use_cache=False):
            if timeout and seconds_waited >= timeout:
                return False
            seconds_waited += 0.1
            time.sleep(0.1)
        return True

    async def await_for_lock(self, key: str, timeout: Optional[float] = None) -> bool:
        """
        Async counterpart of ``wait_for_lock``; sleeps with ``anyio.sleep``.

        Args:
            key: The lock key.
            timeout: Approximate maximum seconds to wait. A falsy value
                (``None`` or ``0``) waits indefinitely.

        Returns:
            ``True`` when the lock became free, ``False`` on timeout.
        """
        seconds_waited = 0
        while self.is_locked(key, use_cache=False):
            if timeout and seconds_waited >= timeout:
                return False
            seconds_waited += 0.1
            await anyio.sleep(0.1)
        return True