Coverage for opt/mealie/lib/python3.12/site-packages/mealie/core/dependencies/dependencies.py: 55%

135 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:45 +0000

1import tempfile 1a

2from collections.abc import Callable, Generator 1a

3from contextlib import contextmanager 1a

4from pathlib import Path 1a

5from shutil import rmtree 1a

6from uuid import uuid4 1a

7 

8import fastapi 1a

9import jwt 1a

10from fastapi import Depends, HTTPException, Request, status 1a

11from fastapi.security import OAuth2PasswordBearer 1a

12from jwt.exceptions import PyJWTError 1a

13from sqlalchemy.orm.session import Session 1a

14 

15from mealie.core import root_logger 1a

16from mealie.core.config import get_app_dirs, get_app_settings 1a

17from mealie.db.db_setup import generate_session 1a

18from mealie.repos.all_repositories import get_repositories 1a

19from mealie.schema.user import PrivateUser, TokenData 1a

20from mealie.schema.user.user import DEFAULT_INTEGRATION_ID, GroupInDB 1a

21 

# Bearer-token extractors for the token endpoint. The "soft fail" variant is built
# with auto_error=False and is used by dependencies that deal with a missing token
# themselves (see is_logged_in / get_current_user).
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token")
oauth2_scheme_soft_fail = OAuth2PasswordBearer(tokenUrl="/api/auth/token", auto_error=False)

# Algorithm handed to every jwt.decode call in this module.
ALGORITHM = "HS256"

# Module-wide handles: application directories, settings and a named logger.
app_dirs = get_app_dirs()
settings = get_app_settings()
logger = root_logger.get_logger("dependencies")

# Shared 401 error raised whenever a token cannot be validated.
credentials_exception = HTTPException(
    status_code=status.HTTP_401_UNAUTHORIZED,
    detail="Could not validate credentials",
    headers={"WWW-Authenticate": "Bearer"},
)

34 

35 

async def is_logged_in(token: str = Depends(oauth2_scheme_soft_fail), session=Depends(generate_session)) -> bool:
    """
    Report whether the request carries a usable token, without ever raising.

    Use this when a route only needs a yes/no answer and not the user object itself.
    Unlike 'get_current_user', every failure (missing token, undecodable token, failed
    long-lived token lookup, ...) is reported as False instead of an auth exception.

    Args:
        token (str, optional): Bearer token. Defaults to Depends(oauth2_scheme_soft_fail).
        session ([type], optional): Database session. Defaults to Depends(generate_session).

    Returns:
        bool: True = Valid User / False = Not User
    """
    try:
        claims = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])

        # Tokens with a "long_token" claim are checked through validate_long_live_token;
        # if that lookup raises, the handler below turns it into False.
        if claims.get("long_token") is not None and validate_long_live_token(session, token, claims.get("id")):
            return True

        return claims.get("sub") is not None
    except Exception:
        return False

65 

66 

async def get_public_group(group_slug: str = fastapi.Path(...), session=Depends(generate_session)) -> GroupInDB:
    """
    Look up a group by slug or id and return it only if it is not private.

    Raises:
        HTTPException: 404 when the group is missing or its preferences mark it private
    """
    group = get_repositories(session).groups.get_by_slug_or_id(group_slug)

    if group and not group.preferences.private_group:
        return group

    # Missing and private groups are reported the same way
    raise HTTPException(404, "group not found")

75 

76 

async def try_get_current_user(
    request: Request,
    token: str = Depends(oauth2_scheme_soft_fail),
    session=Depends(generate_session),
) -> PrivateUser | None:
    """
    Non-raising wrapper around 'get_current_user'.

    Returns:
        PrivateUser | None: the resolved user, or None when 'get_current_user' raises
    """
    try:
        user = await get_current_user(request, token, session)
    except Exception:
        return None
    else:
        return user

86 

87 

async def get_current_user(
    request: Request,
    token: str | None = Depends(oauth2_scheme_soft_fail),
    session=Depends(generate_session),
) -> PrivateUser:
    """
    Resolve the user that belongs to the request's access token.

    The token is taken from the soft-fail OAuth2 scheme or, when that provides none,
    from the "mealie.access_token" cookie. Tokens carrying a "long_token" claim are
    resolved through 'validate_long_live_token'; all others via their "sub" claim.

    Raises:
        HTTPException: 401 Unauthorized when the token cannot be decoded, has no "sub"
            claim, or no user matches it
    """
    if token is None:
        # Try extract from cookie (empty string when the cookie is absent as well)
        token = request.cookies.get("mealie.access_token", "")

    try:
        claims = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])

        if claims.get("long_token") is not None:
            return validate_long_live_token(session, token, claims.get("id"))

        subject: str | None = claims.get("sub")
        if subject is None:
            raise credentials_exception

        token_data = TokenData(user_id=subject)
    except PyJWTError as e:
        raise credentials_exception from e

    user_repo = get_repositories(session, group_id=None, household_id=None).users
    user = user_repo.get_one(token_data.user_id, "id", any_case=False)

    # If we don't commit here, lazy-loads from user relationships will leave some table lock in postgres
    # which can cause quite a bit of pain further down the line
    session.commit()

    if user is None:
        raise credentials_exception
    return user

124 

125 

async def get_integration_id(token: str = Depends(oauth2_scheme)) -> str:
    """
    Read the "integration_id" claim from the bearer token.

    Returns:
        str: the claim value, or DEFAULT_INTEGRATION_ID when the token has no such claim

    Raises:
        HTTPException: 401 Unauthorized when the token cannot be decoded
    """
    try:
        claims = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
    except PyJWTError as e:
        raise credentials_exception from e

    return claims.get("integration_id", DEFAULT_INTEGRATION_ID)

133 

134 

async def get_admin_user(current_user: PrivateUser = Depends(get_current_user)) -> PrivateUser:
    """
    Pass the current user through only when it has the admin flag set.

    Raises:
        HTTPException: 403 Forbidden when the current user is not an admin
    """
    if current_user.admin:
        return current_user

    raise HTTPException(status.HTTP_403_FORBIDDEN)

139 

140 

def validate_long_live_token(session: Session, client_token: str, user_id: str) -> PrivateUser:
    """
    Return the user owning the stored API token that matches the given token and user id.

    Raises:
        HTTPException: 401 Unauthorized when no stored token matches
    """
    api_tokens = get_repositories(session, group_id=None, household_id=None).api_tokens
    matches = api_tokens.multi_query({"token": client_token, "user_id": user_id})

    try:
        first_match = matches[0]
    except IndexError as e:
        # Empty result: the token/user pair is unknown
        raise HTTPException(status.HTTP_401_UNAUTHORIZED) from e

    return first_match.user

150 

151 

def validate_file_token(token: str | None = None) -> Path:
    """
    Decode a signed file token and return the path stored in its "file" claim.

    Args:
        token (Optional[str], optional): JWT carrying a "file" claim. Defaults to None.

    Raises:
        HTTPException: 400 Bad Request when no token is given, the token has no usable
            "file" claim, or the file doesn't exist
        HTTPException: 401 Unauthorized when the token is invalid

    Returns:
        Path: existing path taken from the token
    """
    if not token:
        raise HTTPException(status.HTTP_400_BAD_REQUEST)

    try:
        payload = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
    except PyJWTError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="could not validate file token",
        ) from e

    # A correctly signed token of another kind has no "file" claim. Path(None) would
    # raise TypeError (an unhandled 500) and Path("") silently becomes ".", so both
    # are rejected here as a bad request.
    file_claim = payload.get("file")
    if not isinstance(file_claim, str) or not file_claim:
        raise HTTPException(status.HTTP_400_BAD_REQUEST)

    file_path = Path(file_claim)
    if not file_path.exists():
        raise HTTPException(status.HTTP_400_BAD_REQUEST)

    return file_path

177 

178 

def validate_recipe_token(token: str | None = None) -> str:
    """
    Decode a signed recipe token and return the slug stored in its "slug" claim.

    Args:
        token (Optional[str], optional): JWT carrying a "slug" claim. Defaults to None.

    Raises:
        HTTPException: 400 Bad Request when no token is given or it has no "slug" claim
        HTTPException: 401 Unauthorized when the token cannot be decoded

    Returns:
        str: token data
    """
    if not token:
        raise HTTPException(status.HTTP_400_BAD_REQUEST)

    try:
        claims = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
    except PyJWTError as e:
        # NOTE(review): the detail text says "file token" although this validates a recipe token
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="could not validate file token",
        ) from e

    recipe_slug: str | None = claims.get("slug")
    if recipe_slug is None:
        raise HTTPException(status.HTTP_400_BAD_REQUEST)

    return recipe_slug

207 

208 

@contextmanager
def get_temporary_zip_path(auto_unlink=True) -> Generator[Path, None, None]:
    """
    Yield the path of the zip file inside the application's temp directory.

    The temp directory is created if needed. The zip file itself is not created here;
    on exit it is removed (if present) unless auto_unlink is False.
    """
    # NOTE(review): the file name is fixed, so overlapping users of this context
    # manager would share one path — confirm callers never run concurrently.
    temp_dir = app_dirs.TEMP_DIR
    temp_dir.mkdir(parents=True, exist_ok=True)
    zip_path = temp_dir / "my_zip_archive.zip"

    try:
        yield zip_path
    finally:
        if auto_unlink:
            zip_path.unlink(missing_ok=True)

218 

219 

@contextmanager
def get_temporary_path(auto_unlink=True) -> Generator[Path, None, None]:
    """
    Create a uniquely named directory inside the application's temp directory and yield it.

    On exit the directory tree is removed unless auto_unlink is False.
    """
    scratch_dir = app_dirs.TEMP_DIR / uuid4().hex
    scratch_dir.mkdir(parents=True, exist_ok=True)

    try:
        yield scratch_dir
    finally:
        if auto_unlink:
            rmtree(scratch_dir)

229 

230 

def temporary_file(ext: str = "") -> Callable[[], Generator[tempfile._TemporaryFileWrapper, None, None]]:
    """
    Build a generator function that yields a temporary file with the specified extension.

    The returned callable takes no arguments. When iterated it opens a
    NamedTemporaryFile in binary read/write mode with `ext` as its suffix, yields the
    open file, and closes it (which also deletes it) once the generator is resumed
    or closed.

    Args:
        ext (str, optional): Suffix for the temporary file name, e.g. ".zip". Defaults to "".

    Returns:
        Callable: zero-argument generator function yielding the open temporary file
    """

    def func():
        # No separate placeholder file is created in the application temp directory:
        # nothing ever received its path, and creating it could fail when that
        # directory is missing. The NamedTemporaryFile is the only file handed out.
        with tempfile.NamedTemporaryFile(mode="w+b", suffix=ext) as f:
            yield f

    return func