Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/alembic_commands.py: 79%
44 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1import warnings 1a
2from functools import wraps 1a
3from pathlib import Path 1a
4from threading import Lock 1a
5from typing import TYPE_CHECKING, Any, Callable, Optional, Union 1a
7from sqlalchemy.exc import SAWarning 1a
8from typing_extensions import ParamSpec, TypeVar 1a
10import prefect.server.database 1a
12if TYPE_CHECKING: 12 ↛ 13line 12 didn't jump to line 13 because the condition on line 12 was never true1a
13 from alembic.config import Config
# Type variables used by `with_alembic_lock` so that the decorated callable
# keeps the parameter and return types of the function it wraps.
P = ParamSpec("P")
R = TypeVar("R", infer_variance=True)

# Single module-level lock; `with_alembic_lock` holds it for the duration of
# every wrapped call.
ALEMBIC_LOCK = Lock()
def with_alembic_lock(fn: Callable[P, R]) -> Callable[P, R]:
    """
    Wrap `fn` so that every call runs while holding `ALEMBIC_LOCK`.

    Alembic relies on a global configuration object that is not thread-safe,
    so alembic commands must not run concurrently.

    The problem surfaced in https://github.com/PrefectHQ/prefect-dask/pull/50, where
    dask threads performed alembic upgrades at the same time and hit a
    cryptic `KeyError: 'config'` when `del globals_[attr_name]`.

    Args:
        fn: the callable to guard.

    Returns:
        A callable with the same signature as `fn` that acquires the lock,
        calls `fn`, and releases the lock before handing back the result.
    """

    @wraps(fn)
    def locked_call(*args: P.args, **kwargs: P.kwargs) -> R:
        with ALEMBIC_LOCK:
            result = fn(*args, **kwargs)
        return result

    return locked_call
def alembic_config() -> "Config":
    """
    Build the alembic `Config` from the `alembic.ini` located in the same
    directory as the `prefect.server.database` package file.

    Returns:
        An alembic `Config` constructed from that `alembic.ini` path.

    Raises:
        ValueError: if `alembic.ini` does not exist in that directory.
    """
    # imported inside the function, like the other alembic imports in this module
    from alembic.config import Config

    alembic_dir = Path(prefect.server.database.__file__).parent
    ini_path = alembic_dir / "alembic.ini"
    if ini_path.exists():
        return Config(ini_path)

    raise ValueError(f"Couldn't find alembic.ini at {alembic_dir}/alembic.ini")
@with_alembic_lock
def alembic_upgrade(revision: str = "head", dry_run: bool = False) -> None:
    """
    Run alembic upgrades on Prefect REST API database

    Args:
        revision: The revision passed to `alembic upgrade`. Defaults to 'head', upgrading all revisions.
        dry_run: Show what migrations would be made without applying them. Will emit sql statements to stdout.
    """
    # lazy import for performance
    import alembic.command

    # don't display reflection warnings that pop up during schema migrations;
    # the filter is scoped to this block by `catch_warnings`
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore",
            message="Skipped unsupported reflection of expression-based index",
            category=SAWarning,
        )
        # `dry_run` maps onto alembic's `sql` flag
        alembic.command.upgrade(alembic_config(), revision, sql=dry_run)
@with_alembic_lock
def alembic_downgrade(revision: str = "-1", dry_run: bool = False) -> None:
    """
    Run alembic downgrades on Prefect REST API database

    Args:
        revision: The revision passed to `alembic downgrade`. Defaults to '-1', downgrading a single
            revision relative to the current one. Pass 'base' to downgrade all revisions.
        dry_run: Show what migrations would be made without applying them. Will emit sql statements to stdout.
    """
    # lazy import for performance
    import alembic.command

    # `dry_run` maps onto alembic's `sql` flag
    alembic.command.downgrade(alembic_config(), revision, sql=dry_run)
@with_alembic_lock
def alembic_revision(
    message: Optional[str] = None, autogenerate: bool = False, **kwargs: Any
) -> None:
    """
    Generate a new revision file for the database via `alembic.command.revision`.

    Args:
        message: string message to apply to the revision.
        autogenerate: whether or not to autogenerate the script from the database.
        **kwargs: additional keyword arguments forwarded unchanged to
            `alembic.command.revision`.
    """
    # lazy import for performance
    import alembic.command

    config = alembic_config()
    options = {"message": message, "autogenerate": autogenerate, **kwargs}
    alembic.command.revision(config, **options)
@with_alembic_lock
def alembic_stamp(revision: Union[str, list[str], tuple[str, ...]]) -> None:
    """
    Mark the revision table as being at the given revision without running
    any migrations.

    Args:
        revision: The revision passed to `alembic stamp`.
    """
    # lazy import for performance
    import alembic.command

    config = alembic_config()
    alembic.command.stamp(config, revision=revision)