Coverage for opt/mealie/lib/python3.12/site-packages/mealie/core/security/providers/auth_provider.py: 88%
46 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
1import abc 1c
2from datetime import UTC, datetime, timedelta 1c
4import jwt 1c
5from sqlalchemy.orm.session import Session 1c
7from mealie.core.config import get_app_settings 1c
8from mealie.repos.all_repositories import get_repositories 1c
9from mealie.schema.user.user import PrivateUser 1c
# Signing algorithm passed to ``jwt.encode`` when access tokens are created.
ALGORITHM = "HS256"
# Value written to the "iss" (issuer) claim of every token created here.
ISS = "mealie"
# Lower bound on the token lifetime when "remember me" is requested; the
# longer of this and the configured TOKEN_TIME is used.
remember_me_duration = timedelta(days=14)
16class AuthProvider[T](metaclass=abc.ABCMeta): 1c
17 """Base Authentication Provider interface"""
19 def __init__(self, session: Session, data: T) -> None: 1c
20 self.session = session 1bdefghaijk
21 self.data = data 1bdefghaijk
22 self.user: PrivateUser | None = None 1bdefghaijk
23 self.__has_tried_user = False 1bdefghaijk
25 @classmethod 1c
26 def __subclasshook__(cls, __subclass: type) -> bool: 1c
27 return hasattr(__subclass, "authenticate") and callable(__subclass.authenticate)
29 def get_access_token(self, user: PrivateUser, remember_me=False) -> tuple[str, timedelta]: 1c
30 settings = get_app_settings() 1ba
32 duration = timedelta(hours=settings.TOKEN_TIME) 1ba
33 if remember_me: 33 ↛ 34line 33 didn't jump to line 34 because the condition on line 33 was never true1ba
34 duration = max(remember_me_duration, duration)
36 return AuthProvider.create_access_token({"sub": str(user.id)}, duration) 1ba
38 @staticmethod 1c
39 def create_access_token(data: dict, expires_delta: timedelta | None = None) -> tuple[str, timedelta]: 1c
40 settings = get_app_settings() 1ba
42 to_encode = data.copy() 1ba
43 expires_delta = expires_delta or timedelta(hours=settings.TOKEN_TIME) 1ba
45 expire = datetime.now(UTC) + expires_delta 1ba
47 to_encode["exp"] = expire 1ba
48 to_encode["iss"] = ISS 1ba
49 return ( 1ba
50 jwt.encode(to_encode, settings.SECRET, algorithm=ALGORITHM),
51 expires_delta,
52 )
54 def try_get_user(self, username: str) -> PrivateUser | None: 1c
55 """Try to get a user from the database, first trying username, then trying email"""
56 if self.__has_tried_user: 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true1bdefghaijk
57 return self.user
59 db = get_repositories(self.session, group_id=None, household_id=None) 1bdefghaijk
61 user = user = db.users.get_one(username, "username", any_case=True) 1bdefghaijk
62 if not user: 1bdefghaijk
63 user = db.users.get_one(username, "email", any_case=True) 1defghaijk
65 self.user = user 1bdefghaijk
66 return user 1bdefghaijk
68 @abc.abstractmethod 1c
69 def authenticate(self) -> tuple[str, timedelta] | None: 1c
70 """Attempt to authenticate a user"""
71 raise NotImplementedError