Coverage for opt/mealie/lib/python3.12/site-packages/mealie/routes/auth/auth.py: 55%
118 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
1from typing import Annotated, Literal 1a
3from authlib.integrations.starlette_client import OAuth 1a
4from fastapi import APIRouter, Depends, Header, Request, Response, status 1a
5from fastapi.exceptions import HTTPException 1a
6from fastapi.responses import RedirectResponse 1a
7from pydantic import BaseModel 1a
8from sqlalchemy.orm.session import Session 1a
9from starlette.datastructures import URLPath 1a
11from mealie.core import root_logger, security 1a
12from mealie.core.config import get_app_settings 1a
13from mealie.core.dependencies import get_current_user 1a
14from mealie.core.exceptions import MissingClaimException, UserLockedOut 1a
15from mealie.core.security.providers.openid_provider import OpenIDProvider 1a
16from mealie.core.security.security import get_auth_provider 1a
17from mealie.db.db_setup import generate_session 1a
18from mealie.lang import local_provider 1a
19from mealie.routes._base.routers import UserAPIRouter 1a
20from mealie.schema.user import PrivateUser 1a
21from mealie.schema.user.auth import CredentialsRequestForm 1a
23from .auth_cache import AuthCache 1a
# Routers exposed by this module: `public_router` is a plain APIRouter, `user_router`
# is the project's UserAPIRouter; both share the same OpenAPI tag.
public_router = APIRouter(tags=["Users: Authentication"])
user_router = UserAPIRouter(tags=["Users: Authentication"])
logger = root_logger.get_logger("auth")


settings = get_app_settings()

# Bind `oauth` unconditionally. The OIDC routes in this module guard on `if not oauth:`;
# without this default that check raises NameError whenever OIDC is not configured,
# instead of reaching the intended HTTP 500 response.
oauth: OAuth | None = None
if settings.OIDC_READY:
    oauth = OAuth(cache=AuthCache())

    # An explicit scope override wins; otherwise request the standard scopes plus the
    # groups claim when a group claim is required.
    if settings.OIDC_SCOPES_OVERRIDE:
        scope = settings.OIDC_SCOPES_OVERRIDE
    else:
        groups_claim = settings.OIDC_GROUPS_CLAIM if settings.OIDC_REQUIRES_GROUP_CLAIM else ""
        scope = f"openid email profile {groups_claim}"

    # rstrip() removes the trailing space left behind when groups_claim is empty
    client_args = {"scope": scope.rstrip()}
    if settings.OIDC_TLS_CACERTFILE:
        client_args["verify"] = settings.OIDC_TLS_CACERTFILE

    oauth.register(
        "oidc",
        client_id=settings.OIDC_CLIENT_ID,
        client_secret=settings.OIDC_CLIENT_SECRET,
        server_metadata_url=settings.OIDC_CONFIGURATION_URL,
        client_kwargs=client_args,
        code_challenge_method="S256",
    )
# Token payload returned by the authentication routes, plus helpers for writing the
# token into the `mealie.access_token` cookie and for building the response body.
class MealieAuthToken(BaseModel):
    access_token: str
    token_type: str = "bearer"

    @classmethod
    def set_cookie(
        cls, response: Response, token: str, *, expires_in: int | float | None = None, samesite: str | None = None
    ):
        """
        Write `token` into the `mealie.access_token` cookie on `response`.

        `expires_in` is truncated to an int and used as the cookie max-age; a falsy value
        (None or 0) sets no max-age at all. `samesite` is passed through unchanged.
        """
        max_age = None
        if expires_in:
            max_age = int(expires_in)

        # httponly=False to allow JS access for frontend
        response.set_cookie(
            key="mealie.access_token",
            value=token,
            max_age=max_age,
            secure=settings.PRODUCTION,
            httponly=False,
            samesite=samesite,
        )

    @classmethod
    def respond(cls, token: str, token_type: str = "bearer") -> dict:
        """Build the token model and return it dumped to a plain dict."""
        payload = cls(access_token=token, token_type=token_type)
        return payload.model_dump()
def get_samesite(request: Request) -> Literal["lax", "none"]:
    """
    Pick the samesite attribute to use for the auth cookie.

    Returns `"none"` only when running in production AND the request is HTTPS (judged by
    the URL scheme or the `x-forwarded-proto` header). `samesite="none"` is what allows
    embedding Mealie in an iframe on another site, but browsers generally reject such a
    cookie over plain HTTP.

    Returns `"lax"` in every other case; it works over both HTTP and HTTPS but does not
    support hosting in iframes.
    """

    forwarded_proto = request.headers.get("x-forwarded-proto", "").lower()
    secure_transport = "https" in (request.url.scheme, forwarded_proto)

    if not settings.PRODUCTION:
        return "lax"

    if secure_transport:
        return "none"

    # TODO: remove this once we resolve pending iframe issues
    logger.debug("Setting samesite to 'lax' because connection is not HTTPS")
    logger.debug(f"{request.url.scheme=} | {forwarded_proto=}")
    return "lax"
# Credential login: authenticates the submitted form via the selected auth provider,
# stores the resulting token in the auth cookie and returns it in the response body.
# Responds 423 when the user is locked out and 401 when authentication fails.
@public_router.post("/token")
def get_token(
    request: Request,
    response: Response,
    data: CredentialsRequestForm = Depends(),
    session: Session = Depends(generate_session),
):
    # Work out the caller's address; it is only used in the log messages below.
    forwarded_for = request.headers.get("x-forwarded-for")
    if forwarded_for is not None:
        # if there are multiple IPs, the first one is canonically the true client
        ip = forwarded_for.split(",", 1)[0]
    elif request.client:
        ip = request.client.host
    else:
        # request.client should never be null, except sometimes during testing
        ip = "unknown"

    try:
        auth = get_auth_provider(session, data).authenticate()
    except UserLockedOut as e:
        logger.error(f"User is locked out from {ip}")
        raise HTTPException(status_code=status.HTTP_423_LOCKED, detail="User is locked out") from e

    if not auth:
        logger.error(f"Incorrect username or password from {ip}")
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)

    access_token, duration = auth

    MealieAuthToken.set_cookie(
        response,
        access_token,
        expires_in=duration.total_seconds() if duration else None,
        samesite=get_samesite(request),
    )
    return MealieAuthToken.respond(access_token)
# Starts the OIDC login flow by redirecting the browser to the identity provider.
# Responds 500 when the module-level `oauth` client is falsy.
@public_router.get("/oauth")
async def oauth_login(request: Request):
    if not oauth:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Could not initialize OAuth client",
        )

    client = oauth.create_client("oidc")

    if settings.PRODUCTION:
        redirect_url = URLPath("/login").make_absolute_url(request.base_url)
    else:
        # in development, we want to redirect to the frontend
        redirect_url = "http://localhost:3000/login"

    redirect: RedirectResponse = await client.authorize_redirect(request, redirect_url)
    return redirect
# Completes the OIDC login flow: exchanges the callback for a token, authenticates the
# user from the token's claims (falling back to the userinfo endpoint when claims are
# missing), then stores the Mealie token in the auth cookie and returns it.
# Responds 500 when the module-level `oauth` client is falsy, 401 when authentication fails.
@public_router.get("/oauth/callback")
async def oauth_callback(request: Request, response: Response, session: Session = Depends(generate_session)):
    if not oauth:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Could not initialize OAuth client",
        )

    client = oauth.create_client("oidc")
    token = await client.authorize_access_token(request)

    auth = None
    try:
        auth = OpenIDProvider(session, token["userinfo"]).authenticate()
    except MissingClaimException:
        # The fallback stays inside this handler so it only runs after a claim failure.
        try:
            logger.debug("[OIDC] Claims not present in the ID token, pulling user info")
            userinfo = await client.userinfo(token=token)
            auth = OpenIDProvider(session, userinfo, use_default_groups=True).authenticate()
        except MissingClaimException:
            auth = None

    if not auth:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)

    access_token, duration = auth

    MealieAuthToken.set_cookie(
        response,
        access_token,
        expires_in=duration.total_seconds() if duration else None,
        samesite=get_samesite(request),
    )
    return MealieAuthToken.respond(access_token)
@user_router.get("/refresh")
async def refresh_token(current_user: PrivateUser = Depends(get_current_user)):
    """Use a valid token to get another token"""
    # The new token's subject is the authenticated user's id, as a string.
    token_claims = {"sub": str(current_user.id)}
    new_token = security.create_access_token(data=token_claims)
    return MealieAuthToken.respond(new_token)
# Logs the caller out by deleting the auth cookie and returns a confirmation message
# translated according to the request's Accept-Language header.
@user_router.post("/logout")
async def logout(
    response: Response,
    accept_language: Annotated[str | None, Header()] = None,
):
    response.delete_cookie("mealie.access_token")

    message = local_provider(accept_language).t("notifications.logged-out")
    return {"message": message}