Coverage for opt/mealie/lib/python3.12/site-packages/mealie/core/dependencies/dependencies.py: 60%

135 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 15:48 +0000

1import tempfile 1b

2from collections.abc import Callable, Generator 1b

3from contextlib import contextmanager 1b

4from pathlib import Path 1b

5from shutil import rmtree 1b

6from uuid import uuid4 1b

7 

8import fastapi 1b

9import jwt 1b

10from fastapi import Depends, HTTPException, Request, status 1b

11from fastapi.security import OAuth2PasswordBearer 1b

12from jwt.exceptions import PyJWTError 1b

13from sqlalchemy.orm.session import Session 1b

14 

15from mealie.core import root_logger 1b

16from mealie.core.config import get_app_dirs, get_app_settings 1b

17from mealie.db.db_setup import generate_session 1b

18from mealie.repos.all_repositories import get_repositories 1b

19from mealie.schema.user import PrivateUser, TokenData 1b

20from mealie.schema.user.user import DEFAULT_INTEGRATION_ID, GroupInDB 1b

21 

# OAuth2 bearer-token schemes; both advertise /api/auth/token as the token URL.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token")
# Same scheme built with auto_error=False. The dependencies in this module that use it
# treat the resulting token as optional (get_current_user annotates it as `str | None`).
oauth2_scheme_soft_fail = OAuth2PasswordBearer(tokenUrl="/api/auth/token", auto_error=False)
# Signing algorithm passed to every jwt.decode call in this module.
ALGORITHM = "HS256"
app_dirs = get_app_dirs()
settings = get_app_settings()
logger = root_logger.get_logger("dependencies")

# Shared 401 response raised by the token-validating dependencies in this module.
credentials_exception = HTTPException(
    status_code=status.HTTP_401_UNAUTHORIZED,
    detail="Could not validate credentials",
    headers={"WWW-Authenticate": "Bearer"},
)

34 

35 

async def is_logged_in(token: str = Depends(oauth2_scheme_soft_fail), session=Depends(generate_session)) -> bool:
    """
    Report whether the request carries a usable login, without ever raising.

    Use this when only a yes/no answer is needed and the user object itself is not. Unlike
    'get_current_user', every failure (including an undecodable token) is reported as False
    instead of an auth exception.

    Args:
        token (str, optional): Bearer token. Defaults to Depends(oauth2_scheme_soft_fail).
        session (optional): Database session. Defaults to Depends(generate_session).

    Returns:
        bool: True = Valid User / False = Not User
    """
    try:
        claims = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
        subject: str = claims.get("sub")

        if claims.get("long_token") is not None:
            # Long-lived tokens are checked against the stored API tokens; a failed lookup means "not logged in".
            try:
                api_user = validate_long_live_token(session, token, claims.get("id"))
            except Exception:
                return False
            if api_user:
                return True

        # Otherwise the token counts as a login only if it names a subject.
        return subject is not None
    except Exception:
        return False

65 

66 

async def get_public_group(group_slug: str = fastapi.Path(...), session=Depends(generate_session)) -> GroupInDB:
    """
    Look up a group by slug or id and return it only if it is not private.

    Args:
        group_slug (str): Path parameter passed to the group repository's get_by_slug_or_id.
        session (optional): Database session. Defaults to Depends(generate_session).

    Raises:
        HTTPException: 404 when no group matches or the group's preferences mark it private.

    Returns:
        GroupInDB: the matching, non-private group
    """
    group = get_repositories(session).groups.get_by_slug_or_id(group_slug)

    if group and not group.preferences.private_group:
        return group

    # Missing and private groups are deliberately indistinguishable to the caller.
    raise HTTPException(404, "group not found")

75 

76 

async def try_get_current_user(
    request: Request,
    token: str = Depends(oauth2_scheme_soft_fail),
    session=Depends(generate_session),
) -> PrivateUser | None:
    """
    Best-effort variant of 'get_current_user'.

    Returns:
        PrivateUser | None: the user resolved by get_current_user, or None if it raised for any reason.
    """
    try:
        current = await get_current_user(request, token, session)
    except Exception:
        current = None
    return current

86 

87 

async def get_current_user(
    request: Request,
    token: str | None = Depends(oauth2_scheme_soft_fail),
    session=Depends(generate_session),
) -> PrivateUser:
    """
    Resolve the user the current request is authenticated as.

    The bearer token is used when one was supplied; otherwise the "mealie.access_token" cookie is
    tried. Tokens carrying a "long_token" claim are delegated to validate_long_live_token; all
    other tokens must carry a "sub" claim holding the user id.

    Args:
        request (Request): Incoming request, consulted only for its cookies.
        token (str | None, optional): Bearer token. Defaults to Depends(oauth2_scheme_soft_fail).
        session (optional): Database session. Defaults to Depends(generate_session).

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no "sub" claim, or matches no user.

    Returns:
        PrivateUser: the authenticated user
    """
    if token is None:
        # Try extract from cookie (falls back to an empty token when the cookie is absent)
        token = request.cookies.get("mealie.access_token", "")
    else:
        token = token or ""

    try:
        claims = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])

        if claims.get("long_token") is not None:
            return validate_long_live_token(session, token, claims.get("id"))

        subject: str | None = claims.get("sub")
        if subject is None:
            raise credentials_exception

        token_data = TokenData(user_id=subject)
    except PyJWTError as e:
        raise credentials_exception from e

    user_repo = get_repositories(session, group_id=None, household_id=None).users
    user = user_repo.get_one(token_data.user_id, "id", any_case=False)

    # If we don't commit here, lazy-loads from user relationships will leave some table lock in postgres
    # which can cause quite a bit of pain further down the line
    session.commit()
    if user is None:
        raise credentials_exception
    return user

124 

125 

async def get_integration_id(token: str = Depends(oauth2_scheme)) -> str:
    """
    Read the "integration_id" claim from the bearer token.

    Raises:
        HTTPException: 401 when the token cannot be decoded.

    Returns:
        str: the token's "integration_id" claim, or DEFAULT_INTEGRATION_ID when the claim is absent
    """
    try:
        claims = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
    except PyJWTError as e:
        raise credentials_exception from e
    else:
        return claims.get("integration_id", DEFAULT_INTEGRATION_ID)

133 

134 

async def get_admin_user(current_user: PrivateUser = Depends(get_current_user)) -> PrivateUser:
    """
    Require that the authenticated user is an admin.

    Raises:
        HTTPException: 403 Forbidden when the current user's `admin` flag is falsy.

    Returns:
        PrivateUser: the current user, unchanged
    """
    if current_user.admin:
        return current_user

    raise HTTPException(status.HTTP_403_FORBIDDEN)

139 

140 

def validate_long_live_token(session: Session, client_token: str, user_id: str) -> PrivateUser:
    """
    Find the stored API token matching the given token and user id, and return its user.

    Args:
        session (Session): Database session used to build the repositories.
        client_token (str): Token string as presented by the client.
        user_id (str): User id the token must be registered to.

    Raises:
        HTTPException: 401 Unauthorized when no stored token matches.

    Returns:
        PrivateUser: the `user` attribute of the first matching token
    """
    api_tokens = get_repositories(session, group_id=None, household_id=None).api_tokens
    matches = api_tokens.multi_query({"token": client_token, "user_id": user_id})

    try:
        return matches[0].user
    except IndexError as e:
        # An empty result means the token is unknown for this user.
        raise HTTPException(status.HTTP_401_UNAUTHORIZED) from e

150 

151 

def validate_file_token(token: str | None = None) -> Path:
    """
    Decode a signed file token and return the path stored in its "file" claim.

    Args:
        token (Optional[str], optional): JWT carrying a "file" claim. Defaults to None.

    Raises:
        HTTPException: 400 Bad Request when no token is given, the token has no usable "file"
            claim, or the file doesn't exist
        HTTPException: 401 Unauthorized when the token is invalid

    Returns:
        Path: path taken from the token's "file" claim
    """
    if not token:
        raise HTTPException(status.HTTP_400_BAD_REQUEST)

    try:
        payload = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
    except PyJWTError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="could not validate file token",
        ) from e

    file_claim = payload.get("file")
    if not isinstance(file_claim, str) or not file_claim:
        # A validly signed token without a path would otherwise hit Path(None) and surface
        # as an unhandled TypeError instead of a client error.
        raise HTTPException(status.HTTP_400_BAD_REQUEST)

    file_path = Path(file_claim)
    if not file_path.exists():
        raise HTTPException(status.HTTP_400_BAD_REQUEST)

    return file_path

177 

178 

def validate_recipe_token(token: str | None = None) -> str:
    """
    Decode a signed recipe token and return the slug stored in its "slug" claim.

    Args:
        token (Optional[str], optional): JWT carrying a "slug" claim. Defaults to None.

    Raises:
        HTTPException: 400 Bad Request when no token is given or the token has no "slug" claim
        HTTPException: 401 Unauthorized when the token is invalid

    Returns:
        str: the token's "slug" claim
    """
    if not token:
        raise HTTPException(status.HTTP_400_BAD_REQUEST)

    try:
        payload = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
    except PyJWTError as e:
        # The message names the recipe token; it previously reused the file-token wording.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="could not validate recipe token",
        ) from e

    slug: str | None = payload.get("slug")
    if slug is None:
        raise HTTPException(status.HTTP_400_BAD_REQUEST)

    return slug

207 

208 

@contextmanager
def get_temporary_zip_path(auto_unlink=True) -> Generator[Path, None, None]:
    """
    Yield a path inside the app's TEMP_DIR at which a zip archive can be written.

    The directory is created if needed; the file itself is not. Each call gets its own
    uuid-based file name so overlapping callers cannot overwrite or delete each other's archive.

    Args:
        auto_unlink (bool, optional): Remove the file (if it exists) on exit. Defaults to True.

    Yields:
        Path: a unique `<hex>.zip` path under TEMP_DIR
    """
    app_dirs.TEMP_DIR.mkdir(exist_ok=True, parents=True)
    temp_path = app_dirs.TEMP_DIR.joinpath(f"{uuid4().hex}.zip")
    try:
        yield temp_path
    finally:
        if auto_unlink:
            temp_path.unlink(missing_ok=True)

218 

219 

@contextmanager
def get_temporary_path(auto_unlink=True) -> Generator[Path, None, None]:
    """
    Create a uniquely named scratch directory under the app's TEMP_DIR and yield it.

    Args:
        auto_unlink (bool, optional): Remove the directory tree on exit. Defaults to True.

    Yields:
        Path: the newly created directory
    """
    scratch_dir = app_dirs.TEMP_DIR / uuid4().hex
    scratch_dir.mkdir(exist_ok=True, parents=True)
    try:
        yield scratch_dir
    finally:
        if auto_unlink:
            rmtree(scratch_dir)

229 

230 

def temporary_file(ext: str = "") -> Callable[[], Generator[tempfile._TemporaryFileWrapper, None, None]]:
    """
    Build a generator function that yields a temporary file with the specified extension.

    The yielded object is a `tempfile.NamedTemporaryFile` opened in "w+b" mode; it is closed
    (and therefore removed) when the generator is finalized.

    Args:
        ext (str, optional): Suffix for the temporary file's name, e.g. ".zip". Defaults to "".

    Returns:
        Callable: zero-argument generator function yielding the open temporary file
    """

    def func():
        # The file's lifetime is managed entirely by NamedTemporaryFile. No placeholder file is
        # created alongside it: the previous one was never yielded, and could leak or fail.
        with tempfile.NamedTemporaryFile(mode="w+b", suffix=ext) as f:
            yield f

    return func
246 return func