Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/_migrations/env.py: 57%
59 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1# Originally generated from `alembic init`
2# https://alembic.sqlalchemy.org/en/latest/tutorial.html#creating-an-environment
4import contextlib 1a
5from typing import Optional 1a
7import sqlalchemy 1a
8from alembic import context 1a
9from sqlalchemy.ext.asyncio import AsyncEngine 1a
11from prefect.server.database.configurations import SQLITE_BEGIN_MODE 1a
12from prefect.server.database.dependencies import provide_database_interface 1a
13from prefect.server.utilities.database import get_dialect 1a
14from prefect.utilities.asyncutils import run_async_from_worker_thread 1a
# Alembic's configuration object for the current migration environment.
config = context.config

# Database interface for this process; it is the source of the ORM metadata,
# the connection URL and the directory that holds the migration versions.
db_interface = provide_database_interface()

# Metadata that alembic compares against the database when autogenerating.
target_metadata = db_interface.Base.metadata

# Dialect derived from the configured connection URL; the rest of this module
# branches on `dialect.name` (for example to special-case "sqlite").
dialect = get_dialect(db_interface.database_config.connection_url)
def include_object(
    object: sqlalchemy.schema.SchemaItem,
    name: str,
    type_: str,
    reflected: bool,
    compare_to: Optional[sqlalchemy.schema.SchemaItem],
) -> bool:
    """
    Filter callback telling alembic whether an object takes part in
    autogenerated database migrations.

    Args:
        object: a sqlalchemy.schema.SchemaItem object such as a
            sqlalchemy.schema.Table, sqlalchemy.schema.Column,
            sqlalchemy.schema.Index, sqlalchemy.schema.UniqueConstraint, or
            sqlalchemy.schema.ForeignKeyConstraint object.
        name: the name of the object. This is typically available via object.name.
        type_: a string describing the type of object; currently "table", "column",
            "index", "unique_constraint", or "foreign_key_constraint"
        reflected: True if the given object was produced based on table reflection,
            False if it's from a local .MetaData object.
        compare_to: the object being compared against, if available, else None.

    Returns:
        bool: True when the object should appear in autogenerated migration code,
            False when it should be ignored.
    """
    # The Prefect database uses a dynamic inheritance pattern, which makes it hard
    # for alembic to resolve references to indexes on inherited models.
    #
    # To keep autogenerated migrations clean, these indexes are filtered:
    #   * reflected trigram indexes (name starts with "gin")
    #   * reflected case_insensitive indexes
    #   * functional indexes (name ends in "asc" / "desc") when an index with the
    #     same name already exists
    #   * not-yet-existing indexes whose .ddl_if(dialect=...) metadata does not
    #     match the current dialect
    if type_ == "index":
        if reflected:
            if name.startswith("gin") or name.endswith("case_insensitive"):
                return False
        elif name.endswith(("asc", "desc")):
            # Keep the functional index only if there is nothing to compare it
            # with, or the counterpart carries a different name.
            return compare_to is None or object.name != compare_to.name
        else:
            ddl_if = object._ddl_if
            if ddl_if is not None and ddl_if.dialect is not None:
                wanted = ddl_if.dialect
                # `.ddl_if(dialect=...)` may hold one name or a collection of names.
                allowed: set[str] = (
                    {wanted} if isinstance(wanted, str) else set(wanted)
                )
                return dialect.name in allowed

    # SQLite has no enum type, so a reflected enum column always comes back as
    # VARCHAR and never matches. Only keep such a column when the object it is
    # compared against is an enum as well.
    if dialect.name == "sqlite" and type_ == "column":
        if object.type.__visit_name__ == "enum" and compare_to is not None:
            return compare_to.type.__visit_name__ == "enum"

    return True
def dry_run_migrations() -> None:
    """
    Run the migrations in dry-run mode.

    Alembic is configured with a URL rather than a live connection, so the SQL
    statements are generated without being run against the database; they are
    written to stdout instead.
    """
    url = db_interface.database_config.connection_url
    context.script.version_locations = [db_interface.orm.versions_dir]

    configure_options = {
        "url": url,
        "target_metadata": target_metadata,
        "literal_binds": True,
        "include_schemas": True,
        "include_object": include_object,
        "dialect_opts": {"paramstyle": "named"},
        # Batch statements are only used by default on sqlite.
        #
        # SQLite has almost no support for the ALTER statement that relational
        # schema migrations rely upon. Migration tools are instead expected to
        # create a copy of the table with the new structure, move the data over
        # from the existing table, and then drop the old table.
        #
        # see https://alembic.sqlalchemy.org/en/latest/batch.html#batch-migrations
        "render_as_batch": dialect.name == "sqlite",
        # Each migration is its own transaction
        "transaction_per_migration": True,
        "template_args": {"dialect": dialect.name},
    }
    context.configure(**configure_options)

    with context.begin_transaction():
        context.run_migrations()
def do_run_migrations(connection: sqlalchemy.engine.Connection) -> None:
    """
    Run Alembic migrations using the connection.

    This function is handed to `run_sync` on an async connection by
    `apply_migrations`, so it receives a synchronous SQLAlchemy connection
    rather than an async engine.

    Args:
        connection: the synchronous database connection to migrate with.
    """
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        include_schemas=True,
        include_object=include_object,
        # Only use batch statements by default on sqlite
        #
        # The SQLite database presents a challenge to migration
        # tools in that it has almost no support for the ALTER statement
        # which relational schema migrations rely upon.
        # Migration tools are instead expected to produce copies of SQLite tables
        # that correspond to the new structure, transfer the data from the existing
        # table to the new one, then drop the old table.
        #
        # see https://alembic.sqlalchemy.org/en/latest/batch.html#batch-migrations
        render_as_batch=dialect.name == "sqlite",
        # Each migration is its own transaction
        transaction_per_migration=True,
        template_args={"dialect": dialect.name},
    )

    # We override SQLAlchemy's handling of BEGIN on SQLite and Alembic bypasses our
    # typical transaction context manager so we set the mode manually here
    token = SQLITE_BEGIN_MODE.set("IMMEDIATE")
    try:
        # Foreign keys are switched off (sqlite only) around the migration
        # transaction; the outer manager is entered first and exited last.
        with disable_sqlite_foreign_keys(context), context.begin_transaction():
            context.run_migrations()
    finally:
        # Always restore the previous begin mode, even if a migration fails.
        SQLITE_BEGIN_MODE.reset(token)
@contextlib.contextmanager
def disable_sqlite_foreign_keys(context):
    """
    Context manager that turns foreign key enforcement off on sqlite for the
    duration of the `with` body and back on afterwards.

    On any other dialect no statements are executed.

    Args:
        context: the object whose `execute` method is used to run the raw SQL
            statements.
    """

    def _execute_on_sqlite(*statements):
        # Run the statements in order, but only when the dialect is sqlite.
        if dialect.name == "sqlite":
            for statement in statements:
                context.execute(statement)

    # The open transaction is committed before the pragma is changed and a new
    # one is started right after. (SQLite documents `PRAGMA foreign_keys` as a
    # no-op inside a transaction, which is presumably why.)
    _execute_on_sqlite("COMMIT", "PRAGMA foreign_keys=OFF", "BEGIN IMMEDIATE")

    yield

    # NOTE(review): there is no try/finally here, so the statements below only
    # run when the `with` body finishes without raising.
    _execute_on_sqlite("END", "PRAGMA foreign_keys=ON", "BEGIN IMMEDIATE")
async def apply_migrations() -> None:
    """
    Apply the migrations to the database.

    Obtains the engine from the database interface, points alembic at the
    interface's versions directory, and runs the synchronous migration routine
    on a connection from that engine.
    """
    async_engine = await db_interface.engine()

    versions_dir = db_interface.orm.versions_dir
    context.script.version_locations = [versions_dir]

    async with async_engine.connect() as conn:
        # `do_run_migrations` is synchronous, so it is dispatched via run_sync.
        await conn.run_sync(do_run_migrations)
if not context.is_offline_mode():
    # Running `apply_migrations` via `asyncio.run` causes flakes in the tests
    # like: `cache lookup failed for type 338396`. Using `run_async_from_worker_thread`
    # does not cause this issue, but it is not clear why. The current working theory is
    # that running `apply_migrations` in another thread gives the migrations enough
    # isolation to avoid caching issues.
    run_async_from_worker_thread(apply_migrations)
else:
    # Offline mode: only emit the SQL instead of touching the database.
    dry_run_migrations()