Coverage for /usr/local/lib/python3.12/site-packages/prefect/settings/models/server/database.py: 79%
71 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1import warnings 1a
2from typing import Any, ClassVar, Optional 1a
3from urllib.parse import quote_plus 1a
5from pydantic import ( 1a
6 AliasChoices,
7 AliasPath,
8 Field,
9 SecretStr,
10 model_validator,
11)
12from pydantic_settings import SettingsConfigDict 1a
13from typing_extensions import Literal, Self 1a
15from prefect.settings.base import PrefectBaseSettings, build_settings_config 1a
18class SQLAlchemyTLSSettings(PrefectBaseSettings): 1a
19 """
20 Settings for controlling SQLAlchemy mTLS context when
21 using a PostgreSQL database.
22 """
24 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a
25 ("server", "database", "sqlalchemy", "connect_args", "tls")
26 )
28 enabled: bool = Field( 1a
29 default=False,
30 description="Controls whether connected to mTLS enabled PostgreSQL when using a PostgreSQL database with the Prefect backend.",
31 )
33 ca_file: Optional[str] = Field( 1a
34 default=None,
35 description="This configuration settings option specifies the path to PostgreSQL client certificate authority file.",
36 )
38 cert_file: Optional[str] = Field( 1a
39 default=None,
40 description="This configuration settings option specifies the path to PostgreSQL client certificate file.",
41 )
43 key_file: Optional[str] = Field( 1a
44 default=None,
45 description="This configuration settings option specifies the path to PostgreSQL client key file.",
46 )
48 check_hostname: bool = Field( 1a
49 default=True,
50 description="This configuration settings option specifies whether to verify PostgreSQL server hostname.",
51 )
54class SQLAlchemyConnectArgsSettings(PrefectBaseSettings): 1a
55 """
56 Settings for controlling SQLAlchemy connection behavior; note that these settings only take effect when
57 using a PostgreSQL database.
58 """
60 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a
61 ("server", "database", "sqlalchemy", "connect_args")
62 )
64 application_name: Optional[str] = Field( 1a
65 default=None,
66 description="Controls the application_name field for connections opened from the connection pool when using a PostgreSQL database with the Prefect backend.",
67 )
69 statement_cache_size: Optional[int] = Field( 1a
70 default=None,
71 description="Controls statement cache size for PostgreSQL connections. Setting this to 0 is required when using PgBouncer in transaction mode. Defaults to None.",
72 )
74 prepared_statement_cache_size: Optional[int] = Field( 1a
75 default=None,
76 description=(
77 "Controls the size of the statement cache for PostgreSQL connections. "
78 "When set to 0, statement caching is disabled. Defaults to None to use "
79 "SQLAlchemy's default behavior."
80 ),
81 )
83 tls: SQLAlchemyTLSSettings = Field( 1a
84 default_factory=SQLAlchemyTLSSettings,
85 description="Settings for controlling SQLAlchemy mTLS behavior",
86 )
89class SQLAlchemySettings(PrefectBaseSettings): 1a
90 """
91 Settings for controlling SQLAlchemy behavior; note that these settings only take effect when
92 using a PostgreSQL database.
93 """
95 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a
96 ("server", "database", "sqlalchemy")
97 )
99 connect_args: SQLAlchemyConnectArgsSettings = Field( 1a
100 default_factory=SQLAlchemyConnectArgsSettings,
101 description="Settings for controlling SQLAlchemy connection behavior",
102 )
104 pool_size: int = Field( 1a
105 default=5,
106 description="Controls connection pool size of database connection pools from the Prefect backend.",
107 validation_alias=AliasChoices(
108 AliasPath("pool_size"),
109 "prefect_server_database_sqlalchemy_pool_size",
110 "prefect_sqlalchemy_pool_size",
111 ),
112 )
114 pool_recycle: int = Field( 1a
115 default=3600,
116 description="This setting causes the pool to recycle connections after the given number of seconds has passed; set it to -1 to avoid recycling entirely.",
117 )
119 pool_timeout: Optional[float] = Field( 1a
120 default=30.0,
121 description="Number of seconds to wait before giving up on getting a connection from the pool. Defaults to 30 seconds.",
122 )
124 max_overflow: int = Field( 1a
125 default=10,
126 description="Controls maximum overflow of the connection pool. To prevent overflow, set to -1.",
127 validation_alias=AliasChoices(
128 AliasPath("max_overflow"),
129 "prefect_server_database_sqlalchemy_max_overflow",
130 "prefect_sqlalchemy_max_overflow",
131 ),
132 )
135class ServerDatabaseSettings(PrefectBaseSettings): 1a
136 """
137 Settings for controlling server database behavior
138 """
140 model_config: ClassVar[SettingsConfigDict] = build_settings_config( 1a
141 ("server", "database")
142 )
144 sqlalchemy: SQLAlchemySettings = Field( 1a
145 default_factory=SQLAlchemySettings,
146 description="Settings for controlling SQLAlchemy behavior",
147 )
149 connection_url: Optional[SecretStr] = Field( 1a
150 default=None,
151 description="""
152 A database connection URL in a SQLAlchemy-compatible
153 format. Prefect currently supports SQLite and Postgres. Note that all
154 Prefect database engines must use an async driver - for SQLite, use
155 `sqlite+aiosqlite` and for Postgres use `postgresql+asyncpg`.
157 SQLite in-memory databases can be used by providing the url
158 `sqlite+aiosqlite:///file::memory:?cache=shared&uri=true&check_same_thread=false`,
159 which will allow the database to be accessed by multiple threads. Note
160 that in-memory databases can not be accessed from multiple processes and
161 should only be used for simple tests.
162 """,
163 validation_alias=AliasChoices(
164 AliasPath("connection_url"),
165 "prefect_server_database_connection_url",
166 "prefect_api_database_connection_url",
167 ),
168 )
170 driver: Optional[Literal["postgresql+asyncpg", "sqlite+aiosqlite"]] = Field( 1a
171 default=None,
172 description=(
173 "The database driver to use when connecting to the database. "
174 "If not set, the driver will be inferred from the connection URL."
175 ),
176 validation_alias=AliasChoices(
177 AliasPath("driver"),
178 "prefect_server_database_driver",
179 "prefect_api_database_driver",
180 ),
181 )
183 host: Optional[str] = Field( 1a
184 default=None,
185 description="The database server host.",
186 validation_alias=AliasChoices(
187 AliasPath("host"),
188 "prefect_server_database_host",
189 "prefect_api_database_host",
190 ),
191 )
193 port: Optional[int] = Field( 1a
194 default=None,
195 description="The database server port.",
196 validation_alias=AliasChoices(
197 AliasPath("port"),
198 "prefect_server_database_port",
199 "prefect_api_database_port",
200 ),
201 )
203 user: Optional[str] = Field( 1a
204 default=None,
205 description="The user to use when connecting to the database.",
206 validation_alias=AliasChoices(
207 AliasPath("user"),
208 "prefect_server_database_user",
209 "prefect_api_database_user",
210 ),
211 )
213 name: Optional[str] = Field( 1a
214 default=None,
215 description="The name of the Prefect database on the remote server, or the path to the database file for SQLite.",
216 validation_alias=AliasChoices(
217 AliasPath("name"),
218 "prefect_server_database_name",
219 "prefect_api_database_name",
220 ),
221 )
223 password: Optional[SecretStr] = Field( 1a
224 default=None,
225 description="The password to use when connecting to the database. Should be kept secret.",
226 validation_alias=AliasChoices(
227 AliasPath("password"),
228 "prefect_server_database_password",
229 "prefect_api_database_password",
230 ),
231 )
233 echo: bool = Field( 1a
234 default=False,
235 description="If `True`, SQLAlchemy will log all SQL issued to the database. Defaults to `False`.",
236 validation_alias=AliasChoices(
237 AliasPath("echo"),
238 "prefect_server_database_echo",
239 "prefect_api_database_echo",
240 ),
241 )
243 migrate_on_start: bool = Field( 1a
244 default=True,
245 description="If `True`, the database will be migrated on application startup.",
246 validation_alias=AliasChoices(
247 AliasPath("migrate_on_start"),
248 "prefect_server_database_migrate_on_start",
249 "prefect_api_database_migrate_on_start",
250 ),
251 )
253 timeout: Optional[float] = Field( 1a
254 default=10.0,
255 description="A statement timeout, in seconds, applied to all database interactions made by the Prefect backend. Defaults to 10 seconds.",
256 validation_alias=AliasChoices(
257 AliasPath("timeout"),
258 "prefect_server_database_timeout",
259 "prefect_api_database_timeout",
260 ),
261 )
263 connection_timeout: Optional[float] = Field( 1a
264 default=5.0,
265 description="A connection timeout, in seconds, applied to database connections. Defaults to `5`.",
266 validation_alias=AliasChoices(
267 AliasPath("connection_timeout"),
268 "prefect_server_database_connection_timeout",
269 "prefect_api_database_connection_timeout",
270 ),
271 )
273 # handle deprecated fields
275 def __getattribute__(self, name: str) -> Any: 1a
276 if name in ["sqlalchemy_pool_size", "sqlalchemy_max_overflow"]: 276 ↛ 277line 276 didn't jump to line 277 because the condition on line 276 was never true1abcdefghijkl
277 warnings.warn(
278 f"Setting {name} has been moved to the `sqlalchemy` settings group.",
279 DeprecationWarning,
280 )
281 field_name = name.replace("sqlalchemy_", "")
282 return getattr(super().__getattribute__("sqlalchemy"), field_name)
283 return super().__getattribute__(name) 1abcdefghijkl
285 # validators
287 @model_validator(mode="before") 1a
288 @classmethod 1a
289 def set_deprecated_sqlalchemy_settings_on_child_model_and_warn( 1a
290 cls, values: dict[str, Any]
291 ) -> dict[str, Any]:
292 """
293 Set deprecated settings on the child model.
294 """
295 # Initialize sqlalchemy settings if not present
296 if "sqlalchemy" not in values: 1a
297 values["sqlalchemy"] = SQLAlchemySettings() 1a
299 if "sqlalchemy_pool_size" in values: 299 ↛ 300line 299 didn't jump to line 300 because the condition on line 299 was never true1a
300 warnings.warn(
301 "`sqlalchemy_pool_size` has been moved to the `sqlalchemy` settings group as `pool_size`.",
302 DeprecationWarning,
303 )
304 if "pool_size" not in values["sqlalchemy"].model_fields_set:
305 values["sqlalchemy"].pool_size = values["sqlalchemy_pool_size"]
307 if "sqlalchemy_max_overflow" in values: 307 ↛ 308line 307 didn't jump to line 308 because the condition on line 307 was never true1a
308 warnings.warn(
309 "`sqlalchemy_max_overflow` has been moved to the `sqlalchemy` settings group as `max_overflow`.",
310 DeprecationWarning,
311 )
312 if "max_overflow" not in values["sqlalchemy"].model_fields_set:
313 values["sqlalchemy"].max_overflow = values["sqlalchemy_max_overflow"]
315 return values 1a
317 @model_validator(mode="after") 1a
318 def emit_warnings(self) -> Self: # noqa: F821 1a
319 """More post-hoc validation of settings, including warnings for misconfigurations."""
320 warn_on_database_password_value_without_usage(self) 1a
321 return self 1a
324def warn_on_database_password_value_without_usage( 1a
325 settings: ServerDatabaseSettings,
326) -> None:
327 """
328 Validator for settings warning if the database password is set but not used.
329 """
330 db_password = ( 1a
331 settings.password.get_secret_value()
332 if isinstance(settings.password, SecretStr)
333 else None
334 )
335 api_db_connection_url = ( 1a
336 settings.connection_url.get_secret_value()
337 if isinstance(settings.connection_url, SecretStr)
338 else settings.connection_url
339 )
341 if ( 341 ↛ 349line 341 didn't jump to line 349 because the condition on line 341 was never true
342 db_password
343 and api_db_connection_url is not None
344 and "PREFECT_API_DATABASE_PASSWORD" not in api_db_connection_url
345 and "PREFECT_SERVER_DATABASE_PASSWORD" not in api_db_connection_url
346 and db_password not in api_db_connection_url
347 and quote_plus(db_password) not in api_db_connection_url
348 ):
349 warnings.warn(
350 "PREFECT_SERVER_DATABASE_PASSWORD is set but not included in the "
351 "PREFECT_SERVER_DATABASE_CONNECTION_URL. "
352 "The provided password will be ignored."
353 )
354 return None 1a