Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/interface.py: 80%
165 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
from collections.abc import Hashable
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession
from typing_extensions import TypeAlias

from prefect.server.database import orm_models
from prefect.server.database.alembic_commands import alembic_downgrade, alembic_upgrade
from prefect.server.database.configurations import BaseDatabaseConfiguration
from prefect.server.utilities.database import get_dialect
from prefect.utilities.asyncutils import run_sync_in_worker_thread

if TYPE_CHECKING:
    # Imported for static type checking only; the name is used solely inside
    # string annotations in this module.
    from prefect.server.database.query_components import BaseQueryComponents

# A tuple of hashable values. Three of these (one each from the database
# configuration, query components and ORM configuration) are combined with the
# class name to form the DBSingleton instance-cache key.
_UniqueKey: TypeAlias = tuple[Hashable, ...]
class DBSingleton(type):
    """Metaclass that hands out one shared interface instance per unique key.

    The key is built from the class name plus the ``unique_key()`` of the
    database configuration, the query components and the ORM configuration
    passed to the constructor.
    """

    # Cache of already-built instances, shared by every class using this metaclass.
    _instances: dict[tuple[str, _UniqueKey, _UniqueKey, _UniqueKey], "DBSingleton"] = {}

    def __call__(
        cls,
        *args: Any,
        database_config: BaseDatabaseConfiguration,
        query_components: "BaseQueryComponents",
        orm: orm_models.BaseORMConfiguration,
        **kwargs: Any,
    ) -> "DBSingleton":
        """Return the cached instance for these components, creating it on first use.

        Args:
            *args: extra positional arguments, forwarded to the constructor only
                when a new instance has to be created.
            database_config: database configuration; contributes ``unique_key()``.
            query_components: query components; contributes ``unique_key()``.
            orm: ORM configuration; contributes ``unique_key()``.
            **kwargs: extra keyword arguments, forwarded to the constructor only
                when a new instance has to be created.

        Returns:
            The instance stored under the computed key.
        """
        cache = cls._instances
        cache_key = (
            cls.__name__,
            database_config.unique_key(),
            query_components.unique_key(),
            orm.unique_key(),
        )
        if cache_key not in cache:
            # First request for this combination: construct and remember it.
            cache[cache_key] = super().__call__(
                *args,
                database_config=database_config,
                query_components=query_components,
                orm=orm,
                **kwargs,
            )
        return cache[cache_key]
class PrefectDBInterface(metaclass=DBSingleton):
    """
    An interface for backend-specific SQLAlchemy actions and ORM models.

    The REST API can be configured to run against different databases in order to
    maintain performance at different scales. This interface combines the database
    configuration, the query components and the ORM configuration into a single
    object that the orchestration engine runs against.
    """

    def __init__(
        self,
        database_config: BaseDatabaseConfiguration,
        query_components: "BaseQueryComponents",
        orm: orm_models.BaseORMConfiguration,
    ):
        """Store the three components this interface delegates to.

        Args:
            database_config: database configuration, kept as ``database_config``.
            query_components: query components, kept as ``queries``.
            orm: ORM configuration, kept as ``orm``.
        """
        self.database_config = database_config
        self.queries = query_components
        self.orm = orm

    async def create_db(self) -> None:
        """Create the database by running the upgrade migrations."""
        await self.run_migrations_upgrade()

    async def drop_db(self) -> None:
        """Drop the database by downgrading migrations to the ``"base"`` revision."""
        await self.run_migrations_downgrade(revision="base")

    async def run_migrations_upgrade(self) -> None:
        """Run ``alembic_upgrade`` in a worker thread."""
        await run_sync_in_worker_thread(alembic_upgrade)

    async def run_migrations_downgrade(self, revision: str = "-1") -> None:
        """Run ``alembic_downgrade`` in a worker thread.

        Args:
            revision: the revision handed to ``alembic_downgrade``. The default
                ``"-1"`` is Alembic's relative identifier for one revision back;
                ``drop_db`` passes ``"base"`` instead.
        """
        await run_sync_in_worker_thread(alembic_downgrade, revision=revision)

    async def is_db_connectable(self) -> bool:
        """
        Report whether a connection to the database can be opened.
        This method is used to determine if the server is ready to accept requests.

        Returns:
            True if a connection could be opened and closed, False if doing so
            raised any ``Exception``.
        """
        # The engine is obtained outside the try block, so an error raised while
        # building the engine propagates to the caller instead of yielding False.
        db_engine = await self.engine()
        try:
            async with db_engine.connect():
                pass
        except Exception:
            return False
        return True

    async def engine(self) -> AsyncEngine:
        """
        Provides a SQLAlchemy engine obtained from the database configuration.
        """
        return await self.database_config.engine()

    async def session(self) -> AsyncSession:
        """
        Provides a SQLAlchemy session created by the database configuration
        for this interface's engine.
        """
        db_engine = await self.engine()
        return await self.database_config.session(db_engine)

    @asynccontextmanager
    async def session_context(
        self, begin_transaction: bool = False, with_for_update: bool = False
    ):
        """
        Provides a SQLAlchemy session inside a context manager that enters the
        session on entry and exits it on exit.

        Args:
            begin_transaction: if True, the session is yielded inside the
                transaction context returned by the database configuration's
                ``begin_transaction``.
            with_for_update: forwarded to ``begin_transaction``; it has no effect
                unless ``begin_transaction`` is True.
        """
        db_session = await self.session()
        async with db_session:
            if not begin_transaction:
                yield db_session
            else:
                async with self.database_config.begin_transaction(
                    db_session, with_for_update=with_for_update
                ):
                    yield db_session

    @property
    def dialect(self) -> type[sa.engine.Dialect]:
        """The dialect resolved by ``get_dialect`` from the configured connection URL"""
        return get_dialect(self.database_config.connection_url)

    @property
    def Base(self) -> type[orm_models.Base]:
        """The base class for ORM models"""
        return orm_models.Base

    @property
    def Flow(self) -> type[orm_models.Flow]:
        """The flow ORM model"""
        return orm_models.Flow

    @property
    def FlowRun(self) -> type[orm_models.FlowRun]:
        """The flow run ORM model"""
        return orm_models.FlowRun

    @property
    def FlowRunState(self) -> type[orm_models.FlowRunState]:
        """The flow run state ORM model"""
        return orm_models.FlowRunState

    @property
    def TaskRun(self) -> type[orm_models.TaskRun]:
        """The task run ORM model"""
        return orm_models.TaskRun

    @property
    def TaskRunState(self) -> type[orm_models.TaskRunState]:
        """The task run state ORM model"""
        return orm_models.TaskRunState

    @property
    def Artifact(self) -> type[orm_models.Artifact]:
        """The artifact ORM model"""
        return orm_models.Artifact

    @property
    def ArtifactCollection(self) -> type[orm_models.ArtifactCollection]:
        """The artifact collection ORM model"""
        return orm_models.ArtifactCollection

    @property
    def TaskRunStateCache(self) -> type[orm_models.TaskRunStateCache]:
        """The task run state cache ORM model"""
        return orm_models.TaskRunStateCache

    @property
    def Deployment(self) -> type[orm_models.Deployment]:
        """The deployment ORM model"""
        return orm_models.Deployment

    @property
    def DeploymentSchedule(self) -> type[orm_models.DeploymentSchedule]:
        """The deployment schedule ORM model"""
        return orm_models.DeploymentSchedule

    @property
    def SavedSearch(self) -> type[orm_models.SavedSearch]:
        """The saved search ORM model"""
        return orm_models.SavedSearch

    @property
    def WorkPool(self) -> type[orm_models.WorkPool]:
        """The work pool ORM model"""
        return orm_models.WorkPool

    @property
    def Worker(self) -> type[orm_models.Worker]:
        """The worker process ORM model"""
        return orm_models.Worker

    @property
    def Log(self) -> type[orm_models.Log]:
        """The log ORM model"""
        return orm_models.Log

    @property
    def ConcurrencyLimit(self) -> type[orm_models.ConcurrencyLimit]:
        """The concurrency limit model"""
        return orm_models.ConcurrencyLimit

    @property
    def ConcurrencyLimitV2(self) -> type[orm_models.ConcurrencyLimitV2]:
        """The v2 concurrency limit model"""
        return orm_models.ConcurrencyLimitV2

    @property
    def CsrfToken(self) -> type[orm_models.CsrfToken]:
        """The CSRF token model"""
        return orm_models.CsrfToken

    @property
    def WorkQueue(self) -> type[orm_models.WorkQueue]:
        """The work queue model"""
        return orm_models.WorkQueue

    @property
    def Agent(self) -> type[orm_models.Agent]:
        """The agent model"""
        return orm_models.Agent

    @property
    def BlockType(self) -> type[orm_models.BlockType]:
        """The block type model"""
        return orm_models.BlockType

    @property
    def BlockSchema(self) -> type[orm_models.BlockSchema]:
        """The block schema model"""
        return orm_models.BlockSchema

    @property
    def BlockSchemaReference(self) -> type[orm_models.BlockSchemaReference]:
        """The block schema reference model"""
        return orm_models.BlockSchemaReference

    @property
    def BlockDocument(self) -> type[orm_models.BlockDocument]:
        """The block document model"""
        return orm_models.BlockDocument

    @property
    def BlockDocumentReference(self) -> type[orm_models.BlockDocumentReference]:
        """The block document reference model"""
        return orm_models.BlockDocumentReference

    @property
    def Configuration(self) -> type[orm_models.Configuration]:
        """The configuration model"""
        return orm_models.Configuration

    @property
    def Variable(self) -> type[orm_models.Variable]:
        """The variable model"""
        return orm_models.Variable

    @property
    def FlowRunInput(self) -> type[orm_models.FlowRunInput]:
        """The flow run input model"""
        return orm_models.FlowRunInput

    @property
    def Automation(self) -> type[orm_models.Automation]:
        """The automation model"""
        return orm_models.Automation

    @property
    def AutomationBucket(self) -> type[orm_models.AutomationBucket]:
        """The automation bucket model"""
        return orm_models.AutomationBucket

    @property
    def AutomationRelatedResource(self) -> type[orm_models.AutomationRelatedResource]:
        """The automation related resource model"""
        return orm_models.AutomationRelatedResource

    @property
    def CompositeTriggerChildFiring(
        self,
    ) -> type[orm_models.CompositeTriggerChildFiring]:
        """The model capturing a composite trigger's child firing"""
        return orm_models.CompositeTriggerChildFiring

    @property
    def AutomationEventFollower(self) -> type[orm_models.AutomationEventFollower]:
        """The model capturing one event following another event"""
        return orm_models.AutomationEventFollower

    @property
    def Event(self) -> type[orm_models.Event]:
        """The event model"""
        return orm_models.Event

    @property
    def EventResource(self) -> type[orm_models.EventResource]:
        """The event resource model"""
        return orm_models.EventResource