Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/user_services/password_reset_service.py: 52%
46 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
1from fastapi import HTTPException, status 1a
2from sqlalchemy.orm.session import Session 1a
4from mealie.core.security import hash_password, url_safe_token 1a
5from mealie.db.models.users.users import AuthMethod 1a
6from mealie.repos.all_repositories import get_repositories 1a
7from mealie.schema.user.user_passwords import SavePasswordResetToken 1a
8from mealie.services._base_service import BaseService 1a
9from mealie.services.email import EmailService 1a
12class PasswordResetService(BaseService): 1a
13 def __init__(self, session: Session) -> None: 1a
14 self.db = get_repositories(session, group_id=None, household_id=None)
15 super().__init__()
17 def generate_reset_token(self, email: str) -> SavePasswordResetToken | None: 1a
18 user = self.db.users.get_one(email, "email", any_case=True)
20 if user is None: 20 ↛ 24line 20 didn't jump to line 24 because the condition on line 20 was always true
21 self.logger.error(f"failed to create password reset for {email=}: user doesn't exists")
22 # Do not raise exception here as we don't want to confirm to the client that the Email doesn't exists
23 return None
24 elif user.auth_method == AuthMethod.LDAP:
25 self.logger.error(f"failed to create password reset for {email=}: user controlled by LDAP")
26 return None
28 # Create Reset Token
29 token = url_safe_token()
31 save_token = SavePasswordResetToken(user_id=user.id, token=token)
33 return self.db.tokens_pw_reset.create(save_token)
35 def send_reset_email(self, email: str, accept_language: str | None = None): 1a
36 token_entry = self.generate_reset_token(email)
38 if token_entry is None: 38 ↛ 42line 38 didn't jump to line 42 because the condition on line 38 was always true
39 return None
41 # Send Email
42 email_servive = EmailService(locale=accept_language)
43 reset_url = f"{self.settings.BASE_URL}/reset-password/?token={token_entry.token}"
45 try:
46 email_servive.send_forgot_password(email, reset_url)
47 except Exception as e:
48 self.logger.error(f"failed to send reset email: {e}")
49 raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, "Failed to send reset email") from e
51 def reset_password(self, token: str, new_password: str): 1a
52 # Validate Token
53 token_entry = self.db.tokens_pw_reset.get_one(token, "token")
55 if token_entry is None: 55 ↛ 59line 55 didn't jump to line 59 because the condition on line 55 was always true
56 self.logger.error("failed to reset password: invalid token")
57 raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid token")
59 user = self.db.users.get_one(token_entry.user_id)
60 # Update Password
61 password_hash = hash_password(new_password)
63 new_user = self.db.users.update_password(user.id, password_hash)
64 # Confirm Password
65 if new_user.password != password_hash:
66 self.logger.error("failed to reset password: invalid password")
67 raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid password")
69 # Delete Token from DB
70 self.db.tokens_pw_reset.delete(token_entry.token)