Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/interface.py: 95%

165 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from collections.abc import Hashable 1a

2from contextlib import asynccontextmanager 1a

3from typing import TYPE_CHECKING, Any 1a

4 

5import sqlalchemy as sa 1a

6from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession 1a

7from typing_extensions import TypeAlias 1a

8 

9from prefect.server.database import orm_models 1a

10from prefect.server.database.alembic_commands import alembic_downgrade, alembic_upgrade 1a

11from prefect.server.database.configurations import BaseDatabaseConfiguration 1a

12from prefect.server.utilities.database import get_dialect 1a

13from prefect.utilities.asyncutils import run_sync_in_worker_thread 1a

14 

15if TYPE_CHECKING: 15 ↛ 16line 15 didn't jump to line 16 because the condition on line 15 was never true1a

16 from prefect.server.database.query_components import BaseQueryComponents 

17 

18_UniqueKey: TypeAlias = tuple[Hashable, ...] 1a

19 

20 

class DBSingleton(type):
    """
    Metaclass that hands out a single shared instance per unique key.

    The key is built from the name of the class being instantiated together
    with the `unique_key()` of the database configuration, the query
    components and the ORM configuration passed to the constructor.
    """

    # Cache of already-constructed instances, shared by every class that uses
    # this metaclass.
    _instances: dict[tuple[str, _UniqueKey, _UniqueKey, _UniqueKey], "DBSingleton"] = {}

    def __call__(
        cls,
        *args: Any,
        database_config: BaseDatabaseConfiguration,
        query_components: "BaseQueryComponents",
        orm: orm_models.BaseORMConfiguration,
        **kwargs: Any,
    ) -> "DBSingleton":
        """
        Return the cached instance for this configuration, building it on
        first use.

        Args:
            *args: extra positional arguments forwarded to the class constructor.
            database_config: database configuration; contributes to the cache key.
            query_components: query components; contributes to the cache key.
            orm: ORM configuration; contributes to the cache key.
            **kwargs: extra keyword arguments forwarded to the class constructor.
        """
        cache_key = (
            cls.__name__,
            database_config.unique_key(),
            query_components.unique_key(),
            orm.unique_key(),
        )
        if cache_key not in cls._instances:
            # First request for this combination: construct and remember it.
            cls._instances[cache_key] = super().__call__(
                *args,
                database_config=database_config,
                query_components=query_components,
                orm=orm,
                **kwargs,
            )
        return cls._instances[cache_key]

53 

54 

class PrefectDBInterface(metaclass=DBSingleton):
    """
    An interface for backend-specific SqlAlchemy actions and ORM models.

    The REST API can be configured to run against different databases in order to
    maintain performance at different scales. This interface integrates database- and
    dialect-specific configuration into a unified interface that the orchestration
    engine runs against.
    """

    def __init__(
        self,
        database_config: BaseDatabaseConfiguration,
        query_components: "BaseQueryComponents",
        orm: orm_models.BaseORMConfiguration,
    ) -> None:
        """
        Store the configuration objects this interface delegates to.

        Args:
            database_config: provides the engine, sessions and transactions.
            query_components: exposed as `self.queries`.
            orm: exposed as `self.orm`.
        """
        self.database_config = database_config
        self.queries = query_components
        self.orm = orm

    async def create_db(self) -> None:
        """Create the database by running all upgrade migrations"""
        await self.run_migrations_upgrade()

    async def drop_db(self) -> None:
        """Drop the database by downgrading migrations to the "base" revision"""
        await self.run_migrations_downgrade(revision="base")

    async def run_migrations_upgrade(self) -> None:
        """Run all upgrade migrations"""
        # Alembic commands are synchronous, so they run in a worker thread.
        await run_sync_in_worker_thread(alembic_upgrade)

    async def run_migrations_downgrade(self, revision: str = "-1") -> None:
        """
        Run downgrade migrations to the given revision.

        Args:
            revision: Alembic revision identifier passed to `alembic_downgrade`.
                The default "-1" is Alembic's relative form for stepping back
                one revision; pass "base" to undo every migration.
        """
        await run_sync_in_worker_thread(alembic_downgrade, revision=revision)

    async def is_db_connectable(self) -> bool:
        """
        Returns boolean indicating if the database is connectable.
        This method is used to determine if the server is ready to accept requests.
        """
        try:
            # Obtaining the engine is part of the check: if it cannot be
            # obtained, the database is not connectable either, so report
            # False instead of letting the error escape a readiness probe.
            engine = await self.engine()
            async with engine.connect():
                return True
        except Exception:
            return False

    async def engine(self) -> AsyncEngine:
        """
        Provides a SqlAlchemy engine against a specific database.
        """
        return await self.database_config.engine()

    async def session(self) -> AsyncSession:
        """
        Provides a SQLAlchemy session.
        """
        engine = await self.engine()
        return await self.database_config.session(engine)

    @asynccontextmanager
    async def session_context(
        self, begin_transaction: bool = False, with_for_update: bool = False
    ):
        """
        Provides a SQLAlchemy session and a context manager for opening/closing
        the underlying connection.

        Args:
            begin_transaction: if True, the context manager will begin a SQL transaction.
                Exiting the context manager will COMMIT or ROLLBACK any changes.
            with_for_update: forwarded to the database configuration's
                `begin_transaction`; only used when `begin_transaction` is True.
        """
        session = await self.session()
        async with session:
            if begin_transaction:
                async with self.database_config.begin_transaction(
                    session, with_for_update=with_for_update
                ):
                    yield session
            else:
                yield session

    @property
    def dialect(self) -> type[sa.engine.Dialect]:
        """The SQLAlchemy dialect resolved from the configured connection URL"""
        return get_dialect(self.database_config.connection_url)

    @property
    def Base(self) -> type[orm_models.Base]:
        """Base class for orm models"""
        return orm_models.Base

    @property
    def Flow(self) -> type[orm_models.Flow]:
        """A flow orm model"""
        return orm_models.Flow

    @property
    def FlowRun(self) -> type[orm_models.FlowRun]:
        """A flow run orm model"""
        return orm_models.FlowRun

    @property
    def FlowRunState(self) -> type[orm_models.FlowRunState]:
        """A flow run state orm model"""
        return orm_models.FlowRunState

    @property
    def TaskRun(self) -> type[orm_models.TaskRun]:
        """A task run orm model"""
        return orm_models.TaskRun

    @property
    def TaskRunState(self) -> type[orm_models.TaskRunState]:
        """A task run state orm model"""
        return orm_models.TaskRunState

    @property
    def Artifact(self) -> type[orm_models.Artifact]:
        """An artifact orm model"""
        return orm_models.Artifact

    @property
    def ArtifactCollection(self) -> type[orm_models.ArtifactCollection]:
        """An artifact collection orm model"""
        return orm_models.ArtifactCollection

    @property
    def TaskRunStateCache(self) -> type[orm_models.TaskRunStateCache]:
        """A task run state cache orm model"""
        return orm_models.TaskRunStateCache

    @property
    def Deployment(self) -> type[orm_models.Deployment]:
        """A deployment orm model"""
        return orm_models.Deployment

    @property
    def DeploymentSchedule(self) -> type[orm_models.DeploymentSchedule]:
        """A deployment schedule orm model"""
        return orm_models.DeploymentSchedule

    @property
    def SavedSearch(self) -> type[orm_models.SavedSearch]:
        """A saved search orm model"""
        return orm_models.SavedSearch

    @property
    def WorkPool(self) -> type[orm_models.WorkPool]:
        """A work pool orm model"""
        return orm_models.WorkPool

    @property
    def Worker(self) -> type[orm_models.Worker]:
        """A worker process orm model"""
        return orm_models.Worker

    @property
    def Log(self) -> type[orm_models.Log]:
        """A log orm model"""
        return orm_models.Log

    @property
    def ConcurrencyLimit(self) -> type[orm_models.ConcurrencyLimit]:
        """A concurrency model"""
        return orm_models.ConcurrencyLimit

    @property
    def ConcurrencyLimitV2(self) -> type[orm_models.ConcurrencyLimitV2]:
        """A v2 concurrency model"""
        return orm_models.ConcurrencyLimitV2

    @property
    def CsrfToken(self) -> type[orm_models.CsrfToken]:
        """A csrf token model"""
        return orm_models.CsrfToken

    @property
    def WorkQueue(self) -> type[orm_models.WorkQueue]:
        """A work queue model"""
        return orm_models.WorkQueue

    @property
    def Agent(self) -> type[orm_models.Agent]:
        """An agent model"""
        return orm_models.Agent

    @property
    def BlockType(self) -> type[orm_models.BlockType]:
        """A block type model"""
        return orm_models.BlockType

    @property
    def BlockSchema(self) -> type[orm_models.BlockSchema]:
        """A block schema model"""
        return orm_models.BlockSchema

    @property
    def BlockSchemaReference(self) -> type[orm_models.BlockSchemaReference]:
        """A block schema reference model"""
        return orm_models.BlockSchemaReference

    @property
    def BlockDocument(self) -> type[orm_models.BlockDocument]:
        """A block document model"""
        return orm_models.BlockDocument

    @property
    def BlockDocumentReference(self) -> type[orm_models.BlockDocumentReference]:
        """A block document reference model"""
        return orm_models.BlockDocumentReference

    @property
    def Configuration(self) -> type[orm_models.Configuration]:
        """A configuration model"""
        return orm_models.Configuration

    @property
    def Variable(self) -> type[orm_models.Variable]:
        """A variable model"""
        return orm_models.Variable

    @property
    def FlowRunInput(self) -> type[orm_models.FlowRunInput]:
        """A flow run input model"""
        return orm_models.FlowRunInput

    @property
    def Automation(self) -> type[orm_models.Automation]:
        """An automation model"""
        return orm_models.Automation

    @property
    def AutomationBucket(self) -> type[orm_models.AutomationBucket]:
        """An automation bucket model"""
        return orm_models.AutomationBucket

    @property
    def AutomationRelatedResource(self) -> type[orm_models.AutomationRelatedResource]:
        """An automation related resource model"""
        return orm_models.AutomationRelatedResource

    @property
    def CompositeTriggerChildFiring(
        self,
    ) -> type[orm_models.CompositeTriggerChildFiring]:
        """A model capturing a composite trigger's child firing"""
        return orm_models.CompositeTriggerChildFiring

    @property
    def AutomationEventFollower(self) -> type[orm_models.AutomationEventFollower]:
        """A model capturing one event following another event"""
        return orm_models.AutomationEventFollower

    @property
    def Event(self) -> type[orm_models.Event]:
        """An event model"""
        return orm_models.Event

    @property
    def EventResource(self) -> type[orm_models.EventResource]:
        """An event resource model"""
        return orm_models.EventResource