Coverage for opt/mealie/lib/python3.12/site-packages/mealie/core/dependencies/dependencies.py: 54%

135 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 15:32 +0000

1import tempfile 1a

2from collections.abc import Callable, Generator 1a

3from contextlib import contextmanager 1a

4from pathlib import Path 1a

5from shutil import rmtree 1a

6from uuid import uuid4 1a

7 

8import fastapi 1a

9import jwt 1a

10from fastapi import Depends, HTTPException, Request, status 1a

11from fastapi.security import OAuth2PasswordBearer 1a

12from jwt.exceptions import PyJWTError 1a

13from sqlalchemy.orm.session import Session 1a

14 

15from mealie.core import root_logger 1a

16from mealie.core.config import get_app_dirs, get_app_settings 1a

17from mealie.db.db_setup import generate_session 1a

18from mealie.repos.all_repositories import get_repositories 1a

19from mealie.schema.user import PrivateUser, TokenData 1a

20from mealie.schema.user.user import DEFAULT_INTEGRATION_ID, GroupInDB 1a

21 

22oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token") 1a

23oauth2_scheme_soft_fail = OAuth2PasswordBearer(tokenUrl="/api/auth/token", auto_error=False) 1a

24ALGORITHM = "HS256" 1a

25app_dirs = get_app_dirs() 1a

26settings = get_app_settings() 1a

27logger = root_logger.get_logger("dependencies") 1a

28 

29credentials_exception = HTTPException( 1a

30 status_code=status.HTTP_401_UNAUTHORIZED, 

31 detail="Could not validate credentials", 

32 headers={"WWW-Authenticate": "Bearer"}, 

33) 

34 

35 

36async def is_logged_in(token: str = Depends(oauth2_scheme_soft_fail), session=Depends(generate_session)) -> bool: 1a

37 """ 

38 When you need to determine if the user is logged in, but don't need the user, you can use this 

39 function to return a boolean value to represent if the user is logged in. No Auth exceptions are raised 

40 if the user is not logged in. This behavior is not the same as 'get_current_user' 

41 

42 Args: 

43 token (str, optional): [description]. Defaults to Depends(oauth2_scheme_soft_fail). 

44 session ([type], optional): [description]. Defaults to Depends(generate_session). 

45 

46 Returns: 

47 bool: True = Valid User / False = Not User 

48 """ 

49 try: 

50 payload = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM]) 

51 user_id: str = payload.get("sub") 

52 long_token: str = payload.get("long_token") 

53 

54 if long_token is not None: 

55 try: 

56 if validate_long_live_token(session, token, payload.get("id")): 

57 return True 

58 except Exception: 

59 return False 

60 

61 return user_id is not None 

62 

63 except Exception: 

64 return False 

65 

66 

67async def get_public_group(group_slug: str = fastapi.Path(...), session=Depends(generate_session)) -> GroupInDB: 1a

68 repos = get_repositories(session) 1c

69 group = repos.groups.get_by_slug_or_id(group_slug) 1c

70 

71 if not group or group.preferences.private_group: 71 ↛ 74line 71 didn't jump to line 74 because the condition on line 71 was always true1c

72 raise HTTPException(404, "group not found") 1c

73 else: 

74 return group 

75 

76 

77async def try_get_current_user( 1a

78 request: Request, 

79 token: str = Depends(oauth2_scheme_soft_fail), 

80 session=Depends(generate_session), 

81) -> PrivateUser | None: 

82 try: 

83 return await get_current_user(request, token, session) 

84 except Exception: 

85 return None 

86 

87 

88async def get_current_user( 1a

89 request: Request, 

90 token: str | None = Depends(oauth2_scheme_soft_fail), 

91 session=Depends(generate_session), 

92) -> PrivateUser: 

93 if token is None and "mealie.access_token" in request.cookies: 93 ↛ 95line 93 didn't jump to line 95 because the condition on line 93 was never true1cdefghijklmb

94 # Try extract from cookie 

95 token = request.cookies.get("mealie.access_token", "") 

96 else: 

97 token = token or "" 1cdefghijklmb

98 

99 try: 1cdefghijklmb

100 payload = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM]) 1cdefghijklmb

101 user_id: str | None = payload.get("sub") 1cdefghijklmb

102 long_token: str | None = payload.get("long_token") 1cdefghijklmb

103 

104 if long_token is not None: 104 ↛ 105line 104 didn't jump to line 105 because the condition on line 104 was never true1cdefghijklmb

105 return validate_long_live_token(session, token, payload.get("id")) 

106 

107 if user_id is None: 107 ↛ 108line 107 didn't jump to line 108 because the condition on line 107 was never true1cdefghijklmb

108 raise credentials_exception 

109 

110 token_data = TokenData(user_id=user_id) 1cdefghijklmb

111 except PyJWTError as e: 

112 raise credentials_exception from e 

113 

114 repos = get_repositories(session, group_id=None, household_id=None) 1cdefghijklmb

115 

116 user = repos.users.get_one(token_data.user_id, "id", any_case=False) 1cdefghijklmb

117 

118 # If we don't commit here, lazy-loads from user relationships will leave some table lock in postgres 

119 # which can cause quite a bit of pain further down the line 

120 session.commit() 1cdefghijklmb

121 if user is None: 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true1cdefghijklmb

122 raise credentials_exception 

123 return user 1cdefghijklmb

124 

125 

126async def get_integration_id(token: str = Depends(oauth2_scheme)) -> str: 1a

127 try: 1cdefghijklmb

128 decoded_token = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM]) 1cdefghijklmb

129 return decoded_token.get("integration_id", DEFAULT_INTEGRATION_ID) 1cdefghijklmb

130 

131 except PyJWTError as e: 

132 raise credentials_exception from e 

133 

134 

135async def get_admin_user(current_user: PrivateUser = Depends(get_current_user)) -> PrivateUser: 1a

136 if not current_user.admin: 136 ↛ 138line 136 didn't jump to line 138 because the condition on line 136 was always true1cdefghijklmb

137 raise HTTPException(status.HTTP_403_FORBIDDEN) 1cdefghijklmb

138 return current_user 

139 

140 

141def validate_long_live_token(session: Session, client_token: str, user_id: str) -> PrivateUser: 1a

142 repos = get_repositories(session, group_id=None, household_id=None) 

143 

144 token = repos.api_tokens.multi_query({"token": client_token, "user_id": user_id}) 

145 

146 try: 

147 return token[0].user 

148 except IndexError as e: 

149 raise HTTPException(status.HTTP_401_UNAUTHORIZED) from e 

150 

151 

152def validate_file_token(token: str | None = None) -> Path: 1a

153 """ 

154 Args: 

155 token (Optional[str], optional): _description_. Defaults to None. 

156 

157 Raises: 

158 HTTPException: 400 Bad Request when no token or the file doesn't exist 

159 HTTPException: 401 Unauthorized when the token is invalid 

160 """ 

161 if not token: 161 ↛ 164line 161 didn't jump to line 164 because the condition on line 161 was always true

162 raise HTTPException(status.HTTP_400_BAD_REQUEST) 

163 

164 try: 

165 payload = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM]) 

166 file_path = Path(payload.get("file")) 

167 except PyJWTError as e: 

168 raise HTTPException( 

169 status_code=status.HTTP_401_UNAUTHORIZED, 

170 detail="could not validate file token", 

171 ) from e 

172 

173 if not file_path.exists(): 

174 raise HTTPException(status.HTTP_400_BAD_REQUEST) 

175 

176 return file_path 

177 

178 

179def validate_recipe_token(token: str | None = None) -> str: 1a

180 """ 

181 Args: 

182 token (Optional[str], optional): _description_. Defaults to None. 

183 

184 Raises: 

185 HTTPException: 400 Bad Request when no token or the recipe doesn't exist 

186 HTTPException: 401 PyJWTError when token is invalid 

187 

188 Returns: 

189 str: token data 

190 """ 

191 if not token: 191 ↛ 194line 191 didn't jump to line 194 because the condition on line 191 was always true

192 raise HTTPException(status.HTTP_400_BAD_REQUEST) 

193 

194 try: 

195 payload = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM]) 

196 slug: str | None = payload.get("slug") 

197 except PyJWTError as e: 

198 raise HTTPException( 

199 status_code=status.HTTP_401_UNAUTHORIZED, 

200 detail="could not validate file token", 

201 ) from e 

202 

203 if slug is None: 

204 raise HTTPException(status.HTTP_400_BAD_REQUEST) 

205 

206 return slug 

207 

208 

209@contextmanager 1a

210def get_temporary_zip_path(auto_unlink=True) -> Generator[Path, None, None]: 1a

211 app_dirs.TEMP_DIR.mkdir(exist_ok=True, parents=True) 

212 temp_path = app_dirs.TEMP_DIR.joinpath("my_zip_archive.zip") 

213 try: 

214 yield temp_path 

215 finally: 

216 if auto_unlink: 

217 temp_path.unlink(missing_ok=True) 

218 

219 

220@contextmanager 1a

221def get_temporary_path(auto_unlink=True) -> Generator[Path, None, None]: 1a

222 temp_path = app_dirs.TEMP_DIR.joinpath(uuid4().hex) 

223 temp_path.mkdir(exist_ok=True, parents=True) 

224 try: 

225 yield temp_path 

226 finally: 

227 if auto_unlink: 227 ↛ 228line 227 didn't jump to line 228 because the condition on line 227 was never true

228 rmtree(temp_path) 

229 

230 

231def temporary_file(ext: str = "") -> Callable[[], Generator[tempfile._TemporaryFileWrapper, None, None]]: 1a

232 """ 

233 Returns a temporary file with the specified extension 

234 """ 

235 

236 def func(): 

237 temp_path = app_dirs.TEMP_DIR.joinpath(uuid4().hex + ext) 

238 temp_path.touch() 

239 

240 with tempfile.NamedTemporaryFile(mode="w+b", suffix=ext) as f: 

241 try: 

242 yield f 

243 finally: 

244 temp_path.unlink(missing_ok=True) 

245 

246 return func