Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/alembic_commands.py: 79%

44 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1import warnings 1a

2from functools import wraps 1a

3from pathlib import Path 1a

4from threading import Lock 1a

5from typing import TYPE_CHECKING, Any, Callable, Optional, Union 1a

6 

7from sqlalchemy.exc import SAWarning 1a

8from typing_extensions import ParamSpec, TypeVar 1a

9 

10import prefect.server.database 1a

11 

12if TYPE_CHECKING: 12 ↛ 13line 12 didn't jump to line 13 because the condition on line 12 was never true1a

13 from alembic.config import Config 

14 

15 

# Type variables used by `with_alembic_lock` so the decorated function keeps
# its original parameter and return types.
P = ParamSpec("P")
R = TypeVar("R", infer_variance=True)

# One module-level lock shared by every function decorated with
# `with_alembic_lock`.
ALEMBIC_LOCK = Lock()

20 

21 

def with_alembic_lock(fn: Callable[P, R]) -> Callable[P, R]:
    """
    Decorator that serializes calls to `fn` behind the module-level `ALEMBIC_LOCK`.

    Alembic relies on a global configuration object that is not thread-safe,
    so alembic commands must not run concurrently.

    The problem surfaced in https://github.com/PrefectHQ/prefect-dask/pull/50, where
    dask threads performed alembic upgrades simultaneously and triggered a
    cryptic `KeyError: 'config'` when `del globals_[attr_name]`.

    Args:
        fn: The callable to guard.

    Returns:
        A wrapper with the same signature as `fn` that holds `ALEMBIC_LOCK` while `fn` runs.
    """

    @wraps(fn)
    def locked_call(*args: P.args, **kwargs: P.kwargs) -> R:
        # The lock is released when the `with` block exits, whether `fn`
        # returns or raises.
        with ALEMBIC_LOCK:
            result = fn(*args, **kwargs)
        return result

    return locked_call

39 

40 

def alembic_config() -> "Config":
    """
    Build the alembic `Config` from the `alembic.ini` located in the
    directory of the `prefect.server.database` package.

    Returns:
        A `Config` constructed from that `alembic.ini` path.

    Raises:
        ValueError: If `alembic.ini` does not exist in that directory.
    """
    from alembic.config import Config

    database_pkg_dir = Path(prefect.server.database.__file__).parent
    ini_path = database_pkg_dir / "alembic.ini"

    if ini_path.exists():
        return Config(ini_path)

    raise ValueError(f"Couldn't find alembic.ini at {database_pkg_dir}/alembic.ini")

51 

52 

@with_alembic_lock
def alembic_upgrade(revision: str = "head", dry_run: bool = False) -> None:
    """
    Run alembic upgrades on Prefect REST API database

    Args:
        revision: The revision passed to `alembic upgrade`. Defaults to 'head', upgrading all revisions.
        dry_run: Show what migrations would be made without applying them. Will emit sql statements to stdout.
    """
    # lazy import for performance
    import alembic.command

    # don't display reflection warnings that pop up during schema migrations;
    # the filter is scoped to this block and undone when it exits
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore",
            message="Skipped unsupported reflection of expression-based index",
            category=SAWarning,
        )
        alembic.command.upgrade(alembic_config(), revision, sql=dry_run)

73 

74 

@with_alembic_lock
def alembic_downgrade(revision: str = "-1", dry_run: bool = False) -> None:
    """
    Run alembic downgrades on Prefect REST API database

    Args:
        revision: The revision passed to `alembic downgrade`. Defaults to '-1', downgrading
            the most recent revision. Pass 'base' to downgrade all revisions.
        dry_run: Show what migrations would be made without applying them. Will emit sql statements to stdout.
    """
    # lazy import for performance
    import alembic.command

    alembic.command.downgrade(alembic_config(), revision, sql=dry_run)

88 

89 

@with_alembic_lock
def alembic_revision(
    message: Optional[str] = None, autogenerate: bool = False, **kwargs: Any
) -> None:
    """
    Create a new revision file for the database.

    Args:
        message: string message to apply to the revision.
        autogenerate: whether or not to autogenerate the script from the database.
        **kwargs: extra keyword arguments forwarded unchanged to `alembic.command.revision`.
    """
    # lazy import for performance
    import alembic.command

    config = alembic_config()
    alembic.command.revision(
        config,
        message=message,
        autogenerate=autogenerate,
        **kwargs,
    )

107 

108 

@with_alembic_lock
def alembic_stamp(revision: Union[str, list[str], tuple[str, ...]]) -> None:
    """
    Stamp the revision table with the given revision without running any migrations.

    Args:
        revision: The revision (or revisions) passed to `alembic stamp`.
    """
    # lazy import for performance
    import alembic.command

    config = alembic_config()
    alembic.command.stamp(config, revision=revision)