Coverage for opt/mealie/lib/python3.12/site-packages/mealie/core/security/providers/auth_provider.py: 88%
46 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
1import abc 1p
2from datetime import UTC, datetime, timedelta 1p
4import jwt 1p
5from sqlalchemy.orm.session import Session 1p
7from mealie.core.config import get_app_settings 1p
8from mealie.repos.all_repositories import get_repositories 1p
9from mealie.schema.user.user import PrivateUser 1p
# Signing algorithm handed to jwt.encode when an access token is created.
ALGORITHM = "HS256"

# Value stored in the "iss" claim of every token issued by this module.
ISS = "mealie"

# Minimum token lifetime applied when a login asks to be remembered.
remember_me_duration = timedelta(weeks=2)
16class AuthProvider[T](metaclass=abc.ABCMeta): 1p
17 """Base Authentication Provider interface"""
19 def __init__(self, session: Session, data: T) -> None: 1p
20 self.session = session 1efgqrshijklmntauvwxyzbABCDEFGcHIJKLMNoOPQRSTUd
21 self.data = data 1efgqrshijklmntauvwxyzbABCDEFGcHIJKLMNoOPQRSTUd
22 self.user: PrivateUser | None = None 1efgqrshijklmntauvwxyzbABCDEFGcHIJKLMNoOPQRSTUd
23 self.__has_tried_user = False 1efgqrshijklmntauvwxyzbABCDEFGcHIJKLMNoOPQRSTUd
25 @classmethod 1p
26 def __subclasshook__(cls, __subclass: type) -> bool: 1p
27 return hasattr(__subclass, "authenticate") and callable(__subclass.authenticate)
29 def get_access_token(self, user: PrivateUser, remember_me=False) -> tuple[str, timedelta]: 1p
30 settings = get_app_settings() 1efghijklmnabcod
32 duration = timedelta(hours=settings.TOKEN_TIME) 1efghijklmnabcod
33 if remember_me: 33 ↛ 34line 33 didn't jump to line 34 because the condition on line 33 was never true1efghijklmnabcod
34 duration = max(remember_me_duration, duration)
36 return AuthProvider.create_access_token({"sub": str(user.id)}, duration) 1efghijklmnabcod
38 @staticmethod 1p
39 def create_access_token(data: dict, expires_delta: timedelta | None = None) -> tuple[str, timedelta]: 1p
40 settings = get_app_settings() 1efghijklmnabcod
42 to_encode = data.copy() 1efghijklmnabcod
43 expires_delta = expires_delta or timedelta(hours=settings.TOKEN_TIME) 1efghijklmnabcod
45 expire = datetime.now(UTC) + expires_delta 1efghijklmnabcod
47 to_encode["exp"] = expire 1efghijklmnabcod
48 to_encode["iss"] = ISS 1efghijklmnabcod
49 return ( 1efghijklmnabcod
50 jwt.encode(to_encode, settings.SECRET, algorithm=ALGORITHM),
51 expires_delta,
52 )
54 def try_get_user(self, username: str) -> PrivateUser | None: 1p
55 """Try to get a user from the database, first trying username, then trying email"""
56 if self.__has_tried_user: 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true1efgqrshijklmntauvwxyzbABCDEFGcHIJKLMNoOPQRSTUd
57 return self.user
59 db = get_repositories(self.session, group_id=None, household_id=None) 1efgqrshijklmntauvwxyzbABCDEFGcHIJKLMNoOPQRSTUd
61 user = user = db.users.get_one(username, "username", any_case=True) 1efgqrshijklmntauvwxyzbABCDEFGcHIJKLMNoOPQRSTUd
62 if not user: 1efgqrshijklmntauvwxyzbABCDEFGcHIJKLMNoOPQRSTUd
63 user = db.users.get_one(username, "email", any_case=True) 1qrstauvwxyzbABCDEFGcHIJKLMNOPQRSTUd
65 self.user = user 1efgqrshijklmntauvwxyzbABCDEFGcHIJKLMNoOPQRSTUd
66 return user 1efgqrshijklmntauvwxyzbABCDEFGcHIJKLMNoOPQRSTUd
68 @abc.abstractmethod 1p
69 def authenticate(self) -> tuple[str, timedelta] | None: 1p
70 """Attempt to authenticate a user"""
71 raise NotImplementedError