Coverage for opt/mealie/lib/python3.12/site-packages/mealie/core/security/providers/auth_provider.py: 88%

46 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 15:32 +0000

1import abc 1c

2from datetime import UTC, datetime, timedelta 1c

3 

4import jwt 1c

5from sqlalchemy.orm.session import Session 1c

6 

7from mealie.core.config import get_app_settings 1c

8from mealie.repos.all_repositories import get_repositories 1c

9from mealie.schema.user.user import PrivateUser 1c

10 

# Signing algorithm handed to jwt.encode when access tokens are created.
ALGORITHM = "HS256"

# Value written into the "iss" claim of every access token.
ISS = "mealie"

# Lifetime that get_access_token compares against the configured token time
# when the caller asks for a "remember me" token (two weeks == 14 days).
remember_me_duration = timedelta(weeks=2)

14 

15 

16class AuthProvider[T](metaclass=abc.ABCMeta): 1c

17 """Base Authentication Provider interface""" 

18 

19 def __init__(self, session: Session, data: T) -> None: 1c

20 self.session = session 1bdefghaijk

21 self.data = data 1bdefghaijk

22 self.user: PrivateUser | None = None 1bdefghaijk

23 self.__has_tried_user = False 1bdefghaijk

24 

25 @classmethod 1c

26 def __subclasshook__(cls, __subclass: type) -> bool: 1c

27 return hasattr(__subclass, "authenticate") and callable(__subclass.authenticate) 

28 

29 def get_access_token(self, user: PrivateUser, remember_me=False) -> tuple[str, timedelta]: 1c

30 settings = get_app_settings() 1ba

31 

32 duration = timedelta(hours=settings.TOKEN_TIME) 1ba

33 if remember_me: 33 ↛ 34line 33 didn't jump to line 34 because the condition on line 33 was never true1ba

34 duration = max(remember_me_duration, duration) 

35 

36 return AuthProvider.create_access_token({"sub": str(user.id)}, duration) 1ba

37 

38 @staticmethod 1c

39 def create_access_token(data: dict, expires_delta: timedelta | None = None) -> tuple[str, timedelta]: 1c

40 settings = get_app_settings() 1ba

41 

42 to_encode = data.copy() 1ba

43 expires_delta = expires_delta or timedelta(hours=settings.TOKEN_TIME) 1ba

44 

45 expire = datetime.now(UTC) + expires_delta 1ba

46 

47 to_encode["exp"] = expire 1ba

48 to_encode["iss"] = ISS 1ba

49 return ( 1ba

50 jwt.encode(to_encode, settings.SECRET, algorithm=ALGORITHM), 

51 expires_delta, 

52 ) 

53 

54 def try_get_user(self, username: str) -> PrivateUser | None: 1c

55 """Try to get a user from the database, first trying username, then trying email""" 

56 if self.__has_tried_user: 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true1bdefghaijk

57 return self.user 

58 

59 db = get_repositories(self.session, group_id=None, household_id=None) 1bdefghaijk

60 

61 user = user = db.users.get_one(username, "username", any_case=True) 1bdefghaijk

62 if not user: 1bdefghaijk

63 user = db.users.get_one(username, "email", any_case=True) 1defghaijk

64 

65 self.user = user 1bdefghaijk

66 return user 1bdefghaijk

67 

68 @abc.abstractmethod 1c

69 def authenticate(self) -> tuple[str, timedelta] | None: 1c

70 """Attempt to authenticate a user""" 

71 raise NotImplementedError