Coverage for opt/mealie/lib/python3.12/site-packages/mealie/core/security/providers/openid_provider.py: 16%

70 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 15:48 +0000

1from datetime import timedelta 1a

2 

3from authlib.oidc.core import UserInfo 1a

4from sqlalchemy.orm.session import Session 1a

5 

6from mealie.core import root_logger 1a

7from mealie.core.config import get_app_settings 1a

8from mealie.core.exceptions import MissingClaimException 1a

9from mealie.core.security.providers.auth_provider import AuthProvider 1a

10from mealie.db.models.users.users import AuthMethod 1a

11from mealie.repos.all_repositories import get_repositories 1a

12 

13 

class OpenIDProvider(AuthProvider[UserInfo]):
    """Authentication provider that authenticates a user using the claims of an OIDC ID token"""

    _logger = root_logger.get_logger("openid_provider")

    def __init__(self, session: Session, data: UserInfo, use_default_groups: bool = False) -> None:
        """Store the OIDC claims and the group-claim policy.

        Args:
            session: database session, forwarded to the base provider.
            data: the OIDC claims, forwarded to the base provider (read back as ``self.data``).
            use_default_groups: when True, the groups claim is not added to the
                required claims and a missing groups claim is treated as an empty list.
        """
        super().__init__(session, data)
        self.use_default_groups = use_default_groups

    def authenticate(self) -> tuple[str, timedelta] | None:
        """Attempt to authenticate a user from the OIDC claims held by this provider.

        The claims are validated, group membership is checked when
        ``OIDC_REQUIRES_GROUP_CLAIM`` is set, and the matching user is looked up
        (or created when ``OIDC_SIGNUP_ENABLED`` is set).

        Returns:
            The result of ``get_access_token`` for the user, or ``None`` when the
            user lacks the required group(s), when no user exists and signup is
            disabled, or when creating the user fails.

        Raises:
            MissingClaimException: if there are no claims, or a required claim is
                missing or empty.
        """
        settings = get_app_settings()
        claims = self.data
        self._validate_claims(claims)

        repos = get_repositories(self.session, group_id=None, household_id=None)

        is_admin = False
        if settings.OIDC_REQUIRES_GROUP_CLAIM:
            # We explicitly allow the groups claim to be missing to account for the behaviour of some IdPs:
            # https://github.com/keycloak/keycloak/issues/22340
            # We still log a warning though
            if settings.OIDC_GROUPS_CLAIM not in claims:
                self._logger.warning(
                    "[OIDC] claims did not include a %s claim%s",
                    settings.OIDC_GROUPS_CLAIM,
                    ", using an empty list as default" if self.use_default_groups else "",
                )

            # `or []` also covers a groups claim that is present but None/empty
            group_claim = claims.get(settings.OIDC_GROUPS_CLAIM, []) or []
            is_admin = settings.OIDC_ADMIN_GROUP in group_claim if settings.OIDC_ADMIN_GROUP else False
            # without a configured user group, every authenticated user is valid
            is_valid_user = settings.OIDC_USER_GROUP in group_claim if settings.OIDC_USER_GROUP else True

            if not (is_valid_user or is_admin):
                # adjacent literals instead of a backslash continuation, so the
                # source indentation does not end up inside the log message
                self._logger.warning(
                    "[OIDC] Successfully authenticated, but user does not have one of the required group(s). "
                    "Found: %s - Required (one of): %s",
                    group_claim,
                    [settings.OIDC_USER_GROUP, settings.OIDC_ADMIN_GROUP],
                )
                return None

        user = self.try_get_user(claims.get(settings.OIDC_USER_CLAIM))
        if not user:
            if not settings.OIDC_SIGNUP_ENABLED:
                self._logger.debug("[OIDC] No user found. Not creating a new user - new user creation is disabled.")
                return None

            self._logger.debug("[OIDC] No user found. Creating new OIDC user.")

            # some IdPs don't provide a username (looking at you Google), so if we don't have the claim,
            # we'll create the user with whatever the USER_CLAIM is (default email).
            # `or` (rather than nested .get defaults) also skips claims that are present but empty.
            username = (
                claims.get("preferred_username") or claims.get("username") or claims.get(settings.OIDC_USER_CLAIM)
            )

            try:
                user = repos.users.create(
                    {
                        "username": username,
                        "password": "OIDC",
                        "full_name": claims.get(settings.OIDC_NAME_CLAIM),
                        "email": claims.get("email"),
                        "admin": is_admin,
                        "auth_method": AuthMethod.OIDC,
                    }
                )
                self.session.commit()

            except Exception as e:
                # best effort: a failed signup is reported as a failed authentication
                self._logger.error("[OIDC] Exception while creating user: %s", e)
                return None

            return self.get_access_token(user, settings.OIDC_REMEMBER_ME)  # type: ignore

        # An existing user was found. Admin status is only synchronised with the
        # IdP when an admin group is configured.
        # NOTE(review): no AuthMethod check is performed in this method; whether
        # try_get_user filters by auth method is not visible here - confirm.
        if settings.OIDC_ADMIN_GROUP and user.admin != is_admin:
            self._logger.debug("[OIDC] %s user as admin", "Setting" if is_admin else "Removing")
            user.admin = is_admin
            repos.users.update(user.id, user)
        return self.get_access_token(user, settings.OIDC_REMEMBER_ME)

    def _validate_claims(self, claims: UserInfo) -> None:
        """Raise MissingClaimException unless every required claim is present and non-empty."""
        if not claims:
            self._logger.error("[OIDC] No claims in the id_token")
            raise MissingClaimException()

        # Log all claims for debugging
        # NOTE(review): claim values may contain personal data; this is emitted at debug level only
        self._logger.debug("[OIDC] Received claims:")
        for key, value in claims.items():
            self._logger.debug("[OIDC] %s: %s", key, value)

        # evaluate the property once; each access re-reads the app settings
        required = self.required_claims
        if not required.issubset(claims.keys()):
            self._logger.error(
                "[OIDC] Required claims not present. Expected: %s Actual: %s",
                required,
                claims.keys(),
            )
            raise MissingClaimException()

        # Check for empty required claims
        for claim in required:
            if not claims.get(claim):
                self._logger.error("[OIDC] Required claim '%s' is empty", claim)
                raise MissingClaimException()

    @property
    def required_claims(self) -> set[str]:
        """Names of the claims that must be present and non-empty.

        Always contains the configured name claim, ``"email"`` and the configured
        user claim. The groups claim is added when ``OIDC_REQUIRES_GROUP_CLAIM`` is
        set and ``use_default_groups`` is False.
        """
        settings = get_app_settings()

        claims = {settings.OIDC_NAME_CLAIM, "email", settings.OIDC_USER_CLAIM}
        if settings.OIDC_REQUIRES_GROUP_CLAIM and not self.use_default_groups:
            claims.add(settings.OIDC_GROUPS_CLAIM)
        return claims