Coverage for opt/mealie/lib/python3.12/site-packages/mealie/core/settings/settings.py: 85%
269 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
1import logging 1a
2import os 1a
3import secrets 1a
4from datetime import UTC, datetime 1a
5from pathlib import Path 1a
6from typing import Annotated, Any, NamedTuple 1a
8from dateutil.tz import tzlocal 1a
9from pydantic import PlainSerializer, field_validator 1a
10from pydantic_settings import BaseSettings, SettingsConfigDict 1a
12from mealie.core.settings.themes import Theme 1a
14from .db_providers import AbstractDBProvider, db_provider_factory 1a
15from .static import PACKAGE_DIR 1a
18class ScheduleTime(NamedTuple): 1a
19 hour: int 1a
20 minute: int 1a
23class FeatureDetails(NamedTuple): 1a
24 enabled: bool 1a
25 """Indicates if the feature is enabled or not""" 1a
26 description: str | None 1a
27 """Short description describing why the feature is not ready""" 1a
29 def __str__(self): 1a
30 s = f"Enabled: {self.enabled}" 1a
31 if not self.enabled and self.description: 31 ↛ 33line 31 didn't jump to line 33 because the condition on line 31 was always true1a
32 s += f"\nReason: {self.description}" 1a
33 return s 1a
36MaskedNoneString = Annotated[ 1a
37 str | None,
38 PlainSerializer(lambda x: None if x is None else "*****", return_type=str | None),
39]
40""" 1a
41Custom serializer for sensitive settings. If the setting is None, then will serialize as null, otherwise,
42the secret will be serialized as '*****'
43"""
46def determine_secrets(data_dir: Path, secret: str, production: bool) -> str: 1a
47 if not production: 47 ↛ 48line 47 didn't jump to line 48 because the condition on line 47 was never true1a
48 return "shh-secret-test-key"
50 secrets_file = data_dir.joinpath(secret) 1a
51 if secrets_file.is_file(): 51 ↛ 52line 51 didn't jump to line 52 because the condition on line 51 was never true1a
52 with open(secrets_file) as f:
53 return f.read()
54 else:
55 data_dir.mkdir(parents=True, exist_ok=True) 1a
56 with open(secrets_file, "w") as f: 1a
57 new_secret = secrets.token_hex(32) 1a
58 f.write(new_secret) 1a
59 return new_secret 1a
62def get_secrets_dir() -> str | None: 1a
63 """
64 Returns a directory to load secret settings from, or `None` if the secrets
65 directory does not exist or cannot be accessed.
66 """
67 # Avoid a circular import by importing here instead of at the file's top-level.
68 # get_logger -> AppSettings -> get_logger
69 from mealie.core.root_logger import get_logger 1a
71 logger = get_logger() 1a
73 secrets_dir = "/run/secrets" 1a
75 # Check that the secrets directory exists.
76 if not os.path.exists(secrets_dir): 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true1a
77 logger.warning(f"Secrets directory '{secrets_dir}' does not exist")
78 return None
80 # Likewise, check we have permission to read from the secrets directory.
81 if not os.access(secrets_dir, os.R_OK): 81 ↛ 82line 81 didn't jump to line 82 because the condition on line 81 was never true1a
82 logger.warning(f"Secrets directory '{secrets_dir}' cannot be read from. Check permissions")
83 return None
85 # The secrets directory exists and can be accessed.
86 return secrets_dir 1a
89class AppLoggingSettings(BaseSettings): 1a
90 """
91 Subset of AppSettings to only access logging-related settings.
93 This is separated out from AppSettings to allow logging during construction
94 of AppSettings.
95 """
97 TESTING: bool = False 1a
98 PRODUCTION: bool 1a
100 LOG_CONFIG_OVERRIDE: Path | None = None 1a
101 """path to custom logging configuration file""" 1a
103 LOG_LEVEL: str = "info" 1a
104 """corresponds to standard Python log levels""" 1a
107class AppSettings(AppLoggingSettings): 1a
108 theme: Theme = Theme() 1a
110 BASE_URL: str = "http://localhost:8080" 1a
111 """trailing slashes are trimmed (ex. `http://localhost:8080/` becomes ``http://localhost:8080`)""" 1a
113 STATIC_FILES: str = str(PACKAGE_DIR / "frontend") 1a
114 """path to static files directory (ex. `mealie/dist`)""" 1a
116 IS_DEMO: bool = False 1a
118 HOST_IP: str = "*" 1a
120 API_HOST: str = "0.0.0.0" 1a
121 API_PORT: int = 9000 1a
122 API_DOCS: bool = True 1a
123 TOKEN_TIME: int = 48 1a
124 """time in hours""" 1a
126 @field_validator("TOKEN_TIME") 1a
127 @classmethod 1a
128 def validate_token_time(cls, v: int) -> int: 1a
129 if v < 1: 129 ↛ 130line 129 didn't jump to line 130 because the condition on line 129 was never true1a
130 raise ValueError("TOKEN_TIME must be at least 1 hour")
131 # If TOKEN_TIME is unreasonably high (e.g. hundreds of years), JWT encoding
132 # can overflow, so we set the max to 10 years (87600 hours).
133 if v > 87600: 133 ↛ 134line 133 didn't jump to line 134 because the condition on line 133 was never true1a
134 raise ValueError("TOKEN_TIME is too high; maximum is 87600 hours (10 years)")
135 return v 1a
137 SECRET: str 1a
138 SESSION_SECRET: str 1a
140 GIT_COMMIT_HASH: str = "unknown" 1a
142 ALLOW_SIGNUP: bool = False 1a
143 ALLOW_PASSWORD_LOGIN: bool = True 1a
145 DAILY_SCHEDULE_TIME: str = "23:45" 1a
146 """Local server time, in HH:MM format. See `DAILY_SCHEDULE_TIME_UTC` for the parsed UTC equivalent""" 1a
148 @property 1a
149 def logger(self) -> logging.Logger: 1a
150 # Avoid a circular import by importing here instead of at the file's top-level.
151 # get_logger -> AppSettings -> get_logger
152 from mealie.core.root_logger import get_logger 1a
154 return get_logger() 1a
156 @property 1a
157 def DAILY_SCHEDULE_TIME_UTC(self) -> ScheduleTime: 1a
158 """The DAILY_SCHEDULE_TIME in UTC, parsed into hours and minutes"""
160 # parse DAILY_SCHEDULE_TIME into hours and minutes
161 try: 1a
162 hour_str, minute_str = self.DAILY_SCHEDULE_TIME.split(":") 1a
163 local_hour = int(hour_str) 1a
164 local_minute = int(minute_str) 1a
165 except ValueError:
166 local_hour = 23
167 local_minute = 45
168 self.logger.exception(
169 f"Unable to parse {self.DAILY_SCHEDULE_TIME=} as HH:MM; defaulting to {local_hour}:{local_minute}"
170 )
172 # DAILY_SCHEDULE_TIME is in local time, so we convert it to UTC
173 local_tz = tzlocal() 1a
174 now = datetime.now(local_tz) 1a
175 local_time = now.replace(hour=local_hour, minute=local_minute) 1a
176 utc_time = local_time.astimezone(UTC) 1a
178 self.logger.debug(f"Local time: {local_hour}:{local_minute} | UTC time: {utc_time.hour}:{utc_time.minute}") 1a
179 return ScheduleTime(utc_time.hour, utc_time.minute) 1a
181 # ===============================================
182 # Security Configuration
184 SECURITY_MAX_LOGIN_ATTEMPTS: int = 5 1a
185 SECURITY_USER_LOCKOUT_TIME: int = 24 1a
186 "time in hours" 1a
188 @field_validator("BASE_URL") 1a
189 @classmethod 1a
190 def remove_trailing_slash(cls, v: str) -> str: 1a
191 if v and v[-1] == "/": 191 ↛ 192line 191 didn't jump to line 192 because the condition on line 191 was never true1a
192 return v[:-1]
194 return v 1a
196 @property 1a
197 def DOCS_URL(self) -> str | None: 1a
198 return "/docs" if self.API_DOCS else None 1a
200 @property 1a
201 def REDOC_URL(self) -> str | None: 1a
202 return "/redoc" if self.API_DOCS else None 1a
204 # ===============================================
205 # Database Configuration
207 DB_ENGINE: str = "sqlite" # Options: 'sqlite', 'postgres' 1a
208 DB_PROVIDER: AbstractDBProvider | None = None 1a
210 SQLITE_MIGRATE_JOURNAL_WAL: bool = False 1a
212 @property 1a
213 def DB_URL(self) -> str | None: 1a
214 return self.DB_PROVIDER.db_url if self.DB_PROVIDER else None 1a
216 @property 1a
217 def DB_URL_PUBLIC(self) -> str | None: 1a
218 return self.DB_PROVIDER.db_url_public if self.DB_PROVIDER else None
220 DEFAULT_GROUP: str = "Home" 1a
221 DEFAULT_HOUSEHOLD: str = "Family" 1a
223 _DEFAULT_EMAIL: str = "changeme@example.com" 1a
224 """ 1a
225 This is the default email used for the first user created in the database. This is only used if no users
226 exist in the database. it should no longer be set by end users.
227 """
228 _DEFAULT_PASSWORD: str = "MyPassword" 1a
229 """ 1a
230 This is the default password used for the first user created in the database. This is only used if no users
231 exist in the database. it should no longer be set by end users.
232 """
234 # ===============================================
235 # Email Configuration
237 SMTP_HOST: str | None = None 1a
238 SMTP_PORT: str | None = "587" 1a
239 SMTP_FROM_NAME: str | None = "Mealie" 1a
240 SMTP_FROM_EMAIL: str | None = None 1a
241 SMTP_USER: MaskedNoneString = None 1a
242 SMTP_PASSWORD: MaskedNoneString = None 1a
243 SMTP_AUTH_STRATEGY: str | None = "TLS" # Options: 'TLS', 'SSL', 'NONE' 1a
245 @property 1a
246 def SMTP_ENABLE(self) -> bool: 1a
247 return self.SMTP_FEATURE.enabled
249 @property 1a
250 def SMTP_FEATURE(self) -> FeatureDetails: 1a
251 return AppSettings.validate_smtp( 1ab
252 self.SMTP_HOST,
253 self.SMTP_PORT,
254 self.SMTP_FROM_NAME,
255 self.SMTP_FROM_EMAIL,
256 self.SMTP_AUTH_STRATEGY,
257 self.SMTP_USER,
258 self.SMTP_PASSWORD,
259 )
261 @staticmethod 1a
262 def validate_smtp( 1a
263 host: str | None = None,
264 port: str | None = None,
265 from_name: str | None = None,
266 from_email: str | None = None,
267 strategy: str | None = None,
268 user: str | None = None,
269 password: str | None = None,
270 ) -> FeatureDetails:
271 """Validates all SMTP variables are set"""
272 description = None 1ab
273 required = { 1ab
274 "SMTP_HOST": host,
275 "SMTP_PORT": port,
276 "SMTP_FROM_NAME": from_name,
277 "SMTP_FROM_EMAIL": from_email,
278 "SMTP_AUTH_STRATEGY": strategy,
279 }
280 missing_values = [key for (key, value) in required.items() if value is None] 1ab
281 if missing_values: 281 ↛ 284line 281 didn't jump to line 284 because the condition on line 281 was always true1ab
282 description = f"Missing required values for {missing_values}" 1ab
284 if strategy and strategy.upper() in {"TLS", "SSL"}: 284 ↛ 291line 284 didn't jump to line 291 because the condition on line 284 was always true1ab
285 required["SMTP_USER"] = user 1ab
286 required["SMTP_PASSWORD"] = password 1ab
287 if not description: 287 ↛ 288line 287 didn't jump to line 288 because the condition on line 287 was never true1ab
288 missing_values = [key for (key, value) in required.items() if value is None]
289 description = f"Missing required values for {missing_values} because SMTP_AUTH_STRATEGY is not None"
291 not_none = "" not in required.values() and None not in required.values() 1ab
293 return FeatureDetails(enabled=not_none, description=description) 1ab
295 # ===============================================
296 # LDAP Configuration
298 LDAP_AUTH_ENABLED: bool = False 1a
299 LDAP_SERVER_URL: str | None = None 1a
300 LDAP_TLS_INSECURE: bool = False 1a
301 LDAP_TLS_CACERTFILE: str | None = None 1a
302 LDAP_ENABLE_STARTTLS: bool = False 1a
303 LDAP_BASE_DN: str | None = None 1a
304 LDAP_QUERY_BIND: str | None = None 1a
305 LDAP_QUERY_PASSWORD: MaskedNoneString = None 1a
306 LDAP_USER_FILTER: str | None = None 1a
307 LDAP_ADMIN_FILTER: str | None = None 1a
308 LDAP_ID_ATTRIBUTE: str = "uid" 1a
309 LDAP_MAIL_ATTRIBUTE: str = "mail" 1a
310 LDAP_NAME_ATTRIBUTE: str = "name" 1a
312 @property 1a
313 def LDAP_FEATURE(self) -> FeatureDetails: 1a
314 description = None if self.LDAP_AUTH_ENABLED else "LDAP_AUTH_ENABLED is false" 1acdejfgkhlb
315 required = { 1acdejfgkhlb
316 "LDAP_SERVER_URL": self.LDAP_SERVER_URL,
317 "LDAP_BASE_DN": self.LDAP_BASE_DN,
318 "LDAP_ID_ATTRIBUTE": self.LDAP_ID_ATTRIBUTE,
319 "LDAP_MAIL_ATTRIBUTE": self.LDAP_MAIL_ATTRIBUTE,
320 "LDAP_NAME_ATTRIBUTE": self.LDAP_NAME_ATTRIBUTE,
321 }
322 not_none = None not in required.values() 1acdejfgkhlb
323 if not not_none and not description: 323 ↛ 324line 323 didn't jump to line 324 because the condition on line 323 was never true1acdejfgkhlb
324 missing_values = [key for (key, value) in required.items() if value is None]
325 description = f"Missing required values for {missing_values}"
327 return FeatureDetails( 1acdejfgkhlb
328 enabled=self.LDAP_AUTH_ENABLED and not_none,
329 description=description,
330 )
332 @property 1a
333 def LDAP_ENABLED(self) -> bool: 1a
334 """Validates LDAP settings are all set"""
335 return self.LDAP_FEATURE.enabled 1cdejfgkhlb
337 # ===============================================
338 # OIDC Configuration
339 OIDC_AUTH_ENABLED: bool = False 1a
340 OIDC_CLIENT_ID: str | None = None 1a
341 OIDC_CLIENT_SECRET: MaskedNoneString = None 1a
342 OIDC_CONFIGURATION_URL: str | None = None 1a
343 OIDC_SIGNUP_ENABLED: bool = True 1a
344 OIDC_USER_GROUP: str | None = None 1a
345 OIDC_ADMIN_GROUP: str | None = None 1a
346 OIDC_AUTO_REDIRECT: bool = False 1a
347 OIDC_PROVIDER_NAME: str = "OAuth" 1a
348 OIDC_REMEMBER_ME: bool = False 1a
349 OIDC_USER_CLAIM: str = "email" 1a
350 OIDC_NAME_CLAIM: str = "name" 1a
351 OIDC_GROUPS_CLAIM: str | None = "groups" 1a
352 OIDC_SCOPES_OVERRIDE: str | None = None 1a
353 OIDC_TLS_CACERTFILE: str | None = None 1a
355 @property 1a
356 def OIDC_REQUIRES_GROUP_CLAIM(self) -> bool: 1a
357 return self.OIDC_USER_GROUP is not None or self.OIDC_ADMIN_GROUP is not None 1acdefghib
359 @property 1a
360 def OIDC_FEATURE(self) -> FeatureDetails: 1a
361 description = None if self.OIDC_AUTH_ENABLED else "OIDC_AUTH_ENABLED is false" 1acdefghib
362 required = { 1acdefghib
363 "OIDC_CLIENT_ID": self.OIDC_CLIENT_ID,
364 "OIDC_CLIENT_SECRET": self.OIDC_CLIENT_SECRET,
365 "OIDC_CONFIGURATION_URL": self.OIDC_CONFIGURATION_URL,
366 "OIDC_USER_CLAIM": self.OIDC_USER_CLAIM,
367 }
368 not_none = None not in required.values() 1acdefghib
369 if not not_none and not description: 369 ↛ 370line 369 didn't jump to line 370 because the condition on line 369 was never true1acdefghib
370 missing_values = [key for (key, value) in required.items() if value is None]
371 description = f"Missing required values for {missing_values}"
373 valid_group_claim = True 1acdefghib
374 if self.OIDC_REQUIRES_GROUP_CLAIM and self.OIDC_GROUPS_CLAIM is None: 374 ↛ 375line 374 didn't jump to line 375 because the condition on line 374 was never true1acdefghib
375 if not description:
376 description = "OIDC_GROUPS_CLAIM is required when OIDC_USER_GROUP or OIDC_ADMIN_GROUP are provided"
377 valid_group_claim = False
379 return FeatureDetails( 1acdefghib
380 enabled=self.OIDC_AUTH_ENABLED and not_none and valid_group_claim,
381 description=description,
382 )
384 @property 1a
385 def OIDC_READY(self) -> bool: 1a
386 """Validates OIDC settings are all set"""
387 return self.OIDC_FEATURE.enabled 1acdefghib
389 # ===============================================
390 # OpenAI Configuration
392 OPENAI_BASE_URL: str | None = None 1a
393 """The base URL for the OpenAI API. Leave this unset for most usecases""" 1a
394 OPENAI_API_KEY: MaskedNoneString = None 1a
395 """Your OpenAI API key. Required to enable OpenAI features""" 1a
396 OPENAI_MODEL: str = "gpt-4o" 1a
397 """Which OpenAI model to send requests to. Leave this unset for most usecases""" 1a
398 OPENAI_CUSTOM_HEADERS: dict[str, str] = {} 1a
399 """Custom HTTP headers to send with each OpenAI request""" 1a
400 OPENAI_CUSTOM_PARAMS: dict[str, Any] = {} 1a
401 """Custom HTTP parameters to send with each OpenAI request""" 1a
402 OPENAI_ENABLE_IMAGE_SERVICES: bool = True 1a
403 """Whether to enable image-related features in OpenAI""" 1a
404 OPENAI_WORKERS: int = 2 1a
405 """ 1a
406 Number of OpenAI workers per request. Higher values may increase
407 processing speed, but will incur additional API costs
408 """
409 OPENAI_SEND_DATABASE_DATA: bool = True 1a
410 """ 1a
411 Sending database data may increase accuracy in certain requests,
412 but will incur additional API costs
413 """
414 OPENAI_REQUEST_TIMEOUT: int = 300 1a
415 """ 1a
416 The number of seconds to wait for an OpenAI request to complete before cancelling the request
417 """
419 @property 1a
420 def OPENAI_FEATURE(self) -> FeatureDetails: 1a
421 description = None 1acdefghib
422 if not self.OPENAI_API_KEY: 422 ↛ 424line 422 didn't jump to line 424 because the condition on line 422 was always true1acdefghib
423 description = "OPENAI_API_KEY is not set" 1acdefghib
424 elif not self.OPENAI_MODEL:
425 description = "OPENAI_MODEL is not set"
427 return FeatureDetails( 1acdefghib
428 enabled=bool(self.OPENAI_API_KEY and self.OPENAI_MODEL),
429 description=description,
430 )
432 @property 1a
433 def OPENAI_ENABLED(self) -> bool: 1a
434 """Validates OpenAI settings are all set"""
435 return self.OPENAI_FEATURE.enabled 1cdefghib
437 # ===============================================
438 # Web Concurrency
440 WORKER_PER_CORE: int = 1 1a
441 """Old gunicorn env for workers per core.""" 1a
443 UVICORN_WORKERS: int = 1 1a
444 """Number of Uvicorn workers to run.""" 1a
446 @property 1a
447 def WORKERS(self) -> int: 1a
448 return max(1, self.WORKER_PER_CORE * self.UVICORN_WORKERS) 1a
450 model_config = SettingsConfigDict(arbitrary_types_allowed=True, extra="allow", env_nested_delimiter="__") 1a
452 # ===============================================
453 # TLS
455 TLS_CERTIFICATE_PATH: str | os.PathLike[str] | None = None 1a
456 """Path where the certificate resides.""" 1a
458 TLS_PRIVATE_KEY_PATH: str | os.PathLike[str] | None = None 1a
459 """Path where the private key resides.""" 1a
462def app_settings_constructor(data_dir: Path, production: bool, env_file: Path, env_encoding="utf-8") -> AppSettings: 1a
463 """
464 app_settings_constructor is a factory function that returns an AppSettings object. It is used to inject the
465 required dependencies into the AppSettings object and nested child objects. AppSettings should not be instantiated
466 directly, but rather through this factory function.
467 """
468 secret_settings = { 1a
469 "SECRET": determine_secrets(data_dir, ".secret", production),
470 "SESSION_SECRET": determine_secrets(data_dir, ".session_secret", production),
471 }
472 app_settings = AppSettings( 1a
473 _env_file=env_file, # type: ignore
474 _env_file_encoding=env_encoding, # type: ignore
475 # `get_secrets_dir` must be called here rather than within `AppSettings`
476 # to avoid a circular import.
477 _secrets_dir=get_secrets_dir(), # type: ignore
478 **secret_settings,
479 )
481 app_settings.DB_PROVIDER = db_provider_factory( 1a
482 app_settings.DB_ENGINE or "sqlite",
483 data_dir,
484 env_file=env_file,
485 env_encoding=env_encoding,
486 )
488 return app_settings 1a