Coverage for opt/mealie/lib/python3.12/site-packages/mealie/routes/users/crud.py: 62%

49 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 15:32 +0000

1from fastapi import HTTPException, status 1a

2from pydantic import UUID4 1a

3 

4from mealie.core.security import hash_password 1a

5from mealie.core.security.providers.credentials_provider import CredentialsProvider 1a

6from mealie.db.models.users.users import AuthMethod 1a

7from mealie.routes._base import BaseUserController, controller 1a

8from mealie.routes._base.routers import UserAPIRouter 1a

9from mealie.routes.users._helpers import assert_user_change_allowed 1a

10from mealie.schema.response import ErrorResponse, SuccessResponse 1a

11from mealie.schema.user import ChangePassword, UserBase, UserOut 1a

12from mealie.schema.user.user import UserRatings, UserRatingSummary 1a

13 

# Router that the UserController endpoints in this module are registered on.
user_router = UserAPIRouter(
    prefix="/users",
    tags=["Users: CRUD"],
)

15 

16 

@controller(user_router)
class UserController(BaseUserController):
    """User endpoints registered on ``user_router``.

    The ``/self`` routes read data belonging to ``self.user``; the ``PUT``
    routes change a password or a user record through ``self.repos.users``.
    """

    @user_router.get("/self", response_model=UserOut)
    def get_logged_in_user(self):
        """Return ``self.user``; the route serializes it as ``UserOut``."""
        return self.user

    @user_router.get("/self/ratings", response_model=UserRatings[UserRatingSummary])
    def get_logged_in_user_ratings(self):
        """Return the ratings stored for ``self.user``."""
        ratings = self.repos.user_ratings.get_by_user(self.user.id)
        return UserRatings(ratings=ratings)

    @user_router.get("/self/ratings/{recipe_id}", response_model=UserRatingSummary)
    def get_logged_in_user_rating_for_recipe(self, recipe_id: UUID4):
        """Return the rating ``self.user`` gave to the recipe ``recipe_id``.

        Raises:
            HTTPException: 404 when the repository lookup returns a falsy result.
        """
        user_rating = self.repos.user_ratings.get_by_user_and_recipe(self.user.id, recipe_id)
        if not user_rating:
            raise HTTPException(
                status.HTTP_404_NOT_FOUND,
                ErrorResponse.respond("User has not rated this recipe"),
            )
        return user_rating

    @user_router.get("/self/favorites", response_model=UserRatings[UserRatingSummary])
    def get_logged_in_user_favorites(self):
        """Return the ratings of ``self.user`` fetched with ``favorites_only=True``."""
        favorites = self.repos.user_ratings.get_by_user(self.user.id, favorites_only=True)
        return UserRatings(ratings=favorites)

    @user_router.put("/password")
    def update_password(self, password_change: ChangePassword):
        """Change the password of ``self.user``.

        Raises:
            HTTPException: 400 when the account's auth method is LDAP, when
                ``current_password`` fails verification against the stored
                password, or when the repository update raises.
        """
        if self.user.auth_method == AuthMethod.LDAP:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST, ErrorResponse.respond(self.t("user.ldap-update-password-unavailable"))
            )
        if not CredentialsProvider.verify_password(password_change.current_password, self.user.password):
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST, ErrorResponse.respond(self.t("user.invalid-current-password"))
            )

        # Hash into a local first: the in-memory user must not carry a hash
        # that was never persisted if the repository call below fails.
        new_hash = hash_password(password_change.new_password)
        try:
            self.repos.users.update_password(self.user.id, new_hash)
        except Exception as e:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST,
                ErrorResponse.respond("Failed to update password"),
            ) from e

        # The write succeeded, so mirror the new hash on the in-memory user.
        self.user.password = new_hash
        return SuccessResponse.respond(self.t("user.password-updated"))

    @user_router.put("/{item_id}")
    def update_user(self, item_id: UUID4, new_data: UserBase):
        """Update the user ``item_id`` with the fields of ``new_data``.

        ``assert_user_change_allowed`` is called first with the target id, the
        requesting user and the new data; whatever it raises propagates
        unchanged.

        Raises:
            HTTPException: 400 when dumping ``new_data`` or the repository
                update raises.
        """
        assert_user_change_allowed(item_id, self.user, new_data)

        try:
            self.repos.users.update(item_id, new_data.model_dump())
        except Exception as e:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST,
                ErrorResponse.respond("Failed to update user"),
            ) from e

        return SuccessResponse.respond(self.t("user.user-updated"))

75 ) from e 

76 

77 return SuccessResponse.respond(self.t("user.user-updated"))