Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/scheduler/tasks/delete_old_checked_shopping_list_items.py: 30%

38 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 14:03 +0000

1from collections.abc import Callable 1a

2 

3from pydantic import UUID4 1a

4 

5from mealie.db.db_setup import session_context 1a

6from mealie.repos.all_repositories import get_repositories 1a

7from mealie.routes.households.controller_shopping_lists import publish_list_item_events 1a

8from mealie.schema.response.pagination import OrderDirection, PaginationQuery 1a

9from mealie.schema.user.user import DEFAULT_INTEGRATION_ID 1a

10from mealie.services.event_bus_service.event_bus_service import EventBusService 1a

11from mealie.services.event_bus_service.event_types import EventDocumentDataBase, EventTypes 1a

12from mealie.services.household_services.shopping_lists import ShoppingListService 1a

13 

14MAX_CHECKED_ITEMS = 100 1a

15 

16 

17def _create_publish_event(event_bus_service: EventBusService): 1a

18 def publish_event( 

19 event_type: EventTypes, 

20 document_data: EventDocumentDataBase, 

21 group_id: UUID4, 

22 household_id: UUID4 | None, 

23 message: str = "", 

24 ): 

25 event_bus_service.dispatch( 

26 integration_id=DEFAULT_INTEGRATION_ID, 

27 group_id=group_id, 

28 household_id=household_id, 

29 event_type=event_type, 

30 document_data=document_data, 

31 message=message, 

32 ) 

33 

34 return publish_event 

35 

36 

37def _trim_list_items(shopping_list_service: ShoppingListService, shopping_list_id: UUID4, event_publisher: Callable): 1a

38 pagination = PaginationQuery( 

39 page=1, 

40 per_page=-1, 

41 query_filter=f'shopping_list_id="{shopping_list_id}" AND checked=true', 

42 order_by="updated_at", 

43 order_direction=OrderDirection.desc, 

44 ) 

45 query = shopping_list_service.list_items.page_all(pagination) 

46 if len(query.items) <= MAX_CHECKED_ITEMS: 

47 return 

48 

49 items_to_delete = query.items[MAX_CHECKED_ITEMS:] 

50 items_response = shopping_list_service.bulk_delete_items([item.id for item in items_to_delete]) 

51 publish_list_item_events(event_publisher, items_response) 

52 

53 

54def delete_old_checked_list_items(): 1a

55 with session_context() as session: 

56 repos = get_repositories(session) 

57 groups = repos.groups.page_all(PaginationQuery(page=1, per_page=-1)).items 

58 

59 for group in groups: 

60 group_repos = get_repositories(session, group_id=group.id) 

61 households = group_repos.households.page_all(PaginationQuery(page=1, per_page=-1)).items 

62 event_bus_service = EventBusService(session=session) 

63 event_publisher = _create_publish_event(event_bus_service) 

64 

65 for household in households: 

66 household_repos = get_repositories(session, group_id=group.id, household_id=household.id) 

67 

68 shopping_list_service = ShoppingListService(household_repos) 

69 shopping_list_data = household_repos.group_shopping_lists.page_all(PaginationQuery(page=1, per_page=-1)) 

70 for shopping_list in shopping_list_data.items: 

71 _trim_list_items( 

72 shopping_list_service, 

73 shopping_list.id, 

74 event_publisher, 

75 )