Coverage for opt/mealie/lib/python3.12/site-packages/mealie/core/security/providers/auth_provider.py: 88%

46 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 15:48 +0000

1import abc 1e

2from datetime import UTC, datetime, timedelta 1e

3 

4import jwt 1e

5from sqlalchemy.orm.session import Session 1e

6 

7from mealie.core.config import get_app_settings 1e

8from mealie.repos.all_repositories import get_repositories 1e

9from mealie.schema.user.user import PrivateUser 1e

10 

11ALGORITHM = "HS256" 1e

12ISS = "mealie" 1e

13remember_me_duration = timedelta(days=14) 1e

14 

15 

16class AuthProvider[T](metaclass=abc.ABCMeta): 1e

17 """Base Authentication Provider interface""" 

18 

19 def __init__(self, session: Session, data: T) -> None: 1e

20 self.session = session 1cfdghaijklmnopbqr

21 self.data = data 1cfdghaijklmnopbqr

22 self.user: PrivateUser | None = None 1cfdghaijklmnopbqr

23 self.__has_tried_user = False 1cfdghaijklmnopbqr

24 

25 @classmethod 1e

26 def __subclasshook__(cls, __subclass: type) -> bool: 1e

27 return hasattr(__subclass, "authenticate") and callable(__subclass.authenticate) 

28 

29 def get_access_token(self, user: PrivateUser, remember_me=False) -> tuple[str, timedelta]: 1e

30 settings = get_app_settings() 1cdab

31 

32 duration = timedelta(hours=settings.TOKEN_TIME) 1cdab

33 if remember_me: 33 ↛ 34line 33 didn't jump to line 34 because the condition on line 33 was never true1cdab

34 duration = max(remember_me_duration, duration) 

35 

36 return AuthProvider.create_access_token({"sub": str(user.id)}, duration) 1cdab

37 

38 @staticmethod 1e

39 def create_access_token(data: dict, expires_delta: timedelta | None = None) -> tuple[str, timedelta]: 1e

40 settings = get_app_settings() 1cdab

41 

42 to_encode = data.copy() 1cdab

43 expires_delta = expires_delta or timedelta(hours=settings.TOKEN_TIME) 1cdab

44 

45 expire = datetime.now(UTC) + expires_delta 1cdab

46 

47 to_encode["exp"] = expire 1cdab

48 to_encode["iss"] = ISS 1cdab

49 return ( 1cdab

50 jwt.encode(to_encode, settings.SECRET, algorithm=ALGORITHM), 

51 expires_delta, 

52 ) 

53 

54 def try_get_user(self, username: str) -> PrivateUser | None: 1e

55 """Try to get a user from the database, first trying username, then trying email""" 

56 if self.__has_tried_user: 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true1cfdghaijklmnopbqr

57 return self.user 

58 

59 db = get_repositories(self.session, group_id=None, household_id=None) 1cfdghaijklmnopbqr

60 

61 user = user = db.users.get_one(username, "username", any_case=True) 1cfdghaijklmnopbqr

62 if not user: 1cfdghaijklmnopbqr

63 user = db.users.get_one(username, "email", any_case=True) 1fghaijklmnopbqr

64 

65 self.user = user 1cfdghaijklmnopbqr

66 return user 1cfdghaijklmnopbqr

67 

68 @abc.abstractmethod 1e

69 def authenticate(self) -> tuple[str, timedelta] | None: 1e

70 """Attempt to authenticate a user""" 

71 raise NotImplementedError