Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/user_services/password_reset_service.py: 52%

46 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 17:29 +0000

1from fastapi import HTTPException, status 1a

2from sqlalchemy.orm.session import Session 1a

3 

4from mealie.core.security import hash_password, url_safe_token 1a

5from mealie.db.models.users.users import AuthMethod 1a

6from mealie.repos.all_repositories import get_repositories 1a

7from mealie.schema.user.user_passwords import SavePasswordResetToken 1a

8from mealie.services._base_service import BaseService 1a

9from mealie.services.email import EmailService 1a

10 

11 

12class PasswordResetService(BaseService): 1a

13 def __init__(self, session: Session) -> None: 1a

14 self.db = get_repositories(session, group_id=None, household_id=None) 

15 super().__init__() 

16 

17 def generate_reset_token(self, email: str) -> SavePasswordResetToken | None: 1a

18 user = self.db.users.get_one(email, "email", any_case=True) 

19 

20 if user is None: 20 ↛ 24line 20 didn't jump to line 24 because the condition on line 20 was always true

21 self.logger.error(f"failed to create password reset for {email=}: user doesn't exists") 

22 # Do not raise exception here as we don't want to confirm to the client that the Email doesn't exists 

23 return None 

24 elif user.auth_method == AuthMethod.LDAP: 

25 self.logger.error(f"failed to create password reset for {email=}: user controlled by LDAP") 

26 return None 

27 

28 # Create Reset Token 

29 token = url_safe_token() 

30 

31 save_token = SavePasswordResetToken(user_id=user.id, token=token) 

32 

33 return self.db.tokens_pw_reset.create(save_token) 

34 

35 def send_reset_email(self, email: str, accept_language: str | None = None): 1a

36 token_entry = self.generate_reset_token(email) 

37 

38 if token_entry is None: 38 ↛ 42line 38 didn't jump to line 42 because the condition on line 38 was always true

39 return None 

40 

41 # Send Email 

42 email_servive = EmailService(locale=accept_language) 

43 reset_url = f"{self.settings.BASE_URL}/reset-password/?token={token_entry.token}" 

44 

45 try: 

46 email_servive.send_forgot_password(email, reset_url) 

47 except Exception as e: 

48 self.logger.error(f"failed to send reset email: {e}") 

49 raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, "Failed to send reset email") from e 

50 

51 def reset_password(self, token: str, new_password: str): 1a

52 # Validate Token 

53 token_entry = self.db.tokens_pw_reset.get_one(token, "token") 

54 

55 if token_entry is None: 55 ↛ 59line 55 didn't jump to line 59 because the condition on line 55 was always true

56 self.logger.error("failed to reset password: invalid token") 

57 raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid token") 

58 

59 user = self.db.users.get_one(token_entry.user_id) 

60 # Update Password 

61 password_hash = hash_password(new_password) 

62 

63 new_user = self.db.users.update_password(user.id, password_hash) 

64 # Confirm Password 

65 if new_user.password != password_hash: 

66 self.logger.error("failed to reset password: invalid password") 

67 raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid password") 

68 

69 # Delete Token from DB 

70 self.db.tokens_pw_reset.delete(token_entry.token)