Coverage for opt/mealie/lib/python3.12/site-packages/mealie/core/settings/settings.py: 85%

269 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 15:48 +0000

1import logging 1a

2import os 1a

3import secrets 1a

4from datetime import UTC, datetime 1a

5from pathlib import Path 1a

6from typing import Annotated, Any, NamedTuple 1a

7 

8from dateutil.tz import tzlocal 1a

9from pydantic import PlainSerializer, field_validator 1a

10from pydantic_settings import BaseSettings, SettingsConfigDict 1a

11 

12from mealie.core.settings.themes import Theme 1a

13 

14from .db_providers import AbstractDBProvider, db_provider_factory 1a

15from .static import PACKAGE_DIR 1a

16 

17 

18class ScheduleTime(NamedTuple): 1a

19 hour: int 1a

20 minute: int 1a

21 

22 

23class FeatureDetails(NamedTuple): 1a

24 enabled: bool 1a

25 """Indicates if the feature is enabled or not""" 1a

26 description: str | None 1a

27 """Short description describing why the feature is not ready""" 1a

28 

29 def __str__(self): 1a

30 s = f"Enabled: {self.enabled}" 1a

31 if not self.enabled and self.description: 31 ↛ 33line 31 didn't jump to line 33 because the condition on line 31 was always true1a

32 s += f"\nReason: {self.description}" 1a

33 return s 1a

34 

35 

36MaskedNoneString = Annotated[ 1a

37 str | None, 

38 PlainSerializer(lambda x: None if x is None else "*****", return_type=str | None), 

39] 

40""" 1a

41Custom serializer for sensitive settings. If the setting is None, then will serialize as null, otherwise, 

42the secret will be serialized as '*****' 

43""" 

44 

45 

46def determine_secrets(data_dir: Path, secret: str, production: bool) -> str: 1a

47 if not production: 47 ↛ 48line 47 didn't jump to line 48 because the condition on line 47 was never true1a

48 return "shh-secret-test-key" 

49 

50 secrets_file = data_dir.joinpath(secret) 1a

51 if secrets_file.is_file(): 51 ↛ 52line 51 didn't jump to line 52 because the condition on line 51 was never true1a

52 with open(secrets_file) as f: 

53 return f.read() 

54 else: 

55 data_dir.mkdir(parents=True, exist_ok=True) 1a

56 with open(secrets_file, "w") as f: 1a

57 new_secret = secrets.token_hex(32) 1a

58 f.write(new_secret) 1a

59 return new_secret 1a

60 

61 

62def get_secrets_dir() -> str | None: 1a

63 """ 

64 Returns a directory to load secret settings from, or `None` if the secrets 

65 directory does not exist or cannot be accessed. 

66 """ 

67 # Avoid a circular import by importing here instead of at the file's top-level. 

68 # get_logger -> AppSettings -> get_logger 

69 from mealie.core.root_logger import get_logger 1a

70 

71 logger = get_logger() 1a

72 

73 secrets_dir = "/run/secrets" 1a

74 

75 # Check that the secrets directory exists. 

76 if not os.path.exists(secrets_dir): 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true1a

77 logger.warning(f"Secrets directory '{secrets_dir}' does not exist") 

78 return None 

79 

80 # Likewise, check we have permission to read from the secrets directory. 

81 if not os.access(secrets_dir, os.R_OK): 81 ↛ 82line 81 didn't jump to line 82 because the condition on line 81 was never true1a

82 logger.warning(f"Secrets directory '{secrets_dir}' cannot be read from. Check permissions") 

83 return None 

84 

85 # The secrets directory exists and can be accessed. 

86 return secrets_dir 1a

87 

88 

89class AppLoggingSettings(BaseSettings): 1a

90 """ 

91 Subset of AppSettings to only access logging-related settings. 

92 

93 This is separated out from AppSettings to allow logging during construction 

94 of AppSettings. 

95 """ 

96 

97 TESTING: bool = False 1a

98 PRODUCTION: bool 1a

99 

100 LOG_CONFIG_OVERRIDE: Path | None = None 1a

101 """path to custom logging configuration file""" 1a

102 

103 LOG_LEVEL: str = "info" 1a

104 """corresponds to standard Python log levels""" 1a

105 

106 

107class AppSettings(AppLoggingSettings): 1a

108 theme: Theme = Theme() 1a

109 

110 BASE_URL: str = "http://localhost:8080" 1a

111 """trailing slashes are trimmed (ex. `http://localhost:8080/` becomes ``http://localhost:8080`)""" 1a

112 

113 STATIC_FILES: str = str(PACKAGE_DIR / "frontend") 1a

114 """path to static files directory (ex. `mealie/dist`)""" 1a

115 

116 IS_DEMO: bool = False 1a

117 

118 HOST_IP: str = "*" 1a

119 

120 API_HOST: str = "0.0.0.0" 1a

121 API_PORT: int = 9000 1a

122 API_DOCS: bool = True 1a

123 TOKEN_TIME: int = 48 1a

124 """time in hours""" 1a

125 

126 @field_validator("TOKEN_TIME") 1a

127 @classmethod 1a

128 def validate_token_time(cls, v: int) -> int: 1a

129 if v < 1: 129 ↛ 130line 129 didn't jump to line 130 because the condition on line 129 was never true1a

130 raise ValueError("TOKEN_TIME must be at least 1 hour") 

131 # If TOKEN_TIME is unreasonably high (e.g. hundreds of years), JWT encoding 

132 # can overflow, so we set the max to 10 years (87600 hours). 

133 if v > 87600: 133 ↛ 134line 133 didn't jump to line 134 because the condition on line 133 was never true1a

134 raise ValueError("TOKEN_TIME is too high; maximum is 87600 hours (10 years)") 

135 return v 1a

136 

137 SECRET: str 1a

138 SESSION_SECRET: str 1a

139 

140 GIT_COMMIT_HASH: str = "unknown" 1a

141 

142 ALLOW_SIGNUP: bool = False 1a

143 ALLOW_PASSWORD_LOGIN: bool = True 1a

144 

145 DAILY_SCHEDULE_TIME: str = "23:45" 1a

146 """Local server time, in HH:MM format. See `DAILY_SCHEDULE_TIME_UTC` for the parsed UTC equivalent""" 1a

147 

148 @property 1a

149 def logger(self) -> logging.Logger: 1a

150 # Avoid a circular import by importing here instead of at the file's top-level. 

151 # get_logger -> AppSettings -> get_logger 

152 from mealie.core.root_logger import get_logger 1a

153 

154 return get_logger() 1a

155 

156 @property 1a

157 def DAILY_SCHEDULE_TIME_UTC(self) -> ScheduleTime: 1a

158 """The DAILY_SCHEDULE_TIME in UTC, parsed into hours and minutes""" 

159 

160 # parse DAILY_SCHEDULE_TIME into hours and minutes 

161 try: 1a

162 hour_str, minute_str = self.DAILY_SCHEDULE_TIME.split(":") 1a

163 local_hour = int(hour_str) 1a

164 local_minute = int(minute_str) 1a

165 except ValueError: 

166 local_hour = 23 

167 local_minute = 45 

168 self.logger.exception( 

169 f"Unable to parse {self.DAILY_SCHEDULE_TIME=} as HH:MM; defaulting to {local_hour}:{local_minute}" 

170 ) 

171 

172 # DAILY_SCHEDULE_TIME is in local time, so we convert it to UTC 

173 local_tz = tzlocal() 1a

174 now = datetime.now(local_tz) 1a

175 local_time = now.replace(hour=local_hour, minute=local_minute) 1a

176 utc_time = local_time.astimezone(UTC) 1a

177 

178 self.logger.debug(f"Local time: {local_hour}:{local_minute} | UTC time: {utc_time.hour}:{utc_time.minute}") 1a

179 return ScheduleTime(utc_time.hour, utc_time.minute) 1a

180 

181 # =============================================== 

182 # Security Configuration 

183 

184 SECURITY_MAX_LOGIN_ATTEMPTS: int = 5 1a

185 SECURITY_USER_LOCKOUT_TIME: int = 24 1a

186 "time in hours" 1a

187 

188 @field_validator("BASE_URL") 1a

189 @classmethod 1a

190 def remove_trailing_slash(cls, v: str) -> str: 1a

191 if v and v[-1] == "/": 191 ↛ 192line 191 didn't jump to line 192 because the condition on line 191 was never true1a

192 return v[:-1] 

193 

194 return v 1a

195 

196 @property 1a

197 def DOCS_URL(self) -> str | None: 1a

198 return "/docs" if self.API_DOCS else None 1a

199 

200 @property 1a

201 def REDOC_URL(self) -> str | None: 1a

202 return "/redoc" if self.API_DOCS else None 1a

203 

204 # =============================================== 

205 # Database Configuration 

206 

207 DB_ENGINE: str = "sqlite" # Options: 'sqlite', 'postgres' 1a

208 DB_PROVIDER: AbstractDBProvider | None = None 1a

209 

210 SQLITE_MIGRATE_JOURNAL_WAL: bool = False 1a

211 

212 @property 1a

213 def DB_URL(self) -> str | None: 1a

214 return self.DB_PROVIDER.db_url if self.DB_PROVIDER else None 1a

215 

216 @property 1a

217 def DB_URL_PUBLIC(self) -> str | None: 1a

218 return self.DB_PROVIDER.db_url_public if self.DB_PROVIDER else None 

219 

220 DEFAULT_GROUP: str = "Home" 1a

221 DEFAULT_HOUSEHOLD: str = "Family" 1a

222 

223 _DEFAULT_EMAIL: str = "changeme@example.com" 1a

224 """ 1a

225 This is the default email used for the first user created in the database. This is only used if no users 

226 exist in the database. it should no longer be set by end users. 

227 """ 

228 _DEFAULT_PASSWORD: str = "MyPassword" 1a

229 """ 1a

230 This is the default password used for the first user created in the database. This is only used if no users 

231 exist in the database. it should no longer be set by end users. 

232 """ 

233 

234 # =============================================== 

235 # Email Configuration 

236 

237 SMTP_HOST: str | None = None 1a

238 SMTP_PORT: str | None = "587" 1a

239 SMTP_FROM_NAME: str | None = "Mealie" 1a

240 SMTP_FROM_EMAIL: str | None = None 1a

241 SMTP_USER: MaskedNoneString = None 1a

242 SMTP_PASSWORD: MaskedNoneString = None 1a

243 SMTP_AUTH_STRATEGY: str | None = "TLS" # Options: 'TLS', 'SSL', 'NONE' 1a

244 

245 @property 1a

246 def SMTP_ENABLE(self) -> bool: 1a

247 return self.SMTP_FEATURE.enabled 

248 

249 @property 1a

250 def SMTP_FEATURE(self) -> FeatureDetails: 1a

251 return AppSettings.validate_smtp( 1ab

252 self.SMTP_HOST, 

253 self.SMTP_PORT, 

254 self.SMTP_FROM_NAME, 

255 self.SMTP_FROM_EMAIL, 

256 self.SMTP_AUTH_STRATEGY, 

257 self.SMTP_USER, 

258 self.SMTP_PASSWORD, 

259 ) 

260 

261 @staticmethod 1a

262 def validate_smtp( 1a

263 host: str | None = None, 

264 port: str | None = None, 

265 from_name: str | None = None, 

266 from_email: str | None = None, 

267 strategy: str | None = None, 

268 user: str | None = None, 

269 password: str | None = None, 

270 ) -> FeatureDetails: 

271 """Validates all SMTP variables are set""" 

272 description = None 1ab

273 required = { 1ab

274 "SMTP_HOST": host, 

275 "SMTP_PORT": port, 

276 "SMTP_FROM_NAME": from_name, 

277 "SMTP_FROM_EMAIL": from_email, 

278 "SMTP_AUTH_STRATEGY": strategy, 

279 } 

280 missing_values = [key for (key, value) in required.items() if value is None] 1ab

281 if missing_values: 281 ↛ 284line 281 didn't jump to line 284 because the condition on line 281 was always true1ab

282 description = f"Missing required values for {missing_values}" 1ab

283 

284 if strategy and strategy.upper() in {"TLS", "SSL"}: 284 ↛ 291line 284 didn't jump to line 291 because the condition on line 284 was always true1ab

285 required["SMTP_USER"] = user 1ab

286 required["SMTP_PASSWORD"] = password 1ab

287 if not description: 287 ↛ 288line 287 didn't jump to line 288 because the condition on line 287 was never true1ab

288 missing_values = [key for (key, value) in required.items() if value is None] 

289 description = f"Missing required values for {missing_values} because SMTP_AUTH_STRATEGY is not None" 

290 

291 not_none = "" not in required.values() and None not in required.values() 1ab

292 

293 return FeatureDetails(enabled=not_none, description=description) 1ab

294 

295 # =============================================== 

296 # LDAP Configuration 

297 

298 LDAP_AUTH_ENABLED: bool = False 1a

299 LDAP_SERVER_URL: str | None = None 1a

300 LDAP_TLS_INSECURE: bool = False 1a

301 LDAP_TLS_CACERTFILE: str | None = None 1a

302 LDAP_ENABLE_STARTTLS: bool = False 1a

303 LDAP_BASE_DN: str | None = None 1a

304 LDAP_QUERY_BIND: str | None = None 1a

305 LDAP_QUERY_PASSWORD: MaskedNoneString = None 1a

306 LDAP_USER_FILTER: str | None = None 1a

307 LDAP_ADMIN_FILTER: str | None = None 1a

308 LDAP_ID_ATTRIBUTE: str = "uid" 1a

309 LDAP_MAIL_ATTRIBUTE: str = "mail" 1a

310 LDAP_NAME_ATTRIBUTE: str = "name" 1a

311 

312 @property 1a

313 def LDAP_FEATURE(self) -> FeatureDetails: 1a

314 description = None if self.LDAP_AUTH_ENABLED else "LDAP_AUTH_ENABLED is false" 1avcdefghijkwlmnopb

315 required = { 1avcdefghijkwlmnopb

316 "LDAP_SERVER_URL": self.LDAP_SERVER_URL, 

317 "LDAP_BASE_DN": self.LDAP_BASE_DN, 

318 "LDAP_ID_ATTRIBUTE": self.LDAP_ID_ATTRIBUTE, 

319 "LDAP_MAIL_ATTRIBUTE": self.LDAP_MAIL_ATTRIBUTE, 

320 "LDAP_NAME_ATTRIBUTE": self.LDAP_NAME_ATTRIBUTE, 

321 } 

322 not_none = None not in required.values() 1avcdefghijkwlmnopb

323 if not not_none and not description: 323 ↛ 324line 323 didn't jump to line 324 because the condition on line 323 was never true1avcdefghijkwlmnopb

324 missing_values = [key for (key, value) in required.items() if value is None] 

325 description = f"Missing required values for {missing_values}" 

326 

327 return FeatureDetails( 1avcdefghijkwlmnopb

328 enabled=self.LDAP_AUTH_ENABLED and not_none, 

329 description=description, 

330 ) 

331 

332 @property 1a

333 def LDAP_ENABLED(self) -> bool: 1a

334 """Validates LDAP settings are all set""" 

335 return self.LDAP_FEATURE.enabled 1vcdefghijkwlmnopb

336 

337 # =============================================== 

338 # OIDC Configuration 

339 OIDC_AUTH_ENABLED: bool = False 1a

340 OIDC_CLIENT_ID: str | None = None 1a

341 OIDC_CLIENT_SECRET: MaskedNoneString = None 1a

342 OIDC_CONFIGURATION_URL: str | None = None 1a

343 OIDC_SIGNUP_ENABLED: bool = True 1a

344 OIDC_USER_GROUP: str | None = None 1a

345 OIDC_ADMIN_GROUP: str | None = None 1a

346 OIDC_AUTO_REDIRECT: bool = False 1a

347 OIDC_PROVIDER_NAME: str = "OAuth" 1a

348 OIDC_REMEMBER_ME: bool = False 1a

349 OIDC_USER_CLAIM: str = "email" 1a

350 OIDC_NAME_CLAIM: str = "name" 1a

351 OIDC_GROUPS_CLAIM: str | None = "groups" 1a

352 OIDC_SCOPES_OVERRIDE: str | None = None 1a

353 OIDC_TLS_CACERTFILE: str | None = None 1a

354 

355 @property 1a

356 def OIDC_REQUIRES_GROUP_CLAIM(self) -> bool: 1a

357 return self.OIDC_USER_GROUP is not None or self.OIDC_ADMIN_GROUP is not None 1aqcrdstuefghijklmnopb

358 

359 @property 1a

360 def OIDC_FEATURE(self) -> FeatureDetails: 1a

361 description = None if self.OIDC_AUTH_ENABLED else "OIDC_AUTH_ENABLED is false" 1aqcrdstuefghijklmnopb

362 required = { 1aqcrdstuefghijklmnopb

363 "OIDC_CLIENT_ID": self.OIDC_CLIENT_ID, 

364 "OIDC_CLIENT_SECRET": self.OIDC_CLIENT_SECRET, 

365 "OIDC_CONFIGURATION_URL": self.OIDC_CONFIGURATION_URL, 

366 "OIDC_USER_CLAIM": self.OIDC_USER_CLAIM, 

367 } 

368 not_none = None not in required.values() 1aqcrdstuefghijklmnopb

369 if not not_none and not description: 369 ↛ 370line 369 didn't jump to line 370 because the condition on line 369 was never true1aqcrdstuefghijklmnopb

370 missing_values = [key for (key, value) in required.items() if value is None] 

371 description = f"Missing required values for {missing_values}" 

372 

373 valid_group_claim = True 1aqcrdstuefghijklmnopb

374 if self.OIDC_REQUIRES_GROUP_CLAIM and self.OIDC_GROUPS_CLAIM is None: 374 ↛ 375line 374 didn't jump to line 375 because the condition on line 374 was never true1aqcrdstuefghijklmnopb

375 if not description: 

376 description = "OIDC_GROUPS_CLAIM is required when OIDC_USER_GROUP or OIDC_ADMIN_GROUP are provided" 

377 valid_group_claim = False 

378 

379 return FeatureDetails( 1aqcrdstuefghijklmnopb

380 enabled=self.OIDC_AUTH_ENABLED and not_none and valid_group_claim, 

381 description=description, 

382 ) 

383 

384 @property 1a

385 def OIDC_READY(self) -> bool: 1a

386 """Validates OIDC settings are all set""" 

387 return self.OIDC_FEATURE.enabled 1aqcrdstuefghijklmnopb

388 

389 # =============================================== 

390 # OpenAI Configuration 

391 

392 OPENAI_BASE_URL: str | None = None 1a

393 """The base URL for the OpenAI API. Leave this unset for most usecases""" 1a

394 OPENAI_API_KEY: MaskedNoneString = None 1a

395 """Your OpenAI API key. Required to enable OpenAI features""" 1a

396 OPENAI_MODEL: str = "gpt-4o" 1a

397 """Which OpenAI model to send requests to. Leave this unset for most usecases""" 1a

398 OPENAI_CUSTOM_HEADERS: dict[str, str] = {} 1a

399 """Custom HTTP headers to send with each OpenAI request""" 1a

400 OPENAI_CUSTOM_PARAMS: dict[str, Any] = {} 1a

401 """Custom HTTP parameters to send with each OpenAI request""" 1a

402 OPENAI_ENABLE_IMAGE_SERVICES: bool = True 1a

403 """Whether to enable image-related features in OpenAI""" 1a

404 OPENAI_WORKERS: int = 2 1a

405 """ 1a

406 Number of OpenAI workers per request. Higher values may increase 

407 processing speed, but will incur additional API costs 

408 """ 

409 OPENAI_SEND_DATABASE_DATA: bool = True 1a

410 """ 1a

411 Sending database data may increase accuracy in certain requests, 

412 but will incur additional API costs 

413 """ 

414 OPENAI_REQUEST_TIMEOUT: int = 300 1a

415 """ 1a

416 The number of seconds to wait for an OpenAI request to complete before cancelling the request 

417 """ 

418 

419 @property 1a

420 def OPENAI_FEATURE(self) -> FeatureDetails: 1a

421 description = None 1aqcrdstuefghijklmnopb

422 if not self.OPENAI_API_KEY: 422 ↛ 424line 422 didn't jump to line 424 because the condition on line 422 was always true1aqcrdstuefghijklmnopb

423 description = "OPENAI_API_KEY is not set" 1aqcrdstuefghijklmnopb

424 elif not self.OPENAI_MODEL: 

425 description = "OPENAI_MODEL is not set" 

426 

427 return FeatureDetails( 1aqcrdstuefghijklmnopb

428 enabled=bool(self.OPENAI_API_KEY and self.OPENAI_MODEL), 

429 description=description, 

430 ) 

431 

432 @property 1a

433 def OPENAI_ENABLED(self) -> bool: 1a

434 """Validates OpenAI settings are all set""" 

435 return self.OPENAI_FEATURE.enabled 1qcrdstuefghijklmnopb

436 

437 # =============================================== 

438 # Web Concurrency 

439 

440 WORKER_PER_CORE: int = 1 1a

441 """Old gunicorn env for workers per core.""" 1a

442 

443 UVICORN_WORKERS: int = 1 1a

444 """Number of Uvicorn workers to run.""" 1a

445 

446 @property 1a

447 def WORKERS(self) -> int: 1a

448 return max(1, self.WORKER_PER_CORE * self.UVICORN_WORKERS) 1a

449 

450 model_config = SettingsConfigDict(arbitrary_types_allowed=True, extra="allow", env_nested_delimiter="__") 1a

451 

452 # =============================================== 

453 # TLS 

454 

455 TLS_CERTIFICATE_PATH: str | os.PathLike[str] | None = None 1a

456 """Path where the certificate resides.""" 1a

457 

458 TLS_PRIVATE_KEY_PATH: str | os.PathLike[str] | None = None 1a

459 """Path where the private key resides.""" 1a

460 

461 

462def app_settings_constructor(data_dir: Path, production: bool, env_file: Path, env_encoding="utf-8") -> AppSettings: 1a

463 """ 

464 app_settings_constructor is a factory function that returns an AppSettings object. It is used to inject the 

465 required dependencies into the AppSettings object and nested child objects. AppSettings should not be instantiated 

466 directly, but rather through this factory function. 

467 """ 

468 secret_settings = { 1a

469 "SECRET": determine_secrets(data_dir, ".secret", production), 

470 "SESSION_SECRET": determine_secrets(data_dir, ".session_secret", production), 

471 } 

472 app_settings = AppSettings( 1a

473 _env_file=env_file, # type: ignore 

474 _env_file_encoding=env_encoding, # type: ignore 

475 # `get_secrets_dir` must be called here rather than within `AppSettings` 

476 # to avoid a circular import. 

477 _secrets_dir=get_secrets_dir(), # type: ignore 

478 **secret_settings, 

479 ) 

480 

481 app_settings.DB_PROVIDER = db_provider_factory( 1a

482 app_settings.DB_ENGINE or "sqlite", 

483 data_dir, 

484 env_file=env_file, 

485 env_encoding=env_encoding, 

486 ) 

487 

488 return app_settings 1a