Coverage for /usr/local/lib/python3.12/site-packages/prefect/settings/models/server/database.py: 79%

71 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import warnings 1a

2from typing import Any, ClassVar, Optional 1a

3from urllib.parse import quote_plus 1a

4 

5from pydantic import ( 1a

6 AliasChoices, 

7 AliasPath, 

8 Field, 

9 SecretStr, 

10 model_validator, 

11) 

12from pydantic_settings import SettingsConfigDict 1a

13from typing_extensions import Literal, Self 1a

14 

15from prefect.settings.base import PrefectBaseSettings, build_settings_config 1a

16 

17 

18class SQLAlchemyTLSSettings(PrefectBaseSettings): 1a

19 """ 

20 Settings for controlling SQLAlchemy mTLS context when 

21 using a PostgreSQL database. 

22 """ 

23 

24 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a

25 ("server", "database", "sqlalchemy", "connect_args", "tls") 

26 ) 

27 

28 enabled: bool = Field( 1a

29 default=False, 

30 description="Controls whether connected to mTLS enabled PostgreSQL when using a PostgreSQL database with the Prefect backend.", 

31 ) 

32 

33 ca_file: Optional[str] = Field( 1a

34 default=None, 

35 description="This configuration settings option specifies the path to PostgreSQL client certificate authority file.", 

36 ) 

37 

38 cert_file: Optional[str] = Field( 1a

39 default=None, 

40 description="This configuration settings option specifies the path to PostgreSQL client certificate file.", 

41 ) 

42 

43 key_file: Optional[str] = Field( 1a

44 default=None, 

45 description="This configuration settings option specifies the path to PostgreSQL client key file.", 

46 ) 

47 

48 check_hostname: bool = Field( 1a

49 default=True, 

50 description="This configuration settings option specifies whether to verify PostgreSQL server hostname.", 

51 ) 

52 

53 

54class SQLAlchemyConnectArgsSettings(PrefectBaseSettings): 1a

55 """ 

56 Settings for controlling SQLAlchemy connection behavior; note that these settings only take effect when 

57 using a PostgreSQL database. 

58 """ 

59 

60 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a

61 ("server", "database", "sqlalchemy", "connect_args") 

62 ) 

63 

64 application_name: Optional[str] = Field( 1a

65 default=None, 

66 description="Controls the application_name field for connections opened from the connection pool when using a PostgreSQL database with the Prefect backend.", 

67 ) 

68 

69 statement_cache_size: Optional[int] = Field( 1a

70 default=None, 

71 description="Controls statement cache size for PostgreSQL connections. Setting this to 0 is required when using PgBouncer in transaction mode. Defaults to None.", 

72 ) 

73 

74 prepared_statement_cache_size: Optional[int] = Field( 1a

75 default=None, 

76 description=( 

77 "Controls the size of the statement cache for PostgreSQL connections. " 

78 "When set to 0, statement caching is disabled. Defaults to None to use " 

79 "SQLAlchemy's default behavior." 

80 ), 

81 ) 

82 

83 tls: SQLAlchemyTLSSettings = Field( 1a

84 default_factory=SQLAlchemyTLSSettings, 

85 description="Settings for controlling SQLAlchemy mTLS behavior", 

86 ) 

87 

88 

89class SQLAlchemySettings(PrefectBaseSettings): 1a

90 """ 

91 Settings for controlling SQLAlchemy behavior; note that these settings only take effect when 

92 using a PostgreSQL database. 

93 """ 

94 

95 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a

96 ("server", "database", "sqlalchemy") 

97 ) 

98 

99 connect_args: SQLAlchemyConnectArgsSettings = Field( 1a

100 default_factory=SQLAlchemyConnectArgsSettings, 

101 description="Settings for controlling SQLAlchemy connection behavior", 

102 ) 

103 

104 pool_size: int = Field( 1a

105 default=5, 

106 description="Controls connection pool size of database connection pools from the Prefect backend.", 

107 validation_alias=AliasChoices( 

108 AliasPath("pool_size"), 

109 "prefect_server_database_sqlalchemy_pool_size", 

110 "prefect_sqlalchemy_pool_size", 

111 ), 

112 ) 

113 

114 pool_recycle: int = Field( 1a

115 default=3600, 

116 description="This setting causes the pool to recycle connections after the given number of seconds has passed; set it to -1 to avoid recycling entirely.", 

117 ) 

118 

119 pool_timeout: Optional[float] = Field( 1a

120 default=30.0, 

121 description="Number of seconds to wait before giving up on getting a connection from the pool. Defaults to 30 seconds.", 

122 ) 

123 

124 max_overflow: int = Field( 1a

125 default=10, 

126 description="Controls maximum overflow of the connection pool. To prevent overflow, set to -1.", 

127 validation_alias=AliasChoices( 

128 AliasPath("max_overflow"), 

129 "prefect_server_database_sqlalchemy_max_overflow", 

130 "prefect_sqlalchemy_max_overflow", 

131 ), 

132 ) 

133 

134 

135class ServerDatabaseSettings(PrefectBaseSettings): 1a

136 """ 

137 Settings for controlling server database behavior 

138 """ 

139 

140 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a

141 ("server", "database") 

142 ) 

143 

144 sqlalchemy: SQLAlchemySettings = Field( 1a

145 default_factory=SQLAlchemySettings, 

146 description="Settings for controlling SQLAlchemy behavior", 

147 ) 

148 

149 connection_url: Optional[SecretStr] = Field( 1a

150 default=None, 

151 description=""" 

152 A database connection URL in a SQLAlchemy-compatible 

153 format. Prefect currently supports SQLite and Postgres. Note that all 

154 Prefect database engines must use an async driver - for SQLite, use 

155 `sqlite+aiosqlite` and for Postgres use `postgresql+asyncpg`. 

156 

157 SQLite in-memory databases can be used by providing the url 

158 `sqlite+aiosqlite:///file::memory:?cache=shared&uri=true&check_same_thread=false`, 

159 which will allow the database to be accessed by multiple threads. Note 

160 that in-memory databases can not be accessed from multiple processes and 

161 should only be used for simple tests. 

162 """, 

163 validation_alias=AliasChoices( 

164 AliasPath("connection_url"), 

165 "prefect_server_database_connection_url", 

166 "prefect_api_database_connection_url", 

167 ), 

168 ) 

169 

170 driver: Optional[Literal["postgresql+asyncpg", "sqlite+aiosqlite"]] = Field( 1a

171 default=None, 

172 description=( 

173 "The database driver to use when connecting to the database. " 

174 "If not set, the driver will be inferred from the connection URL." 

175 ), 

176 validation_alias=AliasChoices( 

177 AliasPath("driver"), 

178 "prefect_server_database_driver", 

179 "prefect_api_database_driver", 

180 ), 

181 ) 

182 

183 host: Optional[str] = Field( 1a

184 default=None, 

185 description="The database server host.", 

186 validation_alias=AliasChoices( 

187 AliasPath("host"), 

188 "prefect_server_database_host", 

189 "prefect_api_database_host", 

190 ), 

191 ) 

192 

193 port: Optional[int] = Field( 1a

194 default=None, 

195 description="The database server port.", 

196 validation_alias=AliasChoices( 

197 AliasPath("port"), 

198 "prefect_server_database_port", 

199 "prefect_api_database_port", 

200 ), 

201 ) 

202 

203 user: Optional[str] = Field( 1a

204 default=None, 

205 description="The user to use when connecting to the database.", 

206 validation_alias=AliasChoices( 

207 AliasPath("user"), 

208 "prefect_server_database_user", 

209 "prefect_api_database_user", 

210 ), 

211 ) 

212 

213 name: Optional[str] = Field( 1a

214 default=None, 

215 description="The name of the Prefect database on the remote server, or the path to the database file for SQLite.", 

216 validation_alias=AliasChoices( 

217 AliasPath("name"), 

218 "prefect_server_database_name", 

219 "prefect_api_database_name", 

220 ), 

221 ) 

222 

223 password: Optional[SecretStr] = Field( 1a

224 default=None, 

225 description="The password to use when connecting to the database. Should be kept secret.", 

226 validation_alias=AliasChoices( 

227 AliasPath("password"), 

228 "prefect_server_database_password", 

229 "prefect_api_database_password", 

230 ), 

231 ) 

232 

233 echo: bool = Field( 1a

234 default=False, 

235 description="If `True`, SQLAlchemy will log all SQL issued to the database. Defaults to `False`.", 

236 validation_alias=AliasChoices( 

237 AliasPath("echo"), 

238 "prefect_server_database_echo", 

239 "prefect_api_database_echo", 

240 ), 

241 ) 

242 

243 migrate_on_start: bool = Field( 1a

244 default=True, 

245 description="If `True`, the database will be migrated on application startup.", 

246 validation_alias=AliasChoices( 

247 AliasPath("migrate_on_start"), 

248 "prefect_server_database_migrate_on_start", 

249 "prefect_api_database_migrate_on_start", 

250 ), 

251 ) 

252 

253 timeout: Optional[float] = Field( 1a

254 default=10.0, 

255 description="A statement timeout, in seconds, applied to all database interactions made by the Prefect backend. Defaults to 10 seconds.", 

256 validation_alias=AliasChoices( 

257 AliasPath("timeout"), 

258 "prefect_server_database_timeout", 

259 "prefect_api_database_timeout", 

260 ), 

261 ) 

262 

263 connection_timeout: Optional[float] = Field( 1a

264 default=5.0, 

265 description="A connection timeout, in seconds, applied to database connections. Defaults to `5`.", 

266 validation_alias=AliasChoices( 

267 AliasPath("connection_timeout"), 

268 "prefect_server_database_connection_timeout", 

269 "prefect_api_database_connection_timeout", 

270 ), 

271 ) 

272 

273 # handle deprecated fields 

274 

275 def __getattribute__(self, name: str) -> Any: 1a

276 if name in ["sqlalchemy_pool_size", "sqlalchemy_max_overflow"]: 276 ↛ 277line 276 didn't jump to line 277 because the condition on line 276 was never true1abcde

277 warnings.warn( 

278 f"Setting {name} has been moved to the `sqlalchemy` settings group.", 

279 DeprecationWarning, 

280 ) 

281 field_name = name.replace("sqlalchemy_", "") 

282 return getattr(super().__getattribute__("sqlalchemy"), field_name) 

283 return super().__getattribute__(name) 1abcde

284 

285 # validators 

286 

287 @model_validator(mode="before") 1a

288 @classmethod 1a

289 def set_deprecated_sqlalchemy_settings_on_child_model_and_warn( 1a

290 cls, values: dict[str, Any] 

291 ) -> dict[str, Any]: 

292 """ 

293 Set deprecated settings on the child model. 

294 """ 

295 # Initialize sqlalchemy settings if not present 

296 if "sqlalchemy" not in values: 1a

297 values["sqlalchemy"] = SQLAlchemySettings() 1a

298 

299 if "sqlalchemy_pool_size" in values: 299 ↛ 300line 299 didn't jump to line 300 because the condition on line 299 was never true1a

300 warnings.warn( 

301 "`sqlalchemy_pool_size` has been moved to the `sqlalchemy` settings group as `pool_size`.", 

302 DeprecationWarning, 

303 ) 

304 if "pool_size" not in values["sqlalchemy"].model_fields_set: 

305 values["sqlalchemy"].pool_size = values["sqlalchemy_pool_size"] 

306 

307 if "sqlalchemy_max_overflow" in values: 307 ↛ 308line 307 didn't jump to line 308 because the condition on line 307 was never true1a

308 warnings.warn( 

309 "`sqlalchemy_max_overflow` has been moved to the `sqlalchemy` settings group as `max_overflow`.", 

310 DeprecationWarning, 

311 ) 

312 if "max_overflow" not in values["sqlalchemy"].model_fields_set: 

313 values["sqlalchemy"].max_overflow = values["sqlalchemy_max_overflow"] 

314 

315 return values 1a

316 

317 @model_validator(mode="after") 1a

318 def emit_warnings(self) -> Self: # noqa: F821 1a

319 """More post-hoc validation of settings, including warnings for misconfigurations.""" 

320 warn_on_database_password_value_without_usage(self) 1a

321 return self 1a

322 

323 

324def warn_on_database_password_value_without_usage( 1a

325 settings: ServerDatabaseSettings, 

326) -> None: 

327 """ 

328 Validator for settings warning if the database password is set but not used. 

329 """ 

330 db_password = ( 1a

331 settings.password.get_secret_value() 

332 if isinstance(settings.password, SecretStr) 

333 else None 

334 ) 

335 api_db_connection_url = ( 1a

336 settings.connection_url.get_secret_value() 

337 if isinstance(settings.connection_url, SecretStr) 

338 else settings.connection_url 

339 ) 

340 

341 if ( 341 ↛ 349line 341 didn't jump to line 349 because the condition on line 341 was never true

342 db_password 

343 and api_db_connection_url is not None 

344 and "PREFECT_API_DATABASE_PASSWORD" not in api_db_connection_url 

345 and "PREFECT_SERVER_DATABASE_PASSWORD" not in api_db_connection_url 

346 and db_password not in api_db_connection_url 

347 and quote_plus(db_password) not in api_db_connection_url 

348 ): 

349 warnings.warn( 

350 "PREFECT_SERVER_DATABASE_PASSWORD is set but not included in the " 

351 "PREFECT_SERVER_DATABASE_CONNECTION_URL. " 

352 "The provided password will be ignored." 

353 ) 

354 return None 1a