Coverage for /usr/local/lib/python3.12/site-packages/prefect/locking/filesystem.py: 0%
136 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1import datetime
2import time
3from datetime import timedelta
4from logging import Logger
5from pathlib import Path
6from typing import Optional
8import anyio
9import pydantic_core
10from typing_extensions import TypedDict
12from prefect.logging.loggers import get_logger
13from prefect.types._datetime import now, parse_datetime
15from .protocol import LockManager
# Module-level logger, obtained from `get_logger` using this module's `__name__`.
logger: Logger = get_logger(__name__)
class _LockInfo(TypedDict):
    """
    A dictionary containing information about a lock.

    Attributes:
        holder: The holder of the lock.
        expiration: Datetime when the lock expires, or `None` when the lock
            was acquired without a hold timeout and therefore never expires.
        path: Path to the lock file.
    """

    holder: str
    expiration: Optional[datetime.datetime]
    path: Path
class FileSystemLockManager(LockManager):
    """
    A lock manager that implements locking using local files.

    Each lock is stored as a JSON file named ``<key>.lock`` inside
    ``lock_files_directory``. The file records the holder and an optional
    expiration. Lock info read or written by this instance is also cached in
    memory, keyed by lock key.

    Attributes:
        lock_files_directory: the directory where lock files are stored
    """

    # Seconds to sleep between checks while waiting for a lock to be released.
    _POLL_INTERVAL_SECONDS = 0.1

    def __init__(self, lock_files_directory: Path) -> None:
        """
        Create a lock manager rooted at `lock_files_directory`.

        Args:
            lock_files_directory: Directory for lock files. `~` is expanded and
                the path is resolved to an absolute path. The directory is not
                created here; it is created when a lock is first acquired.
        """
        self.lock_files_directory: Path = lock_files_directory.expanduser().resolve()
        self._locks: dict[str, _LockInfo] = {}

    def _ensure_lock_files_directory_exists(self) -> None:
        """Create the lock directory (and any missing parents) if needed."""
        self.lock_files_directory.mkdir(parents=True, exist_ok=True)

    def _lock_path_for_key(self, key: str) -> Path:
        """
        Return the path of the lock file for `key`.

        A path already cached for `key` takes precedence; otherwise the path is
        ``<lock_files_directory>/<key>.lock``.
        """
        if (lock_info := self._locks.get(key)) is not None:
            return lock_info["path"]
        # Append ".lock" instead of using Path.with_suffix(): with_suffix
        # replaces everything after the last "." in the key, which would make
        # distinct keys such as "a.1" and "a.2" share a single lock file.
        return self.lock_files_directory / f"{key}.lock"

    def _cache_lock_info(self, key: str, lock_path: Path, raw: bytes) -> _LockInfo:
        """
        Parse the raw contents of a lock file, cache the result, and return it.

        Args:
            key: The lock key the file belongs to.
            lock_path: Path the contents were read from; stored under "path".
            raw: JSON bytes read from the lock file.
        """
        lock_info = pydantic_core.from_json(raw)
        lock_info["path"] = lock_path
        expiration = lock_info.get("expiration")
        lock_info["expiration"] = (
            parse_datetime(expiration) if expiration is not None else None
        )
        self._locks[key] = lock_info
        return lock_info

    @staticmethod
    def _expiration_for(hold_timeout: Optional[float]) -> Optional[datetime.datetime]:
        """Return the UTC expiration for a hold timeout in seconds, or `None`."""
        if hold_timeout is None:
            return None
        return now("UTC") + timedelta(seconds=hold_timeout)

    @staticmethod
    def _serialize_lock(holder: str, expiration: Optional[datetime.datetime]) -> bytes:
        """Return the JSON bytes written to a lock file."""
        return pydantic_core.to_json(
            {
                "holder": holder,
                "expiration": str(expiration) if expiration is not None else None,
            },
        )

    def _get_lock_info(self, key: str, use_cache: bool = True) -> Optional[_LockInfo]:
        """
        Return the lock info for `key`, or `None` if no lock file exists.

        Args:
            key: The lock key to look up.
            use_cache: When true, return the cached info if there is any
                instead of reading the lock file.
        """
        if use_cache and (lock_info := self._locks.get(key)) is not None:
            return lock_info

        lock_path = self._lock_path_for_key(key)
        try:
            raw = lock_path.read_bytes()
        except FileNotFoundError:
            # The file is gone, so any cached entry no longer reflects disk.
            self._locks.pop(key, None)
            return None
        return self._cache_lock_info(key, lock_path, raw)

    async def _aget_lock_info(
        self, key: str, use_cache: bool = True
    ) -> Optional[_LockInfo]:
        """
        Async counterpart of `_get_lock_info`; reads the file via `anyio.Path`.

        Args:
            key: The lock key to look up.
            use_cache: When true, return the cached info if there is any
                instead of reading the lock file.
        """
        if use_cache and (lock_info := self._locks.get(key)) is not None:
            return lock_info

        lock_path = self._lock_path_for_key(key)
        try:
            raw = await anyio.Path(lock_path).read_bytes()
        except FileNotFoundError:
            # The file is gone, so any cached entry no longer reflects disk.
            self._locks.pop(key, None)
            return None
        return self._cache_lock_info(key, lock_path, raw)

    def acquire_lock(
        self,
        key: str,
        holder: str,
        acquire_timeout: Optional[float] = None,
        hold_timeout: Optional[float] = None,
    ) -> bool:
        """
        Acquire the lock for `key` on behalf of `holder`.

        Args:
            key: The lock key.
            holder: Identifier of the actor acquiring the lock.
            acquire_timeout: Maximum seconds to wait for a lock held by someone
                else to be released; `None` waits indefinitely.
            hold_timeout: Seconds until the lock expires; `None` means the lock
                does not expire.

        Returns:
            True if the lock was acquired, False if waiting timed out.
        """
        self._ensure_lock_files_directory_exists()
        lock_path = self._lock_path_for_key(key)

        if self.is_locked(key) and not self.is_lock_holder(key, holder):
            if not self.wait_for_lock(key, acquire_timeout):
                return False

        try:
            # Exclusive creation: fails if the lock file already exists.
            lock_path.touch(exist_ok=False)
        except FileExistsError:
            if not self.is_lock_holder(key, holder):
                logger.debug(
                    f"Another actor acquired the lock for record with key {key}. Trying again."
                )
                # The retry starts a fresh wait using the same acquire_timeout.
                return self.acquire_lock(key, holder, acquire_timeout, hold_timeout)
            # Otherwise this holder already owns the lock; fall through and
            # rewrite the file so the expiration is refreshed.

        expiration = self._expiration_for(hold_timeout)
        lock_path.write_bytes(self._serialize_lock(holder, expiration))

        self._locks[key] = {
            "holder": holder,
            "expiration": expiration,
            "path": lock_path,
        }
        return True

    async def aacquire_lock(
        self,
        key: str,
        holder: str,
        acquire_timeout: Optional[float] = None,
        hold_timeout: Optional[float] = None,
    ) -> bool:
        """
        Async counterpart of `acquire_lock`.

        Directory creation, lock file creation, writing and waiting go through
        `anyio`; the "is it locked / who holds it" checks reuse the synchronous
        helpers.

        Args:
            key: The lock key.
            holder: Identifier of the actor acquiring the lock.
            acquire_timeout: Maximum seconds to wait for a lock held by someone
                else to be released; `None` waits indefinitely.
            hold_timeout: Seconds until the lock expires; `None` means the lock
                does not expire.

        Returns:
            True if the lock was acquired, False if waiting timed out.
        """
        await anyio.Path(self.lock_files_directory).mkdir(parents=True, exist_ok=True)
        lock_path = self._lock_path_for_key(key)

        if self.is_locked(key) and not self.is_lock_holder(key, holder):
            if not await self.await_for_lock(key, acquire_timeout):
                return False

        try:
            # Exclusive creation: fails if the lock file already exists.
            await anyio.Path(lock_path).touch(exist_ok=False)
        except FileExistsError:
            if not self.is_lock_holder(key, holder):
                logger.debug(
                    f"Another actor acquired the lock for record with key {key}. Trying again."
                )
                # Retry through the async path; the synchronous acquire_lock
                # would block the event loop while it sleeps.
                return await self.aacquire_lock(
                    key, holder, acquire_timeout, hold_timeout
                )
            # Otherwise this holder already owns the lock; fall through and
            # rewrite the file so the expiration is refreshed.

        expiration = self._expiration_for(hold_timeout)
        await anyio.Path(lock_path).write_bytes(
            self._serialize_lock(holder, expiration)
        )

        self._locks[key] = {
            "holder": holder,
            "expiration": expiration,
            "path": lock_path,
        }
        return True

    def release_lock(self, key: str, holder: str) -> None:
        """
        Release the lock for `key` held by `holder`.

        Removes the lock file and the cached entry.

        Raises:
            ValueError: If `key` is not locked, or is locked by someone other
                than `holder`.
        """
        lock_path = self._lock_path_for_key(key)
        if not self.is_locked(key):
            raise ValueError(f"No lock for transaction with key {key}")
        if not self.is_lock_holder(key, holder):
            raise ValueError(f"No lock held by {holder} for transaction with key {key}")
        Path(lock_path).unlink(missing_ok=True)
        self._locks.pop(key, None)

    def is_locked(self, key: str, use_cache: bool = False) -> bool:
        """
        Return whether `key` is currently locked.

        An expired lock is cleaned up (file removed, cache entry dropped) and
        reported as not locked.

        Args:
            key: The lock key to check.
            use_cache: When true, cached lock info may be used instead of
                reading the lock file.
        """
        lock_info = self._get_lock_info(key, use_cache=use_cache)
        if lock_info is None:
            return False

        expiration = lock_info.get("expiration")
        if expiration is None:
            # No expiration recorded: the lock is held until released.
            return True

        if expiration < now("UTC"):
            # Another actor may already have removed the expired file, so a
            # missing file must not raise here.
            Path(lock_info["path"]).unlink(missing_ok=True)
            self._locks.pop(key, None)
            return False
        return True

    def is_lock_holder(self, key: str, holder: str) -> bool:
        """Return whether `key` is currently locked and held by `holder`."""
        if not self.is_locked(key):
            return False
        # is_locked() has just re-read the lock file (use_cache defaults to
        # False there), so the cached entry read below is current.
        if (lock_info := self._get_lock_info(key)) is None:
            return False
        return lock_info["holder"] == holder

    def wait_for_lock(self, key: str, timeout: Optional[float] = None) -> bool:
        """
        Block until `key` is no longer locked.

        Args:
            key: The lock key to wait on.
            timeout: Maximum seconds to wait. `None` waits indefinitely; `0`
                checks once without waiting.

        Returns:
            True if the lock was released, False if the timeout elapsed first.
        """
        # Use a monotonic deadline so time spent checking the lock counts
        # toward the timeout and a timeout of 0 is honoured.
        deadline = None if timeout is None else time.monotonic() + timeout
        while self.is_locked(key, use_cache=False):
            if deadline is not None and time.monotonic() >= deadline:
                return False
            time.sleep(self._POLL_INTERVAL_SECONDS)
        return True

    async def await_for_lock(self, key: str, timeout: Optional[float] = None) -> bool:
        """
        Async counterpart of `wait_for_lock`; sleeps with `anyio.sleep`.

        Args:
            key: The lock key to wait on.
            timeout: Maximum seconds to wait. `None` waits indefinitely; `0`
                checks once without waiting.

        Returns:
            True if the lock was released, False if the timeout elapsed first.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        while self.is_locked(key, use_cache=False):
            if deadline is not None and time.monotonic() >= deadline:
                return False
            await anyio.sleep(self._POLL_INTERVAL_SECONDS)
        return True