Coverage for opt/mealie/lib/python3.12/site-packages/mealie/core/dependencies/dependencies.py: 60%
135 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
1import tempfile 1b
2from collections.abc import Callable, Generator 1b
3from contextlib import contextmanager 1b
4from pathlib import Path 1b
5from shutil import rmtree 1b
6from uuid import uuid4 1b
8import fastapi 1b
9import jwt 1b
10from fastapi import Depends, HTTPException, Request, status 1b
11from fastapi.security import OAuth2PasswordBearer 1b
12from jwt.exceptions import PyJWTError 1b
13from sqlalchemy.orm.session import Session 1b
15from mealie.core import root_logger 1b
16from mealie.core.config import get_app_dirs, get_app_settings 1b
17from mealie.db.db_setup import generate_session 1b
18from mealie.repos.all_repositories import get_repositories 1b
19from mealie.schema.user import PrivateUser, TokenData 1b
20from mealie.schema.user.user import DEFAULT_INTEGRATION_ID, GroupInDB 1b
# Bearer-token extractors, both pointing at the same token endpoint. The "soft fail" variant is built
# with auto_error=False and is the one used by dependencies that accept a missing token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token")
oauth2_scheme_soft_fail = OAuth2PasswordBearer(tokenUrl="/api/auth/token", auto_error=False)

# Signing algorithm handed to every jwt.decode call in this module.
ALGORITHM = "HS256"

# Module-wide handles, resolved once at import time.
app_dirs = get_app_dirs()
settings = get_app_settings()
logger = root_logger.get_logger("dependencies")

# Shared 401 response raised whenever a bearer token cannot be validated.
credentials_exception = HTTPException(
    detail="Could not validate credentials",
    headers={"WWW-Authenticate": "Bearer"},
    status_code=status.HTTP_401_UNAUTHORIZED,
)
async def is_logged_in(token: str = Depends(oauth2_scheme_soft_fail), session=Depends(generate_session)) -> bool:
    """
    Report whether the request carries a usable bearer token, without ever raising.

    Use this when only a yes/no answer is needed and the user object itself is not. Unlike
    'get_current_user', every failure (undecodable token, failed API-token lookup, any other
    error) is reported as False instead of an auth exception.

    Args:
        token (str, optional): Bearer token. Defaults to Depends(oauth2_scheme_soft_fail).
        session ([type], optional): Database session. Defaults to Depends(generate_session).

    Returns:
        bool: True = Valid User / False = Not User
    """
    try:
        claims = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
        subject: str = claims.get("sub")

        # Long-lived (API) tokens are confirmed against the stored tokens; a successful lookup
        # settles the question, otherwise we fall back to the "sub" claim check below.
        if claims.get("long_token") is not None and validate_long_live_token(session, token, claims.get("id")):
            return True

        return subject is not None
    except Exception:
        # Deliberately broad: this dependency must never surface an error to the caller.
        return False
async def get_public_group(group_slug: str = fastapi.Path(...), session=Depends(generate_session)) -> GroupInDB:
    """
    Look up a group by slug or id and return it only if it is publicly visible.

    Raises:
        HTTPException: 404 when the group does not exist or its preferences mark it as private
    """
    found = get_repositories(session).groups.get_by_slug_or_id(group_slug)

    if found and not found.preferences.private_group:
        return found

    # Private groups are reported exactly like missing ones.
    raise HTTPException(404, "group not found")
async def try_get_current_user(
    request: Request,
    token: str = Depends(oauth2_scheme_soft_fail),
    session=Depends(generate_session),
) -> PrivateUser | None:
    """
    Best-effort variant of 'get_current_user'.

    Returns:
        PrivateUser | None: the resolved user, or None when 'get_current_user' raises for any reason
    """
    try:
        current = await get_current_user(request, token, session)
    except Exception:
        # Any failure (auth or otherwise) is treated as "no user".
        return None
    else:
        return current
async def get_current_user(
    request: Request,
    token: str | None = Depends(oauth2_scheme_soft_fail),
    session=Depends(generate_session),
) -> PrivateUser:
    """
    Resolve the user that owns the bearer token of the current request.

    The token comes from the OAuth2 scheme, or from the "mealie.access_token" cookie when the
    scheme yielded nothing. Tokens carrying a "long_token" claim are resolved through
    'validate_long_live_token'; all others are resolved through their "sub" claim.

    Raises:
        HTTPException: 401 Unauthorized when the token cannot be decoded, has no "sub" claim,
            or no user matches it
    """
    cookie_name = "mealie.access_token"

    if token is None and cookie_name in request.cookies:
        # No token from the scheme: fall back to the access-token cookie.
        raw_token = request.cookies.get(cookie_name, "")
    else:
        raw_token = token or ""

    try:
        claims = jwt.decode(raw_token, settings.SECRET, algorithms=[ALGORITHM])
        subject: str | None = claims.get("sub")
        long_lived: str | None = claims.get("long_token")

        if long_lived is not None:
            return validate_long_live_token(session, raw_token, claims.get("id"))

        if subject is None:
            raise credentials_exception

        token_data = TokenData(user_id=subject)
    except PyJWTError as err:
        raise credentials_exception from err

    repos = get_repositories(session, group_id=None, household_id=None)
    found_user = repos.users.get_one(token_data.user_id, "id", any_case=False)

    # If we don't commit here, lazy-loads from user relationships will leave some table lock in postgres
    # which can cause quite a bit of pain further down the line
    session.commit()

    if found_user is None:
        raise credentials_exception
    return found_user
async def get_integration_id(token: str = Depends(oauth2_scheme)) -> str:
    """
    Read the "integration_id" claim from the bearer token.

    Returns:
        str: the claim value, or DEFAULT_INTEGRATION_ID when the token has no such claim

    Raises:
        HTTPException: 401 Unauthorized when the token cannot be decoded
    """
    try:
        claims = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
    except PyJWTError as err:
        raise credentials_exception from err

    return claims.get("integration_id", DEFAULT_INTEGRATION_ID)
async def get_admin_user(current_user: PrivateUser = Depends(get_current_user)) -> PrivateUser:
    """
    Pass the current user through only when it is flagged as admin.

    Raises:
        HTTPException: 403 Forbidden when the current user is not an admin
    """
    if current_user.admin:
        return current_user

    raise HTTPException(status.HTTP_403_FORBIDDEN)
def validate_long_live_token(session: Session, client_token: str, user_id: str) -> PrivateUser:
    """
    Resolve a long-lived (API) token to the user it belongs to.

    Args:
        session (Session): database session used to build the repositories
        client_token (str): the raw token presented by the client
        user_id (str): the user id the token claims to belong to

    Raises:
        HTTPException: 401 Unauthorized when no stored token matches both values
    """
    api_tokens = get_repositories(session, group_id=None, household_id=None).api_tokens
    matches = api_tokens.multi_query({"token": client_token, "user_id": user_id})

    try:
        # First match wins; an empty result surfaces as IndexError.
        return matches[0].user
    except IndexError as err:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED) from err
def validate_file_token(token: str | None = None) -> Path:
    """
    Resolve a signed file token to the existing path stored in its "file" claim.

    Args:
        token (Optional[str], optional): signed JWT carrying a "file" claim. Defaults to None.

    Raises:
        HTTPException: 400 Bad Request when no token is given, the token has no usable "file"
            claim, or the file doesn't exist
        HTTPException: 401 Unauthorized when the token is invalid

    Returns:
        Path: the path named by the token
    """
    if not token:
        raise HTTPException(status.HTTP_400_BAD_REQUEST)

    try:
        payload = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
    except PyJWTError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="could not validate file token",
        ) from e

    # A validly signed token without a string "file" claim (e.g. a login token) would make
    # Path(...) raise TypeError and surface as a 500; reject it the same way
    # validate_recipe_token rejects a missing "slug".
    raw_path = payload.get("file")
    if not isinstance(raw_path, str):
        raise HTTPException(status.HTTP_400_BAD_REQUEST)

    file_path = Path(raw_path)
    if not file_path.exists():
        raise HTTPException(status.HTTP_400_BAD_REQUEST)

    return file_path
def validate_recipe_token(token: str | None = None) -> str:
    """
    Extract the recipe slug from a signed recipe token.

    Args:
        token (Optional[str], optional): signed JWT carrying a "slug" claim. Defaults to None.

    Raises:
        HTTPException: 400 Bad Request when no token is given or the token has no "slug" claim
        HTTPException: 401 Unauthorized when the token is invalid

    Returns:
        str: the recipe slug stored in the token
    """
    if not token:
        raise HTTPException(status.HTTP_400_BAD_REQUEST)

    try:
        payload = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
        slug: str | None = payload.get("slug")
    except PyJWTError as e:
        # The detail used to read "file token", copied from validate_file_token.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="could not validate recipe token",
        ) from e

    if slug is None:
        raise HTTPException(status.HTTP_400_BAD_REQUEST)

    return slug
@contextmanager
def get_temporary_zip_path(auto_unlink=True) -> Generator[Path, None, None]:
    """
    Yield the path of the scratch zip archive inside the application's temp directory.

    The temp directory is created if needed. The file itself is not created here; on exit it is
    removed (if present) unless auto_unlink is False.
    """
    temp_dir = app_dirs.TEMP_DIR
    temp_dir.mkdir(exist_ok=True, parents=True)

    # NOTE(review): the file name is fixed, so every caller receives the same path.
    zip_path = temp_dir.joinpath("my_zip_archive.zip")
    try:
        yield zip_path
    finally:
        if auto_unlink:
            zip_path.unlink(missing_ok=True)
@contextmanager
def get_temporary_path(auto_unlink=True) -> Generator[Path, None, None]:
    """
    Create a uniquely named scratch directory inside the application's temp directory and yield it.

    On exit the directory and its contents are removed unless auto_unlink is False.
    """
    scratch_dir = app_dirs.TEMP_DIR.joinpath(uuid4().hex)
    scratch_dir.mkdir(exist_ok=True, parents=True)
    try:
        yield scratch_dir
    finally:
        if auto_unlink:
            rmtree(scratch_dir)
def temporary_file(ext: str = "") -> Callable[[], Generator[tempfile._TemporaryFileWrapper, None, None]]:
    """
    Build a generator function that yields a temporary file with the specified extension.

    Args:
        ext (str, optional): suffix for the temporary file name, e.g. ".zip". Defaults to "".

    Returns:
        A zero-argument generator function. Each call yields one open binary ("w+b")
        NamedTemporaryFile, which is closed and deleted when the generator finishes.
    """

    def func():
        # NamedTemporaryFile owns the file's whole lifecycle. The previous implementation also
        # touched and later unlinked an unrelated uuid-named file in the app temp directory that
        # was never handed to the caller; it only added a failure mode when that directory was
        # missing, so it has been dropped.
        with tempfile.NamedTemporaryFile(mode="w+b", suffix=ext) as f:
            yield f

    return func