Coverage for opt/mealie/lib/python3.12/site-packages/mealie/core/security/providers/ldap_provider.py: 13%
102 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
1from datetime import timedelta 1a
3import ldap 1a
4from ldap.ldapobject import LDAPObject 1a
5from sqlalchemy.orm.session import Session 1a
7from mealie.core import root_logger 1a
8from mealie.core.config import get_app_settings 1a
9from mealie.core.security.providers.credentials_provider import CredentialsProvider 1a
10from mealie.db.models.users.users import AuthMethod 1a
11from mealie.repos.all_repositories import get_repositories 1a
12from mealie.schema.user.auth import CredentialsRequest 1a
13from mealie.schema.user.user import PrivateUser 1a
class LDAPProvider(CredentialsProvider):
    """Authentication provider that authenticates a user against an LDAP server using username/password combination"""

    _logger = root_logger.get_logger("ldap_provider")

    def __init__(self, session: Session, data: CredentialsRequest) -> None:
        """Store the DB session and the submitted credentials via the parent provider."""
        super().__init__(session, data)
        self.conn = None

    def authenticate(self) -> tuple[str, timedelta] | None:
        """Attempt to authenticate a user given a username and password against an LDAP provider

        Returns the access token tuple produced by `get_access_token` when the LDAP
        login succeeds; otherwise returns whatever the parent provider's
        `authenticate` returns.
        """
        # When LDAP is enabled, we need to still also support authentication with Mealie backend
        # First we look to see if we have a user. If we don't we'll attempt to create one with LDAP
        # If we do find a user, we will check if their auth method is LDAP and attempt to authenticate
        # Otherwise, we will proceed with Mealie authentication
        user = self.try_get_user(self.data.username)
        if not user or user.auth_method == AuthMethod.LDAP:
            user = self.get_user()
            if user:
                return self.get_access_token(user, self.data.remember_me)

        return super().authenticate()

    def _find_single_user(self, conn: LDAPObject) -> list[tuple[str, dict[str, list[bytes]]]] | None:
        """Run the user search and return the single matching entry, or None.

        Never closes `conn`; the caller owns the connection's lifetime.
        """
        # Imported locally so the module-level import block does not need to change.
        from ldap.filter import escape_filter_chars

        settings = get_app_settings()

        # The username comes straight from the login request. Escape it so that
        # characters such as '*', '(', ')' and '\' are matched literally instead
        # of being interpreted as LDAP filter syntax (filter injection).
        safe_username = escape_filter_chars(self.data.username)

        user_filter = ""
        if settings.LDAP_USER_FILTER:
            # fill in the template provided by the user to maintain backwards compatibility
            user_filter = settings.LDAP_USER_FILTER.format(
                id_attribute=settings.LDAP_ID_ATTRIBUTE,
                mail_attribute=settings.LDAP_MAIL_ATTRIBUTE,
                input=safe_username,
            )
        # Don't assume the provided search filter has (|({id_attribute}={input})({mail_attribute}={input}))
        search_filter = "(&(|({id_attribute}={input})({mail_attribute}={input})){filter})".format(
            id_attribute=settings.LDAP_ID_ATTRIBUTE,
            mail_attribute=settings.LDAP_MAIL_ATTRIBUTE,
            input=safe_username,
            filter=user_filter,
        )

        user_entry: list[tuple[str, dict[str, list[bytes]]]] | None = None
        try:
            self._logger.debug(f"[LDAP] Starting search with filter: {search_filter}")
            user_entry = conn.search_s(
                settings.LDAP_BASE_DN,
                ldap.SCOPE_SUBTREE,
                search_filter,
                [
                    settings.LDAP_ID_ATTRIBUTE,
                    settings.LDAP_NAME_ATTRIBUTE,
                    settings.LDAP_MAIL_ATTRIBUTE,
                ],
            )
        except ldap.FILTER_ERROR:
            self._logger.error("[LDAP] Bad user search filter")

        # we only want the entries that have a dn. This runs *before* the
        # emptiness check so that a result made up solely of DN-less entries
        # is reported as "no user found" instead of slipping through as [].
        user_entry = [(dn, attr) for dn, attr in user_entry or [] if dn]

        if not user_entry:
            self._logger.error("[LDAP] No user was found with the provided user filter")
            return None

        if len(user_entry) > 1:
            self._logger.warning("[LDAP] Multiple users found with the provided user filter")
            self._logger.debug(f"[LDAP] The following entries were returned: {user_entry}")
            return None

        return user_entry

    def search_user(self, conn: LDAPObject) -> list[tuple[str, dict[str, list[bytes]]]] | None:
        """
        Searches for a user by LDAP_ID_ATTRIBUTE, LDAP_MAIL_ATTRIBUTE, and the provided LDAP_USER_FILTER.
        If none or multiple users are found, the connection is unbound and None is returned.
        On success a one-element list of (dn, attributes) is returned and the connection is left open.
        """
        if not self.data:
            return None

        user_entry = self._find_single_user(conn)
        if user_entry is None:
            # Preserve the historical contract: a failed lookup closes the connection.
            conn.unbind_s()
        return user_entry

    def get_user(self) -> PrivateUser | None:
        """Given a username and password, tries to authenticate by BINDing to an
        LDAP server

        If the BIND succeeds, it will either create a new user of that username on
        the server or return an existing one.
        Returns None on failure.

        The LDAP connection is always unbound before this method returns or raises.
        """

        settings = get_app_settings()
        db = get_repositories(self.session, group_id=None, household_id=None)
        if not self.data:
            return None
        data = self.data

        if settings.LDAP_TLS_INSECURE:
            ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)

        conn = ldap.initialize(settings.LDAP_SERVER_URL)
        try:
            conn.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
            conn.set_option(ldap.OPT_REFERRALS, 0)

            if settings.LDAP_TLS_CACERTFILE:
                conn.set_option(ldap.OPT_X_TLS_CACERTFILE, settings.LDAP_TLS_CACERTFILE)
                conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)

            if settings.LDAP_ENABLE_STARTTLS:
                conn.start_tls_s()

            # First bind with the query account so the directory can be searched.
            try:
                conn.simple_bind_s(settings.LDAP_QUERY_BIND, settings.LDAP_QUERY_PASSWORD)
            except (ldap.INVALID_CREDENTIALS, ldap.NO_SUCH_OBJECT):
                self._logger.error("[LDAP] Unable to bind to with provided user/password")
                return None

            # Use the non-closing helper: the `finally` below owns the unbind.
            user_entry = self._find_single_user(conn)
            if not user_entry:
                return None
            user_dn, user_attr = user_entry[0]

            # Check the credentials of the user
            try:
                self._logger.debug(f"[LDAP] Attempting to bind with '{user_dn}' using the provided password")
                conn.simple_bind_s(user_dn, data.password)
            except (ldap.INVALID_CREDENTIALS, ldap.NO_SUCH_OBJECT):
                self._logger.error("[LDAP] Bind failed")
                return None

            user = self.try_get_user(data.username)

            if user is None:
                self._logger.debug("[LDAP] User is not in Mealie. Creating a new account")

                # Maps each configured LDAP attribute to a label used only in log messages.
                attribute_keys = {
                    settings.LDAP_ID_ATTRIBUTE: "username",
                    settings.LDAP_NAME_ATTRIBUTE: "name",
                    settings.LDAP_MAIL_ATTRIBUTE: "mail",
                }
                attributes = {}
                for attribute_key, attribute_name in attribute_keys.items():
                    if attribute_key not in user_attr or len(user_attr[attribute_key]) == 0:
                        self._logger.error(
                            f"[LDAP] Unable to create user due to missing '{attribute_name}' ('{attribute_key}') attribute"
                        )
                        self._logger.debug(f"[LDAP] User has the following attributes: {user_attr}")
                        return None
                    # Attribute values arrive as lists of bytes; use the first value.
                    attributes[attribute_key] = user_attr[attribute_key][0].decode("utf-8")

                user = db.users.create(
                    {
                        "username": attributes[settings.LDAP_ID_ATTRIBUTE],
                        "password": "LDAP",
                        "full_name": attributes[settings.LDAP_NAME_ATTRIBUTE],
                        "email": attributes[settings.LDAP_MAIL_ATTRIBUTE],
                        "admin": False,
                        "auth_method": AuthMethod.LDAP,
                    },
                )

            if settings.LDAP_ADMIN_FILTER:
                # A base-scope search on the user's own DN returns an entry only
                # if the user matches the admin filter.
                should_be_admin = len(conn.search_s(user_dn, ldap.SCOPE_BASE, settings.LDAP_ADMIN_FILTER, [])) > 0
                if user.admin != should_be_admin:
                    self._logger.debug(f"[LDAP] {'Setting' if should_be_admin else 'Removing'} user as admin")
                    user.admin = should_be_admin
                    db.users.update(user.id, user)

            return user
        finally:
            # Single cleanup point: runs on success, on every early return above,
            # and when any call raises, so the connection is never left open.
            conn.unbind_s()