Coverage for opt/mealie/lib/python3.12/site-packages/mealie/core/security/providers/ldap_provider.py: 13%

102 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 17:29 +0000

1from datetime import timedelta 1a

2 

3import ldap 1a

4from ldap.ldapobject import LDAPObject 1a

5from sqlalchemy.orm.session import Session 1a

6 

7from mealie.core import root_logger 1a

8from mealie.core.config import get_app_settings 1a

9from mealie.core.security.providers.credentials_provider import CredentialsProvider 1a

10from mealie.db.models.users.users import AuthMethod 1a

11from mealie.repos.all_repositories import get_repositories 1a

12from mealie.schema.user.auth import CredentialsRequest 1a

13from mealie.schema.user.user import PrivateUser 1a

14 

15 

class LDAPProvider(CredentialsProvider):
    """Authentication provider that authenticates a user against an LDAP server using username/password combination"""

    _logger = root_logger.get_logger("ldap_provider")

    def __init__(self, session: Session, data: CredentialsRequest) -> None:
        """Store the session and credentials request via the parent provider."""
        super().__init__(session, data)
        self.conn = None

    def authenticate(self) -> tuple[str, timedelta] | None:
        """Attempt to authenticate a user given a username and password against an LDAP provider.

        Returns the result of ``get_access_token`` when the LDAP path yields a user,
        otherwise whatever the parent provider's ``authenticate`` returns.
        """
        # When LDAP is enabled, we need to still also support authentication with Mealie backend
        # First we look to see if we have a user. If we don't we'll attempt to create one with LDAP
        # If we do find a user, we will check if their auth method is LDAP and attempt to authenticate
        # Otherwise, we will proceed with Mealie authentication
        user = self.try_get_user(self.data.username)
        if not user or user.auth_method == AuthMethod.LDAP:
            user = self.get_user()
            if user:
                return self.get_access_token(user, self.data.remember_me)

        return super().authenticate()

    def search_user(self, conn: LDAPObject) -> list[tuple[str, dict[str, list[bytes]]]] | None:
        """
        Searches for a user by LDAP_ID_ATTRIBUTE, LDAP_MAIL_ATTRIBUTE, and the provided LDAP_USER_FILTER.

        Returns a one-element list holding the ``(dn, attributes)`` entry of the matching user.
        If none or multiple users are found, ``conn`` is unbound and None is returned.
        If there is no credentials request at all, None is returned and ``conn`` is left untouched.
        """
        # Imported here so the submodule is guaranteed to be loaded; the module-level
        # `import ldap` is not relied upon to expose `ldap.filter`.
        from ldap.filter import escape_filter_chars

        if not self.data:
            return None
        settings = get_app_settings()

        # The username comes straight from the login request. Escape the characters that
        # are special inside an LDAP filter so it can only ever be matched as a literal value.
        username = escape_filter_chars(self.data.username)

        user_filter = ""
        if settings.LDAP_USER_FILTER:
            # fill in the template provided by the user to maintain backwards compatibility
            user_filter = settings.LDAP_USER_FILTER.format(
                id_attribute=settings.LDAP_ID_ATTRIBUTE,
                mail_attribute=settings.LDAP_MAIL_ATTRIBUTE,
                input=username,
            )
        # Don't assume the provided search filter has (|({id_attribute}={input})({mail_attribute}={input}))
        search_filter = "(&(|({id_attribute}={input})({mail_attribute}={input})){filter})".format(
            id_attribute=settings.LDAP_ID_ATTRIBUTE,
            mail_attribute=settings.LDAP_MAIL_ATTRIBUTE,
            input=username,
            filter=user_filter,
        )

        user_entry: list[tuple[str, dict[str, list[bytes]]]] | None = None
        try:
            self._logger.debug(f"[LDAP] Starting search with filter: {search_filter}")
            user_entry = conn.search_s(
                settings.LDAP_BASE_DN,
                ldap.SCOPE_SUBTREE,
                search_filter,
                [
                    settings.LDAP_ID_ATTRIBUTE,
                    settings.LDAP_NAME_ATTRIBUTE,
                    settings.LDAP_MAIL_ATTRIBUTE,
                ],
            )
        except ldap.FILTER_ERROR:
            self._logger.error("[LDAP] Bad user search filter")

        # we only want the entries that have a dn
        # Filter BEFORE the emptiness check: a result made up solely of DN-less entries
        # must be treated as "no user found", otherwise an empty list would be returned
        # with the connection still bound.
        user_entry = [(dn, attr) for dn, attr in user_entry or [] if dn]

        if not user_entry:
            conn.unbind_s()
            self._logger.error("[LDAP] No user was found with the provided user filter")
            return None

        if len(user_entry) > 1:
            self._logger.warning("[LDAP] Multiple users found with the provided user filter")
            self._logger.debug(f"[LDAP] The following entries were returned: {user_entry}")
            conn.unbind_s()
            return None

        return user_entry

    def get_user(self) -> PrivateUser | None:
        """Given a username and password, tries to authenticate by BINDing to an
        LDAP server

        If the BIND succeeds, it will either create a new user of that username on
        the server or return an existing one.
        Returns None on failure.
        """

        settings = get_app_settings()
        db = get_repositories(self.session, group_id=None, household_id=None)
        if not self.data:
            return None
        data = self.data

        # A simple bind with a DN and an empty password is an "unauthenticated" bind
        # (RFC 4513, section 5.1.2); servers may accept it, which would be treated as
        # a successful login below. Reject it before talking to the server at all.
        if not data.password:
            self._logger.error("[LDAP] Refusing to bind with an empty password")
            return None

        if settings.LDAP_TLS_INSECURE:
            ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)

        conn = ldap.initialize(settings.LDAP_SERVER_URL)
        conn.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
        conn.set_option(ldap.OPT_REFERRALS, 0)

        if settings.LDAP_TLS_CACERTFILE:
            conn.set_option(ldap.OPT_X_TLS_CACERTFILE, settings.LDAP_TLS_CACERTFILE)
            conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)

        if settings.LDAP_ENABLE_STARTTLS:
            conn.start_tls_s()

        # First bind with the configured query account so the directory can be searched
        try:
            conn.simple_bind_s(settings.LDAP_QUERY_BIND, settings.LDAP_QUERY_PASSWORD)
        except (ldap.INVALID_CREDENTIALS, ldap.NO_SUCH_OBJECT):
            self._logger.error("[LDAP] Unable to bind to with provided user/password")
            conn.unbind_s()
            return None

        # search_user unbinds the connection itself whenever it fails to find exactly one user
        user_entry = self.search_user(conn)
        if not user_entry:
            return None
        user_dn, user_attr = user_entry[0]

        # Check the credentials of the user
        try:
            self._logger.debug(f"[LDAP] Attempting to bind with '{user_dn}' using the provided password")
            conn.simple_bind_s(user_dn, data.password)
        except (ldap.INVALID_CREDENTIALS, ldap.NO_SUCH_OBJECT):
            self._logger.error("[LDAP] Bind failed")
            conn.unbind_s()
            return None

        user = self.try_get_user(data.username)

        if user is None:
            self._logger.debug("[LDAP] User is not in Mealie. Creating a new account")

            # Maps each configured LDAP attribute to a human-readable label used in log messages
            attribute_keys = {
                settings.LDAP_ID_ATTRIBUTE: "username",
                settings.LDAP_NAME_ATTRIBUTE: "name",
                settings.LDAP_MAIL_ATTRIBUTE: "mail",
            }
            attributes = {}
            for attribute_key, attribute_name in attribute_keys.items():
                values = user_attr.get(attribute_key)
                if not values:
                    self._logger.error(
                        f"[LDAP] Unable to create user due to missing '{attribute_name}' ('{attribute_key}') attribute"
                    )
                    self._logger.debug(f"[LDAP] User has the following attributes: {user_attr}")
                    conn.unbind_s()
                    return None
                # Only the first value of each attribute is used
                attributes[attribute_key] = values[0].decode("utf-8")

            user = db.users.create(
                {
                    "username": attributes[settings.LDAP_ID_ATTRIBUTE],
                    "password": "LDAP",
                    "full_name": attributes[settings.LDAP_NAME_ATTRIBUTE],
                    "email": attributes[settings.LDAP_MAIL_ATTRIBUTE],
                    "admin": False,
                    "auth_method": AuthMethod.LDAP,
                },
            )

        if settings.LDAP_ADMIN_FILTER:
            # The user is an admin exactly when their own entry matches the admin filter
            should_be_admin = len(conn.search_s(user_dn, ldap.SCOPE_BASE, settings.LDAP_ADMIN_FILTER, [])) > 0
            if user.admin != should_be_admin:
                self._logger.debug(f"[LDAP] {'Setting' if should_be_admin else 'Removing'} user as admin")
                user.admin = should_be_admin
                db.users.update(user.id, user)

        conn.unbind_s()
        return user