Coverage for opt/mealie/lib/python3.12/site-packages/mealie/routes/auth/auth.py: 55%

118 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 15:32 +0000

1from typing import Annotated, Literal 1a

2 

3from authlib.integrations.starlette_client import OAuth 1a

4from fastapi import APIRouter, Depends, Header, Request, Response, status 1a

5from fastapi.exceptions import HTTPException 1a

6from fastapi.responses import RedirectResponse 1a

7from pydantic import BaseModel 1a

8from sqlalchemy.orm.session import Session 1a

9from starlette.datastructures import URLPath 1a

10 

11from mealie.core import root_logger, security 1a

12from mealie.core.config import get_app_settings 1a

13from mealie.core.dependencies import get_current_user 1a

14from mealie.core.exceptions import MissingClaimException, UserLockedOut 1a

15from mealie.core.security.providers.openid_provider import OpenIDProvider 1a

16from mealie.core.security.security import get_auth_provider 1a

17from mealie.db.db_setup import generate_session 1a

18from mealie.lang import local_provider 1a

19from mealie.routes._base.routers import UserAPIRouter 1a

20from mealie.schema.user import PrivateUser 1a

21from mealie.schema.user.auth import CredentialsRequestForm 1a

22 

23from .auth_cache import AuthCache 1a

24 

# Routers for the authentication endpoints. `public_router` is a plain
# APIRouter (login endpoints); `user_router` is a UserAPIRouter
# (presumably restricted to authenticated users -- confirm in routes._base).
public_router = APIRouter(tags=["Users: Authentication"])
user_router = UserAPIRouter(tags=["Users: Authentication"])
logger = root_logger.get_logger("auth")


settings = get_app_settings()

# Always bind `oauth`, even when OIDC is not configured. The OAuth endpoints
# below guard on `if not oauth:`; without this default that check raised
# NameError instead of returning the intended HTTP 500 response.
oauth: OAuth | None = None

if settings.OIDC_READY:
    oauth = OAuth(cache=AuthCache())

    if settings.OIDC_SCOPES_OVERRIDE:
        # An explicit override replaces the default scope list entirely.
        scope = settings.OIDC_SCOPES_OVERRIDE
    else:
        # Only request the groups claim when group membership is required.
        groups_claim = settings.OIDC_GROUPS_CLAIM if settings.OIDC_REQUIRES_GROUP_CLAIM else ""
        scope = f"openid email profile {groups_claim}"

    # rstrip() removes the trailing space left behind by an empty groups claim.
    client_args = {"scope": scope.rstrip()}
    if settings.OIDC_TLS_CACERTFILE:
        # Custom CA bundle used to verify the identity provider's certificate.
        client_args["verify"] = settings.OIDC_TLS_CACERTFILE

    oauth.register(
        "oidc",
        client_id=settings.OIDC_CLIENT_ID,
        client_secret=settings.OIDC_CLIENT_SECRET,
        server_metadata_url=settings.OIDC_CONFIGURATION_URL,
        client_kwargs=client_args,
        code_challenge_method="S256",
    )

51 

52 

class MealieAuthToken(BaseModel):
    # Payload returned by the token endpoints, plus helpers for attaching the
    # same token to a response as a cookie.
    access_token: str
    token_type: str = "bearer"

    @classmethod
    def set_cookie(
        cls, response: Response, token: str, *, expires_in: int | float | None = None, samesite: str | None = None
    ):
        """
        Attach `token` to `response` as the `mealie.access_token` cookie.

        `expires_in` is the cookie lifetime in seconds; a falsy value
        (None or 0) leaves `max_age` unset. `samesite` is passed through
        to the cookie unchanged.
        """
        max_age = None
        if expires_in:
            max_age = int(expires_in)

        cookie_options = {
            "key": "mealie.access_token",
            "value": token,
            # httponly=False to allow JS access for frontend
            "httponly": False,
            "max_age": max_age,
            "secure": settings.PRODUCTION,
            "samesite": samesite,
        }
        response.set_cookie(**cookie_options)

    @classmethod
    def respond(cls, token: str, token_type: str = "bearer") -> dict:
        """Build the token model and return it dumped to a plain dict."""
        payload = cls(access_token=token, token_type=token_type)
        return payload.model_dump()

76 

77 

def get_samesite(request: Request) -> Literal["lax", "none"]:
    """
    Pick the samesite attribute to use for cookies set on this request.

    Returns "none" only when the request arrived over HTTPS (judged by the
    URL scheme or the `x-forwarded-proto` header) and the app is running in
    production. `samesite="none"` is needed for iframe support (embedding
    Mealie in another site), but most browsers reject such a cookie over HTTP.

    Returns "lax" in every other case; it works over both HTTP and HTTPS
    but does not support hosting in iframes.
    """

    # NOTE: the name `forwarded_proto` is echoed verbatim by the `=` f-string
    # specifier in the debug log below, so it must not be renamed.
    forwarded_proto = request.headers.get("x-forwarded-proto", "").lower()
    secure_transport = "https" in (request.url.scheme, forwarded_proto)

    if secure_transport and settings.PRODUCTION:
        return "none"

    # TODO: remove this once we resolve pending iframe issues
    if settings.PRODUCTION:
        logger.debug("Setting samesite to 'lax' because connection is not HTTPS")
        logger.debug(f"{request.url.scheme=} | {forwarded_proto=}")

    return "lax"

101 

102 

@public_router.post("/token")
def get_token(
    request: Request,
    response: Response,
    data: CredentialsRequestForm = Depends(),
    session: Session = Depends(generate_session),
):
    # Authenticate the submitted credentials, set the access-token cookie and
    # return the token payload. Raises HTTP 423 when the user is locked out
    # and HTTP 401 when authentication fails.
    # (Plain comments instead of a docstring: FastAPI would publish a
    # docstring as the endpoint description.)

    # Work out the client address for the log messages only.
    forwarded_for = request.headers.get("x-forwarded-for")
    if forwarded_for is not None:
        # if there are multiple IPs, the first one is canonically the true client
        ip, _, _ = forwarded_for.partition(",")
    elif request.client:
        ip = request.client.host
    else:
        # request.client should never be null, except sometimes during testing
        ip = "unknown"

    try:
        auth = get_auth_provider(session, data).authenticate()
    except UserLockedOut as e:
        logger.error(f"User is locked out from {ip}")
        raise HTTPException(status_code=status.HTTP_423_LOCKED, detail="User is locked out") from e

    if not auth:
        logger.error(f"Incorrect username or password from {ip}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
        )

    access_token, duration = auth
    # A missing duration leaves the cookie without an explicit max-age.
    lifetime_seconds = duration.total_seconds() if duration else None

    MealieAuthToken.set_cookie(
        response,
        access_token,
        expires_in=lifetime_seconds,
        samesite=get_samesite(request),
    )
    return MealieAuthToken.respond(access_token)

141 

142 

@public_router.get("/oauth")
async def oauth_login(request: Request):
    # Start the OIDC login flow by redirecting the browser to the identity
    # provider. Raises HTTP 500 when no OAuth client is available.
    if not oauth:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Could not initialize OAuth client",
        )

    oidc_client = oauth.create_client("oidc")

    if settings.PRODUCTION:
        # Send the user back to this deployment's own /login page.
        redirect_url = URLPath("/login").make_absolute_url(request.base_url)
    else:
        # in development, we want to redirect to the frontend
        redirect_url = "http://localhost:3000/login"

    return await oidc_client.authorize_redirect(request, redirect_url)

160 

161 

@public_router.get("/oauth/callback")
async def oauth_callback(request: Request, response: Response, session: Session = Depends(generate_session)):
    # Complete the OIDC login: exchange the callback for a token, authenticate
    # the user from its claims, set the access-token cookie and return the
    # token payload. Raises HTTP 500 when no OAuth client is available and
    # HTTP 401 when the user cannot be authenticated.
    if not oauth:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Could not initialize OAuth client",
        )

    oidc_client = oauth.create_client("oidc")
    token = await oidc_client.authorize_access_token(request)

    auth = None
    try:
        # First attempt: use the claims delivered with the token.
        auth = OpenIDProvider(session, token["userinfo"]).authenticate()
    except MissingClaimException:
        try:
            # Second attempt: ask the client for user info and retry with
            # default groups enabled.
            logger.debug("[OIDC] Claims not present in the ID token, pulling user info")
            fetched_userinfo = await oidc_client.userinfo(token=token)
            auth = OpenIDProvider(session, fetched_userinfo, use_default_groups=True).authenticate()
        except MissingClaimException:
            auth = None

    if not auth:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)

    access_token, duration = auth
    # A missing duration leaves the cookie without an explicit max-age.
    lifetime_seconds = duration.total_seconds() if duration else None

    MealieAuthToken.set_cookie(
        response,
        access_token,
        expires_in=lifetime_seconds,
        samesite=get_samesite(request),
    )
    return MealieAuthToken.respond(access_token)

199 

200 

@user_router.get("/refresh")
async def refresh_token(current_user: PrivateUser = Depends(get_current_user)):
    """Use a valid token to get another token"""
    # The new token's subject is the id of the already-authenticated user.
    claims = {"sub": str(current_user.id)}
    new_token = security.create_access_token(data=claims)
    return MealieAuthToken.respond(new_token)

206 

207 

@user_router.post("/logout")
async def logout(
    response: Response,
    accept_language: Annotated[str | None, Header()] = None,
):
    # Delete the access-token cookie and return a confirmation message
    # translated according to the request's Accept-Language header.
    response.delete_cookie("mealie.access_token")

    message = local_provider(accept_language).t("notifications.logged-out")
    return {"message": message}