Coverage for opt/mealie/lib/python3.12/site-packages/mealie/core/security/providers/openid_provider.py: 16%
70 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
1from datetime import timedelta 1a
3from authlib.oidc.core import UserInfo 1a
4from sqlalchemy.orm.session import Session 1a
6from mealie.core import root_logger 1a
7from mealie.core.config import get_app_settings 1a
8from mealie.core.exceptions import MissingClaimException 1a
9from mealie.core.security.providers.auth_provider import AuthProvider 1a
10from mealie.db.models.users.users import AuthMethod 1a
11from mealie.repos.all_repositories import get_repositories 1a
class OpenIDProvider(AuthProvider[UserInfo]):
    """Authentication provider that authenticates a user from the claims of an OIDC ID token."""

    _logger = root_logger.get_logger("openid_provider")

    def __init__(self, session: Session, data: UserInfo, use_default_groups: bool = False) -> None:
        """Initialise the provider with a session and the OIDC claims.

        Args:
            session: SQLAlchemy session, forwarded to the base provider.
            data: OIDC claims, forwarded to the base provider and read back
                through ``self.data`` during authentication.
            use_default_groups: When true, the groups claim is left out of the
                required claims, so a missing groups claim is tolerated and
                treated as an empty list.
        """
        super().__init__(session, data)
        self.use_default_groups = use_default_groups

    def authenticate(self) -> tuple[str, timedelta] | None:
        """Attempt to authenticate a user from the OIDC claims in ``self.data``.

        The claims are validated against ``required_claims``, the optional
        group restriction is applied, and the matching user is looked up. When
        no user exists and ``OIDC_SIGNUP_ENABLED`` is set, a new OIDC user is
        created. For an existing user the admin flag is synchronised with the
        admin group whenever ``OIDC_ADMIN_GROUP`` is configured.

        Returns:
            The result of ``get_access_token`` for the user, or ``None`` when
            the user is in none of the required groups, when signup is disabled
            and no user exists, or when creating the user fails.

        Raises:
            MissingClaimException: If there are no claims at all, or a required
                claim is missing or empty.
        """
        settings = get_app_settings()
        claims = self.data
        if not claims:
            self._logger.error("[OIDC] No claims in the id_token")
            raise MissingClaimException()

        # Log all claims for debugging
        self._logger.debug("[OIDC] Received claims:")
        for key, value in claims.items():
            self._logger.debug("[OIDC] %s: %s", key, value)

        # The property re-reads the settings and rebuilds the set on every
        # access, so evaluate it once and reuse the result below.
        required_claims = self.required_claims
        if not required_claims.issubset(claims.keys()):
            self._logger.error(
                "[OIDC] Required claims not present. Expected: %s Actual: %s",
                required_claims,
                claims.keys(),
            )
            raise MissingClaimException()

        # Check for empty required claims
        for claim in required_claims:
            if not claims.get(claim):
                self._logger.error("[OIDC] Required claim '%s' is empty", claim)
                raise MissingClaimException()

        repos = get_repositories(self.session, group_id=None, household_id=None)

        is_admin = False
        if settings.OIDC_REQUIRES_GROUP_CLAIM:
            # We explicitly allow the groups claim to be missing to account for the behaviour of some IdPs:
            # https://github.com/keycloak/keycloak/issues/22340
            # We still log a warning though
            if settings.OIDC_GROUPS_CLAIM not in claims:
                self._logger.warning(
                    "[OIDC] claims did not include a %s claim%s",
                    settings.OIDC_GROUPS_CLAIM,
                    ", using an empty list as default" if self.use_default_groups else "",
                )
            # `or []` also covers a groups claim that is present but null.
            group_claim = claims.get(settings.OIDC_GROUPS_CLAIM, []) or []
            is_admin = settings.OIDC_ADMIN_GROUP in group_claim if settings.OIDC_ADMIN_GROUP else False
            # With no user group configured, every authenticated user is accepted.
            is_valid_user = settings.OIDC_USER_GROUP in group_claim if settings.OIDC_USER_GROUP else True

            if not (is_valid_user or is_admin):
                # Adjacent literals are used instead of a backslash continuation
                # inside the string, which would embed the source indentation in
                # the logged message.
                self._logger.warning(
                    "[OIDC] Successfully authenticated, but user does not have one of the required group(s). "
                    "Found: %s - Required (one of): %s",
                    group_claim,
                    [settings.OIDC_USER_GROUP, settings.OIDC_ADMIN_GROUP],
                )
                return None

        user = self.try_get_user(claims.get(settings.OIDC_USER_CLAIM))
        if user:
            # Keep the stored admin flag in sync with the admin group, but only
            # when an admin group is actually configured.
            if settings.OIDC_ADMIN_GROUP and user.admin != is_admin:
                self._logger.debug("[OIDC] %s user as admin", "Setting" if is_admin else "Removing")
                user.admin = is_admin
                repos.users.update(user.id, user)
            return self.get_access_token(user, settings.OIDC_REMEMBER_ME)

        if not settings.OIDC_SIGNUP_ENABLED:
            self._logger.debug("[OIDC] No user found. Not creating a new user - new user creation is disabled.")
            return None

        self._logger.debug("[OIDC] No user found. Creating new OIDC user.")

        try:
            # some IdPs don't provide a username (looking at you Google), so if we don't have the claim,
            # we'll create the user with whatever the USER_CLAIM is (default email).
            # `or` is used so that a claim which is present but empty/null also falls through.
            username = (
                claims.get("preferred_username")
                or claims.get("username")
                or claims.get(settings.OIDC_USER_CLAIM)
            )
            user = repos.users.create(
                {
                    "username": username,
                    "password": "OIDC",
                    "full_name": claims.get(settings.OIDC_NAME_CLAIM),
                    "email": claims.get("email"),
                    "admin": is_admin,
                    "auth_method": AuthMethod.OIDC,
                }
            )
            self.session.commit()

        except Exception as e:
            # Deliberately best-effort: a failed signup is reported as a failed login.
            self._logger.error("[OIDC] Exception while creating user: %s", e)
            return None

        return self.get_access_token(user, settings.OIDC_REMEMBER_ME)  # type: ignore

    @property
    def required_claims(self) -> set[str]:
        """Return the names of the claims that must be present and non-empty.

        Always contains the configured name claim, ``"email"`` and the
        configured user claim. The groups claim is added only when
        ``OIDC_REQUIRES_GROUP_CLAIM`` is set and ``use_default_groups`` is false.
        """
        settings = get_app_settings()

        claims = {settings.OIDC_NAME_CLAIM, "email", settings.OIDC_USER_CLAIM}
        if settings.OIDC_REQUIRES_GROUP_CLAIM and not self.use_default_groups:
            claims.add(settings.OIDC_GROUPS_CLAIM)
        return claims