Coverage for opt/mealie/lib/python3.12/site-packages/mealie/routes/admin/admin_management_groups.py: 48%
50 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
1from functools import cached_property 1a
3from fastapi import APIRouter, Depends, HTTPException, status 1a
4from pydantic import UUID4 1a
6from mealie.schema.group.group import GroupAdminUpdate 1a
7from mealie.schema.mapper import mapper 1a
8from mealie.schema.response.pagination import PaginationQuery 1a
9from mealie.schema.response.responses import ErrorResponse 1a
10from mealie.schema.user.user import GroupBase, GroupInDB, GroupPagination 1a
11from mealie.services.group_services.group_service import GroupService 1a
13from .._base import BaseAdminController, controller 1a
14from .._base.mixins import HttpRepo 1a
# Router on which this module's group endpoints are registered; every path added to it is prefixed with "/groups".
16router = APIRouter(prefix="/groups") 1a
@controller(router)
class AdminGroupManagementRoutes(BaseAdminController):
    """Group management routes (list, create, read, update, delete) registered on ``router``."""

    @cached_property
    def repo(self):
        """Return the groups repository (``self.repos.groups``).

        Raises:
            Exception: If ``self.user`` is falsy, i.e. no user is logged in.
        """
        if not self.user:
            raise Exception("No user is logged in.")

        return self.repos.groups

    # =======================================================================
    # CRUD Operations

    @property
    def mixins(self):
        """Build an ``HttpRepo`` helper from the group repo, the logger and the registered exceptions."""
        return HttpRepo[GroupBase, GroupInDB, GroupAdminUpdate](
            self.repo,
            self.logger,
            self.registered_exceptions,
        )

    @router.get("", response_model=GroupPagination)
    def get_all(self, q: PaginationQuery = Depends(PaginationQuery)):
        # Return one page of groups for the pagination query `q`.
        # NOTE(review): `override=GroupInDB` presumably selects the schema used
        # for the returned items - confirm against the repository's `page_all`.
        response = self.repo.page_all(
            pagination=q,
            override=GroupInDB,
        )

        # Pagination guides are derived from this route's own URL plus the query parameters.
        response.set_pagination_guides(router.url_path_for("get_all"), q.model_dump())
        return response

    @router.post("", response_model=GroupInDB, status_code=status.HTTP_201_CREATED)
    def create_one(self, data: GroupBase):
        # Creation is delegated to GroupService; the route responds with 201 on success.
        return GroupService.create_group(self.repos, data)

    @router.get("/{item_id}", response_model=GroupInDB)
    def get_one(self, item_id: UUID4):
        # Lookup is delegated to the HttpRepo helper.
        return self.mixins.get_one(item_id)

    @router.put("/{item_id}", response_model=GroupInDB)
    def update_one(self, item_id: UUID4, data: GroupAdminUpdate):
        # Update a group's preferences and/or name and return the resulting group.
        # Responds with 404 when no group matches `item_id`.
        group = self.repo.get_one(item_id)

        # delete_one already treats a falsy lookup as "no such group". Without
        # this guard a missing group would surface as an AttributeError on
        # `group.name` below instead of a meaningful HTTP error.
        if group is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=ErrorResponse.respond(message="Group not found"),
            )

        if data.preferences:
            # Merge the submitted preferences onto the stored ones (looked up by group_id), then persist.
            preferences = self.repos.group_preferences.get_one(value=item_id, key="group_id")
            preferences = mapper(data.preferences, preferences)
            group.preferences = self.repos.group_preferences.update(item_id, preferences)

        if data.name not in ["", group.name]:
            # only update the group if the name changed, since the name is the only field that can be updated
            group.name = data.name
            group = self.repo.update(item_id, group)

        return group

    @router.delete("/{item_id}", response_model=GroupInDB)
    def delete_one(self, item_id: UUID4):
        # Delete a group, refusing with 400 while it still has users.
        item = self.repo.get_one(item_id)

        if item and item.users:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=ErrorResponse.respond(message="Cannot delete group with users"),
            )

        # A missing group falls through to the HttpRepo helper's own delete handling.
        return self.mixins.delete_one(item_id)