Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/scheduler/tasks/delete_old_checked_shopping_list_items.py: 30%
38 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 14:03 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 14:03 +0000
1from collections.abc import Callable 1a
3from pydantic import UUID4 1a
5from mealie.db.db_setup import session_context 1a
6from mealie.repos.all_repositories import get_repositories 1a
7from mealie.routes.households.controller_shopping_lists import publish_list_item_events 1a
8from mealie.schema.response.pagination import OrderDirection, PaginationQuery 1a
9from mealie.schema.user.user import DEFAULT_INTEGRATION_ID 1a
10from mealie.services.event_bus_service.event_bus_service import EventBusService 1a
11from mealie.services.event_bus_service.event_types import EventDocumentDataBase, EventTypes 1a
12from mealie.services.household_services.shopping_lists import ShoppingListService 1a
14MAX_CHECKED_ITEMS = 100 1a
17def _create_publish_event(event_bus_service: EventBusService): 1a
18 def publish_event(
19 event_type: EventTypes,
20 document_data: EventDocumentDataBase,
21 group_id: UUID4,
22 household_id: UUID4 | None,
23 message: str = "",
24 ):
25 event_bus_service.dispatch(
26 integration_id=DEFAULT_INTEGRATION_ID,
27 group_id=group_id,
28 household_id=household_id,
29 event_type=event_type,
30 document_data=document_data,
31 message=message,
32 )
34 return publish_event
37def _trim_list_items(shopping_list_service: ShoppingListService, shopping_list_id: UUID4, event_publisher: Callable): 1a
38 pagination = PaginationQuery(
39 page=1,
40 per_page=-1,
41 query_filter=f'shopping_list_id="{shopping_list_id}" AND checked=true',
42 order_by="updated_at",
43 order_direction=OrderDirection.desc,
44 )
45 query = shopping_list_service.list_items.page_all(pagination)
46 if len(query.items) <= MAX_CHECKED_ITEMS:
47 return
49 items_to_delete = query.items[MAX_CHECKED_ITEMS:]
50 items_response = shopping_list_service.bulk_delete_items([item.id for item in items_to_delete])
51 publish_list_item_events(event_publisher, items_response)
54def delete_old_checked_list_items(): 1a
55 with session_context() as session:
56 repos = get_repositories(session)
57 groups = repos.groups.page_all(PaginationQuery(page=1, per_page=-1)).items
59 for group in groups:
60 group_repos = get_repositories(session, group_id=group.id)
61 households = group_repos.households.page_all(PaginationQuery(page=1, per_page=-1)).items
62 event_bus_service = EventBusService(session=session)
63 event_publisher = _create_publish_event(event_bus_service)
65 for household in households:
66 household_repos = get_repositories(session, group_id=group.id, household_id=household.id)
68 shopping_list_service = ShoppingListService(household_repos)
69 shopping_list_data = household_repos.group_shopping_lists.page_all(PaginationQuery(page=1, per_page=-1))
70 for shopping_list in shopping_list_data.items:
71 _trim_list_items(
72 shopping_list_service,
73 shopping_list.id,
74 event_publisher,
75 )