Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/_migrations/env.py: 57%

59 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1# Originally generated from `alembic init` 

2# https://alembic.sqlalchemy.org/en/latest/tutorial.html#creating-an-environment 

3 

4import contextlib 1a

5from typing import Optional 1a

6 

7import sqlalchemy 1a

8from alembic import context 1a

9from sqlalchemy.ext.asyncio import AsyncEngine 1a

10 

11from prefect.server.database.configurations import SQLITE_BEGIN_MODE 1a

12from prefect.server.database.dependencies import provide_database_interface 1a

13from prefect.server.utilities.database import get_dialect 1a

14from prefect.utilities.asyncutils import run_async_from_worker_thread 1a

15 

# Module-level state shared by every function in this Alembic environment.
# The database interface exposes the ORM base, the connection URL and the
# directory holding the migration versions.
db_interface = provide_database_interface()

# Alembic configuration object for the current invocation.
config = context.config

# Metadata handed to `context.configure(target_metadata=...)`.
target_metadata = db_interface.Base.metadata

# Dialect derived from the configured connection URL; used to special-case SQLite.
dialect = get_dialect(db_interface.database_config.connection_url)

20 

21 

def include_object(
    object: sqlalchemy.schema.SchemaItem,
    name: str,
    type_: str,
    reflected: bool,
    compare_to: Optional[sqlalchemy.schema.SchemaItem],
) -> bool:
    """
    Decide whether alembic should consider an object when autogenerating
    database migrations.

    Args:
        object: a sqlalchemy.schema.SchemaItem object such as a
            sqlalchemy.schema.Table, sqlalchemy.schema.Column,
            sqlalchemy.schema.Index, sqlalchemy.schema.UniqueConstraint, or
            sqlalchemy.schema.ForeignKeyConstraint object.
        name: the name of the object. This is typically available via object.name.
        type_: a string describing the type of object; currently "table", "column",
            "index", "unique_constraint", or "foreign_key_constraint"
        reflected: True if the given object was produced based on table reflection,
            False if it's from a local .MetaData object.
        compare_to: the object being compared against, if available, else None.

    Returns:
        bool: whether or not the specified object should be included in autogenerated
            migration code.
    """
    # The Prefect database uses a dynamic inheritance pattern, which makes it
    # hard for alembic to resolve references to indexes on inherited models.
    #
    # To keep autogenerated migrations clean, these indexes are ignored:
    #   * functional indexes (ending in 'desc', 'asc') when an index with the
    #     same name already exists
    #   * trigram indexes that already exist
    #   * case_insensitive indexes that already exist
    #   * indexes that don't exist yet but carry .ddl_if(dialect=...) metadata
    #     that doesn't match the current dialect
    if type_ == "index":
        if reflected:
            # Reflected trigram / case-insensitive indexes already exist.
            if name.startswith("gin") or name.endswith("case_insensitive"):
                return False
        elif name.endswith(("asc", "desc")):
            # Functional index from local metadata: include it only when there
            # is no same-named counterpart to compare against.
            return compare_to is None or object.name != compare_to.name
        else:
            ddl_condition = object._ddl_if
            if ddl_condition is not None and ddl_condition.dialect is not None:
                wanted = ddl_condition.dialect
                # `.dialect` may be one dialect name or a collection of them.
                allowed: set[str] = (
                    {wanted} if isinstance(wanted, str) else set(wanted)
                )
                return dialect.name in allowed

    # SQLite has no enum type, so reflection always yields a VARCHAR column that
    # never matches the model's enum. Skip columns whose types don't match.
    is_sqlite_enum_column = (
        dialect.name == "sqlite"
        and type_ == "column"
        and object.type.__visit_name__ == "enum"
        and compare_to is not None
    )
    if is_sqlite_enum_column:
        return compare_to.type.__visit_name__ == "enum"

    return True

87 

88 

def dry_run_migrations() -> None:
    """
    Perform a dry run of migrations.

    The SQL statements are generated and written to stdout rather than being
    executed against the database; Alembic is configured with the connection
    URL only, not a live connection.
    """
    connection_url = db_interface.database_config.connection_url
    context.script.version_locations = [db_interface.orm.versions_dir]

    dialect_name = dialect.name
    offline_options = {
        "url": connection_url,
        "target_metadata": target_metadata,
        "literal_binds": True,
        "include_schemas": True,
        "include_object": include_object,
        "dialect_opts": {"paramstyle": "named"},
        # Batch statements are only enabled by default on SQLite.
        #
        # SQLite has almost no support for the ALTER statement that relational
        # schema migrations rely upon. Migration tools are instead expected to
        # build a copy of the table with the new structure, transfer the data
        # from the existing table, then drop the old table.
        #
        # see https://alembic.sqlalchemy.org/en/latest/batch.html#batch-migrations
        "render_as_batch": dialect_name == "sqlite",
        # Each migration is its own transaction
        "transaction_per_migration": True,
        "template_args": {"dialect": dialect_name},
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()

123 

124 

def do_run_migrations(connection: sqlalchemy.engine.Connection) -> None:
    """
    Run Alembic migrations using the connection.

    This function is handed to `connection.run_sync(...)` by `apply_migrations`,
    so it runs synchronously and receives the synchronous connection that
    `run_sync` supplies, not an async engine.

    Args:
        connection: the synchronous database connection to run migrations on.
    """
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        include_schemas=True,
        include_object=include_object,
        # Only use batch statements by default on sqlite
        #
        # The SQLite database presents a challenge to migration
        # tools in that it has almost no support for the ALTER statement
        # which relational schema migrations rely upon.
        # Migration tools are instead expected to produce copies of SQLite tables
        # that correspond to the new structure, transfer the data from the existing
        # table to the new one, then drop the old table.
        #
        # see https://alembic.sqlalchemy.org/en/latest/batch.html#batch-migrations
        render_as_batch=dialect.name == "sqlite",
        # Each migration is its own transaction
        transaction_per_migration=True,
        template_args={"dialect": dialect.name},
    )

    # We override SQLAlchemy's handling of BEGIN on SQLite and Alembic bypasses our
    # typical transaction context manager so we set the mode manually here
    token = SQLITE_BEGIN_MODE.set("IMMEDIATE")
    try:
        # Foreign keys are switched off (SQLite only) for the duration of the
        # migration transaction.
        with disable_sqlite_foreign_keys(context), context.begin_transaction():
            context.run_migrations()
    finally:
        # Always restore the previous begin mode, even if a migration fails.
        SQLITE_BEGIN_MODE.reset(token)

162 

163 

@contextlib.contextmanager
def disable_sqlite_foreign_keys(context):
    """
    Disable foreign key constraints on sqlite for the duration of the block.

    On SQLite the open transaction is committed, foreign keys are switched
    off, and a new IMMEDIATE transaction is started before the body runs.
    Afterwards the transaction is ended, foreign keys are switched back on,
    and another IMMEDIATE transaction is started. On other dialects this is
    a no-op.

    NOTE: the restore statements are not wrapped in try/finally, so they are
    skipped if the body raises.
    """
    if dialect.name == "sqlite":
        for statement in ("COMMIT", "PRAGMA foreign_keys=OFF", "BEGIN IMMEDIATE"):
            context.execute(statement)

    yield

    if dialect.name == "sqlite":
        for statement in ("END", "PRAGMA foreign_keys=ON", "BEGIN IMMEDIATE"):
            context.execute(statement)

180 

181 

async def apply_migrations() -> None:
    """
    Apply migrations to the database.

    Obtains the engine from the database interface, points Alembic at the
    interface's versions directory, then runs `do_run_migrations` through
    `run_sync` on a freshly opened connection.
    """
    async_engine = await db_interface.engine()
    context.script.version_locations = [db_interface.orm.versions_dir]

    async with async_engine.connect() as conn:
        await conn.run_sync(do_run_migrations)

191 

192 

if not context.is_offline_mode():
    # Online mode: apply the migrations against the database.
    #
    # Running `apply_migrations` via `asyncio.run` causes flakes in the tests
    # like: `cache lookup failed for type 338396`. Using
    # `run_async_from_worker_thread` does not cause this issue, but it is not
    # clear why. The current working theory is that running `apply_migrations`
    # in another thread gives the migrations enough isolation to avoid caching
    # issues.
    run_async_from_worker_thread(apply_migrations)
else:
    # Offline mode: only emit the SQL.
    dry_run_migrations()

199 # that running `apply_migrations` in another thread gives the migrations enough 

200 # isolation to avoid caching issues. 

201 run_async_from_worker_thread(apply_migrations) 1ab