Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/orm_models.py: 86%

550 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1import datetime 1a

2import uuid 1a

3from abc import ABC, abstractmethod 1a

4from collections.abc import Hashable, Iterable 1a

5from pathlib import Path 1a

6from typing import TYPE_CHECKING, Any, ClassVar, Optional, Union 1a

7 

8import sqlalchemy as sa 1a

9from sqlalchemy import FetchedValue 1a

10from sqlalchemy.dialects import postgresql 1a

11from sqlalchemy.ext.asyncio import AsyncSession 1a

12from sqlalchemy.ext.hybrid import hybrid_property 1a

13from sqlalchemy.orm import ( 1a

14 DeclarativeBase, 

15 Mapped, 

16 declared_attr, 

17 mapped_column, 

18 registry, 

19 relationship, 

20 synonym, 

21) 

22from sqlalchemy.orm.decl_api import registry as RegistryType 1a

23from sqlalchemy.sql import roles 1a

24from sqlalchemy.sql.functions import coalesce 1a

25 

26import prefect.server.schemas as schemas 1a

27from prefect.server.events.actions import ServerActionTypes 1a

28from prefect.server.events.schemas.automations import ( 1a

29 AutomationSort, 

30 Firing, 

31 ServerTriggerTypes, 

32) 

33from prefect.server.events.schemas.events import ReceivedEvent 1a

34from prefect.server.schemas.statuses import ( 1a

35 DeploymentStatus, 

36 WorkerStatus, 

37 WorkPoolStatus, 

38 WorkQueueStatus, 

39) 

40from prefect.server.utilities.database import ( 1a

41 CAMEL_TO_SNAKE, 

42 JSON, 

43 UUID, 

44 GenerateUUID, 

45 Pydantic, 

46 Timestamp, 

47) 

48from prefect.server.utilities.encryption import decrypt_fernet, encrypt_fernet 1a

49from prefect.types._datetime import DateTime, now 1a

50from prefect.utilities.names import generate_slug 1a

51 

52# for 'plain JSON' columns, use the postgresql variant (which comes with an 

53# extra operator) and fall back to the generic JSON variant for SQLite 

54sa_JSON: postgresql.JSON = postgresql.JSON().with_variant(sa.JSON(), "sqlite") 1a

55 

56 

57class Base(DeclarativeBase): 1a

58 """ 

59 Base SQLAlchemy model that automatically infers the table name 

60 and provides ID, created, and updated columns 

61 """ 

62 

63 registry: ClassVar[RegistryType] = registry( 1a

64 metadata=sa.schema.MetaData( 

65 # define naming conventions for our Base class to use 

66 # sqlalchemy will use the following templated strings 

67 # to generate the names of indices, constraints, and keys 

68 # 

69 # we offset the table name with two underscores (__) to 

70 # help differentiate, for example, between "flow_run.state_type" 

71 # and "flow_run_state.type". 

72 # 

73 # more information on this templating and available 

74 # customization can be found here 

75 # https://docs.sqlalchemy.org/en/14/core/metadata.html#sqlalchemy.schema.MetaData 

76 # 

77 # this also allows us to avoid having to specify names explicitly 

78 # when using sa.ForeignKey.use_alter = True 

79 # https://docs.sqlalchemy.org/en/14/core/constraints.html 

80 naming_convention={ 

81 "ix": "ix_%(table_name)s__%(column_0_N_name)s", 

82 "uq": "uq_%(table_name)s__%(column_0_N_name)s", 

83 "ck": "ck_%(table_name)s__%(constraint_name)s", 

84 "fk": "fk_%(table_name)s__%(column_0_N_name)s__%(referred_table_name)s", 

85 "pk": "pk_%(table_name)s", 

86 } 

87 ), 

88 type_annotation_map={ 

89 uuid.UUID: UUID, 

90 DateTime: Timestamp, 

91 }, 

92 ) 

93 

94 # required in order to access columns with server defaults 

95 # or SQL expression defaults, subsequent to a flush, without 

96 # triggering an expired load 

97 # 

98 # this allows us to load attributes with a server default after 

99 # an INSERT, for example 

100 # 

101 # https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html#preventing-implicit-io-when-using-asyncsession 

102 __mapper_args__: dict[str, Any] = {"eager_defaults": True} 1a

103 

104 def __repr__(self) -> str: 1a

105 return f"{self.__class__.__name__}(id={self.id})" 

106 

107 @declared_attr.directive 1a

108 def __tablename__(cls) -> str: 1a

109 """ 

110 By default, turn the model's camel-case class name 

111 into a snake-case table name. Override by providing 

112 an explicit `__tablename__` class property. 

113 """ 

114 return CAMEL_TO_SNAKE.sub("_", cls.__name__).lower() 1a

115 

116 id: Mapped[uuid.UUID] = mapped_column( 1a

117 primary_key=True, 

118 server_default=GenerateUUID(), 

119 default=uuid.uuid4, 

120 ) 

121 

122 created: Mapped[DateTime] = mapped_column( 1a

123 server_default=sa.func.now(), default=lambda: now("UTC") 

124 ) 

125 

126 # onupdate is only called when statements are actually issued 

127 # against the database. until COMMIT is issued, this column 

128 # will not be updated 

129 updated: Mapped[DateTime] = mapped_column( 1a

130 index=True, 

131 server_default=sa.func.now(), 

132 default=lambda: now("UTC"), 

133 onupdate=sa.func.now(), 

134 server_onupdate=FetchedValue(), 

135 ) 

136 

137 

138class Flow(Base): 1a

139 """SQLAlchemy mixin of a flow.""" 

140 

141 name: Mapped[str] 1a

142 tags: Mapped[list[str]] = mapped_column(JSON, server_default="[]", default=list) 1a

143 labels: Mapped[Optional[schemas.core.KeyValueLabels]] = mapped_column(JSON) 1a

144 

145 flow_runs: Mapped[list["FlowRun"]] = relationship( 1a

146 back_populates="flow", lazy="raise" 

147 ) 

148 deployments: Mapped[list["Deployment"]] = relationship( 1a

149 back_populates="flow", lazy="raise" 

150 ) 

151 

152 __table_args__: Any = ( 1a

153 sa.UniqueConstraint("name"), 

154 sa.Index("ix_flow__created", "created"), 

155 sa.Index("trgm_ix_flow_name", "name", postgresql_using="gin").ddl_if( 

156 dialect="postgresql" 

157 ), 

158 ) 

159 

160 

161class FlowRunState(Base): 1a

162 """SQLAlchemy mixin of a flow run state.""" 

163 

164 flow_run_id: Mapped[uuid.UUID] = mapped_column( 1a

165 sa.ForeignKey("flow_run.id", ondelete="cascade") 

166 ) 

167 

168 type: Mapped[schemas.states.StateType] = mapped_column( 1a

169 sa.Enum(schemas.states.StateType, name="state_type"), index=True 

170 ) 

171 timestamp: Mapped[DateTime] = mapped_column( 1a

172 server_default=sa.func.now(), default=lambda: now("UTC") 

173 ) 

174 name: Mapped[str] = mapped_column(index=True) 1a

175 message: Mapped[Optional[str]] 1a

176 state_details: Mapped[schemas.states.StateDetails] = mapped_column( 1a

177 Pydantic(schemas.states.StateDetails), 

178 server_default="{}", 

179 default=schemas.states.StateDetails, 

180 ) 

181 _data: Mapped[Optional[Any]] = mapped_column(JSON, name="data") 1a

182 

183 result_artifact_id: Mapped[Optional[uuid.UUID]] = mapped_column( 1a

184 sa.ForeignKey("artifact.id", ondelete="SET NULL", use_alter=True), 

185 index=True, 

186 ) 

187 

188 _result_artifact: Mapped[Optional["Artifact"]] = relationship( 1a

189 lazy="selectin", 

190 foreign_keys=[result_artifact_id], 

191 primaryjoin="Artifact.id==FlowRunState.result_artifact_id", 

192 ) 

193 

194 @hybrid_property 1a

195 def data(self) -> Optional[Any]: 1a

196 if self._data: 

197 # ensures backwards compatibility for results stored on state objects 

198 return self._data 

199 if not self.result_artifact_id: 

200 # do not try to load the relationship if there's no artifact id 

201 return None 

202 if TYPE_CHECKING: 

203 assert self._result_artifact is not None 

204 return self._result_artifact.data 

205 

206 flow_run: Mapped["FlowRun"] = relationship(lazy="raise", foreign_keys=[flow_run_id]) 1a

207 

208 def as_state(self) -> schemas.states.State: 1a

209 return schemas.states.State.model_validate(self, from_attributes=True) 

210 

211 @declared_attr.directive 1a

212 @classmethod 1a

213 def __table_args__(cls) -> Iterable[sa.Index]: 1a

214 return ( 1a

215 sa.Index( 

216 "uq_flow_run_state__flow_run_id_timestamp_desc", 

217 cls.flow_run_id, 

218 cls.timestamp.desc(), 

219 unique=True, 

220 ), 

221 ) 

222 

223 

224class TaskRunState(Base): 1a

225 """SQLAlchemy model of a task run state.""" 

226 

227 # this column isn't explicitly indexed because it is included in 

228 # the unique compound index on (task_run_id, timestamp) 

229 task_run_id: Mapped[uuid.UUID] = mapped_column( 1a

230 sa.ForeignKey("task_run.id", ondelete="cascade") 

231 ) 

232 

233 type: Mapped[schemas.states.StateType] = mapped_column( 1a

234 sa.Enum(schemas.states.StateType, name="state_type"), index=True 

235 ) 

236 timestamp: Mapped[DateTime] = mapped_column( 1a

237 server_default=sa.func.now(), default=lambda: now("UTC") 

238 ) 

239 name: Mapped[str] = mapped_column(index=True) 1a

240 message: Mapped[Optional[str]] 1a

241 state_details: Mapped[schemas.states.StateDetails] = mapped_column( 1a

242 Pydantic(schemas.states.StateDetails), 

243 server_default="{}", 

244 default=schemas.states.StateDetails, 

245 ) 

246 _data: Mapped[Optional[Any]] = mapped_column(JSON, name="data") 1a

247 

248 result_artifact_id: Mapped[Optional[uuid.UUID]] = mapped_column( 1a

249 sa.ForeignKey("artifact.id", ondelete="SET NULL", use_alter=True), index=True 

250 ) 

251 

252 _result_artifact: Mapped[Optional["Artifact"]] = relationship( 1a

253 lazy="selectin", 

254 foreign_keys=[result_artifact_id], 

255 primaryjoin="Artifact.id==TaskRunState.result_artifact_id", 

256 ) 

257 

258 @hybrid_property 1a

259 def data(self) -> Optional[Any]: 1a

260 if self._data: 

261 # ensures backwards compatibility for results stored on state objects 

262 return self._data 

263 if not self.result_artifact_id: 

264 # do not try to load the relationship if there's no artifact id 

265 return None 

266 if TYPE_CHECKING: 

267 assert self._result_artifact is not None 

268 return self._result_artifact.data 

269 

270 task_run: Mapped["TaskRun"] = relationship(lazy="raise", foreign_keys=[task_run_id]) 1a

271 

272 def as_state(self) -> schemas.states.State: 1a

273 return schemas.states.State.model_validate(self, from_attributes=True) 

274 

275 @declared_attr.directive 1a

276 @classmethod 1a

277 def __table_args__(cls) -> Iterable[sa.Index]: 1a

278 return ( 1a

279 sa.Index( 

280 "uq_task_run_state__task_run_id_timestamp_desc", 

281 cls.task_run_id, 

282 cls.timestamp.desc(), 

283 unique=True, 

284 ), 

285 ) 

286 

287 

288class Artifact(Base): 1a

289 """ 

290 SQLAlchemy model of artifacts. 

291 """ 

292 

293 key: Mapped[Optional[str]] = mapped_column(index=True) 1a

294 

295 task_run_id: Mapped[Optional[uuid.UUID]] = mapped_column(index=True) 1a

296 

297 flow_run_id: Mapped[Optional[uuid.UUID]] = mapped_column(index=True) 1a

298 

299 type: Mapped[Optional[str]] 1a

300 data: Mapped[Optional[Any]] = mapped_column(sa_JSON) 1a

301 description: Mapped[Optional[str]] 1a

302 

303 # Suffixed with underscore as attribute name 'metadata' is reserved for the MetaData instance when using a declarative base class. 

304 metadata_: Mapped[Optional[dict[str, str]]] = mapped_column(sa_JSON) 1a

305 

306 @declared_attr.directive 1a

307 @classmethod 1a

308 def __table_args__(cls) -> Iterable[sa.Index]: 1a

309 return ( 1a

310 sa.Index( 

311 "ix_artifact__key", 

312 cls.key, 

313 ), 

314 sa.Index( 

315 "ix_artifact__key_created_desc", 

316 cls.key, 

317 cls.created.desc(), 

318 postgresql_include=[ 

319 "id", 

320 "updated", 

321 "type", 

322 "task_run_id", 

323 "flow_run_id", 

324 ], 

325 ), 

326 ) 

327 

328 

329class ArtifactCollection(Base): 1a

330 key: Mapped[str] 1a

331 

332 latest_id: Mapped[uuid.UUID] 1a

333 

334 task_run_id: Mapped[Optional[uuid.UUID]] 1a

335 

336 flow_run_id: Mapped[Optional[uuid.UUID]] 1a

337 

338 type: Mapped[Optional[str]] 1a

339 data: Mapped[Optional[Any]] = mapped_column(sa_JSON) 1a

340 description: Mapped[Optional[str]] 1a

341 metadata_: Mapped[Optional[dict[str, str]]] = mapped_column(sa_JSON) 1a

342 

343 __table_args__: Any = ( 1a

344 sa.UniqueConstraint("key"), 

345 sa.Index( 

346 "ix_artifact_collection__key_latest_id", 

347 "key", 

348 "latest_id", 

349 ), 

350 ) 

351 

352 

353class TaskRunStateCache(Base): 1a

354 """ 

355 SQLAlchemy model of a task run state cache. 

356 """ 

357 

358 cache_key: Mapped[str] = mapped_column() 1a

359 cache_expiration: Mapped[Optional[DateTime]] 1a

360 task_run_state_id: Mapped[uuid.UUID] 1a

361 

362 @declared_attr.directive 1a

363 @classmethod 1a

364 def __table_args__(cls) -> Iterable[sa.Index]: 1a

365 return ( 1a

366 sa.Index( 

367 "ix_task_run_state_cache__cache_key_created_desc", 

368 cls.cache_key, 

369 cls.created.desc(), 

370 ), 

371 ) 

372 

373 

374class Run(Base): 1a

375 """ 

376 Common columns and logic for FlowRun and TaskRun models 

377 """ 

378 

379 __abstract__ = True 1a

380 

381 name: Mapped[str] = mapped_column(default=lambda: generate_slug(2), index=True) 1a

382 state_type: Mapped[Optional[schemas.states.StateType]] = mapped_column( 1a

383 sa.Enum(schemas.states.StateType, name="state_type") 

384 ) 

385 state_name: Mapped[Optional[str]] 1a

386 state_timestamp: Mapped[Optional[DateTime]] 1a

387 run_count: Mapped[int] = mapped_column(server_default="0", default=0) 1a

388 expected_start_time: Mapped[Optional[DateTime]] 1a

389 next_scheduled_start_time: Mapped[Optional[DateTime]] 1a

390 start_time: Mapped[Optional[DateTime]] 1a

391 end_time: Mapped[Optional[DateTime]] 1a

392 total_run_time: Mapped[datetime.timedelta] = mapped_column( 1a

393 server_default="0", default=datetime.timedelta(0) 

394 ) 

395 

396 @hybrid_property 1a

397 def estimated_run_time(self) -> datetime.timedelta: 1a

398 """Total run time is incremented in the database whenever a RUNNING 

399 state is exited. To give up-to-date estimates, we estimate incremental 

400 run time for any runs currently in a RUNNING state.""" 

401 if self.state_type and self.state_type == schemas.states.StateType.RUNNING: 

402 if TYPE_CHECKING: 

403 assert self.state_timestamp is not None 

404 return self.total_run_time + (now("UTC") - self.state_timestamp) 

405 else: 

406 return self.total_run_time 

407 

408 @estimated_run_time.inplace.expression 1a

409 @classmethod 1a

410 def _estimated_run_time_expression(cls) -> sa.Label[datetime.timedelta]: 1a

411 return ( 

412 sa.select( 

413 sa.case( 

414 ( 

415 cls.state_type == schemas.states.StateType.RUNNING, 

416 sa.func.interval_add( 

417 cls.total_run_time, 

418 sa.func.date_diff(sa.func.now(), cls.state_timestamp), 

419 ), 

420 ), 

421 else_=cls.total_run_time, 

422 ) 

423 ) 

424 # add a correlate statement so this can reuse the `FROM` clause 

425 # of any parent query 

426 .correlate(cls) 

427 .label("estimated_run_time") 

428 ) 

429 

430 @hybrid_property 1a

431 def estimated_start_time_delta(self) -> datetime.timedelta: 1a

432 """The delta to the expected start time (or "lateness") is computed as 

433 the difference between the actual start time and expected start time. To 

434 give up-to-date estimates, we estimate lateness for any runs that don't 

435 have a start time and are not in a final state and were expected to 

436 start already.""" 

437 if ( 

438 self.start_time 

439 and self.expected_start_time is not None 

440 and self.start_time > (self.expected_start_time) 

441 ): 

442 return self.start_time - self.expected_start_time 

443 elif ( 

444 self.start_time is None 

445 and self.expected_start_time 

446 and self.expected_start_time < now("UTC") 

447 and self.state_type not in schemas.states.TERMINAL_STATES 

448 ): 

449 return now("UTC") - self.expected_start_time 

450 else: 

451 return datetime.timedelta(0) 

452 

453 @estimated_start_time_delta.inplace.expression 1a

454 @classmethod 1a

455 def _estimated_start_time_delta_expression( 1a

456 cls, 

457 ) -> sa.SQLColumnExpression[datetime.timedelta]: 

458 return sa.case( 

459 ( 

460 cls.start_time > cls.expected_start_time, 

461 sa.func.date_diff(cls.start_time, cls.expected_start_time), 

462 ), 

463 ( 

464 sa.and_( 

465 cls.start_time.is_(None), 

466 cls.state_type.not_in(schemas.states.TERMINAL_STATES), 

467 cls.expected_start_time < sa.func.now(), 

468 ), 

469 sa.func.date_diff(sa.func.now(), cls.expected_start_time), 

470 ), 

471 else_=datetime.timedelta(0), 

472 ) 

473 

474 

475class FlowRun(Run): 1a

476 """SQLAlchemy model of a flow run.""" 

477 

478 flow_id: Mapped[uuid.UUID] = mapped_column( 1a

479 sa.ForeignKey("flow.id", ondelete="cascade"), index=True 

480 ) 

481 

482 deployment_id: Mapped[Optional[uuid.UUID]] = mapped_column() 1a

483 work_queue_name: Mapped[Optional[str]] = mapped_column(index=True) 1a

484 flow_version: Mapped[Optional[str]] = mapped_column(index=True) 1a

485 deployment_version: Mapped[Optional[str]] = mapped_column(index=True) 1a

486 parameters: Mapped[dict[str, Any]] = mapped_column( 1a

487 JSON, server_default="{}", default=dict 

488 ) 

489 idempotency_key: Mapped[Optional[str]] = mapped_column() 1a

490 context: Mapped[dict[str, Any]] = mapped_column( 1a

491 JSON, server_default="{}", default=dict 

492 ) 

493 empirical_policy: Mapped[schemas.core.FlowRunPolicy] = mapped_column( 1a

494 Pydantic(schemas.core.FlowRunPolicy), 

495 server_default="{}", 

496 default=schemas.core.FlowRunPolicy, 

497 ) 

498 tags: Mapped[list[str]] = mapped_column(JSON, server_default="[]", default=list) 1a

499 labels: Mapped[Optional[schemas.core.KeyValueLabels]] = mapped_column(JSON) 1a

500 

501 created_by: Mapped[Optional[schemas.core.CreatedBy]] = mapped_column( 1a

502 Pydantic(schemas.core.CreatedBy) 

503 ) 

504 

505 infrastructure_pid: Mapped[Optional[str]] 1a

506 job_variables: Mapped[Optional[dict[str, Any]]] = mapped_column( 1a

507 JSON, server_default="{}", default=dict 

508 ) 

509 

510 infrastructure_document_id: Mapped[Optional[uuid.UUID]] = mapped_column( 1a

511 sa.ForeignKey("block_document.id", ondelete="CASCADE"), index=True 

512 ) 

513 

514 parent_task_run_id: Mapped[Optional[uuid.UUID]] = mapped_column( 1a

515 sa.ForeignKey("task_run.id", ondelete="SET NULL", use_alter=True), index=True 

516 ) 

517 

518 auto_scheduled: Mapped[bool] = mapped_column(server_default="0", default=False) 1a

519 

520 # TODO remove this foreign key for significant delete performance gains 

521 state_id: Mapped[Optional[uuid.UUID]] = mapped_column( 1a

522 sa.ForeignKey("flow_run_state.id", ondelete="SET NULL", use_alter=True), 

523 index=True, 

524 ) 

525 

526 work_queue_id: Mapped[Optional[uuid.UUID]] = mapped_column( 1a

527 sa.ForeignKey("work_queue.id", ondelete="SET NULL"), index=True 

528 ) 

529 

530 # -------------------------- relationships 

531 

532 # current states are eagerly loaded unless otherwise specified 

533 _state: Mapped[Optional["FlowRunState"]] = relationship( 1a

534 lazy="selectin", 

535 foreign_keys=[state_id], 

536 primaryjoin="FlowRunState.id==FlowRun.state_id", 

537 ) 

538 

539 @hybrid_property 1a

540 def state(self) -> Optional[FlowRunState]: 1a

541 return self._state 

542 

543 @state.inplace.setter 1a

544 def _set_state(self, value: Optional[FlowRunState]) -> None: 1a

545 # because this is a slightly non-standard SQLAlchemy relationship, we 

546 # prefer an explicit setter method to a setter property, because 

547 # user expectations about SQLAlchemy attribute assignment might not be 

548 # met, namely that an unrelated (from SQLAlchemy's perspective) field of 

549 # the provided state is also modified. However, property assignment 

550 # still works because the ORM model's __init__ depends on it. 

551 return self.set_state(value) 

552 

553 def set_state(self, state: Optional[FlowRunState]) -> None: 1a

554 """ 

555 If a state is assigned to this run, populate its run id. 

556 

557 This would normally be handled by the back-populated SQLAlchemy 

558 relationship, but because this is a one-to-one pointer to a 

559 one-to-many relationship, SQLAlchemy can't figure it out. 

560 """ 

561 if state is not None: 

562 state.flow_run_id = self.id 

563 self._state = state 

564 

565 flow: Mapped["Flow"] = relationship(back_populates="flow_runs", lazy="raise") 1a

566 

567 task_runs: Mapped[list["TaskRun"]] = relationship( 1a

568 back_populates="flow_run", 

569 lazy="raise", 

570 # foreign_keys=lambda: [flow_run_id], 

571 primaryjoin="TaskRun.flow_run_id==FlowRun.id", 

572 ) 

573 

574 parent_task_run: Mapped[Optional["TaskRun"]] = relationship( 1a

575 back_populates="subflow_run", 

576 lazy="raise", 

577 foreign_keys=[parent_task_run_id], 

578 ) 

579 

580 work_queue: Mapped[Optional["WorkQueue"]] = relationship( 1a

581 lazy="selectin", foreign_keys=[work_queue_id] 

582 ) 

583 

584 @declared_attr.directive 1a

585 @classmethod 1a

586 def __table_args__(cls) -> Iterable[sa.Index]: 1a

587 return ( 1a

588 sa.Index( 

589 "uq_flow_run__flow_id_idempotency_key", 

590 cls.flow_id, 

591 cls.idempotency_key, 

592 unique=True, 

593 ), 

594 sa.Index( 

595 "ix_flow_run__coalesce_start_time_expected_start_time_desc", 

596 coalesce(cls.start_time, cls.expected_start_time).desc(), 

597 ), 

598 sa.Index( 

599 "ix_flow_run__coalesce_start_time_expected_start_time_asc", 

600 coalesce(cls.start_time, cls.expected_start_time).asc(), 

601 ), 

602 sa.Index( 

603 "ix_flow_run__expected_start_time_desc", 

604 cls.expected_start_time.desc(), 

605 ), 

606 sa.Index( 

607 "ix_flow_run__next_scheduled_start_time_asc", 

608 cls.next_scheduled_start_time.asc(), 

609 ), 

610 sa.Index( 

611 "ix_flow_run__end_time_desc", 

612 cls.end_time.desc(), 

613 ), 

614 sa.Index( 

615 "ix_flow_run__start_time", 

616 cls.start_time, 

617 ), 

618 sa.Index( 

619 "ix_flow_run__state_type", 

620 cls.state_type, 

621 ), 

622 sa.Index( 

623 "ix_flow_run__state_name", 

624 cls.state_name, 

625 ), 

626 sa.Index( 

627 "ix_flow_run__state_timestamp", 

628 cls.state_timestamp, 

629 ), 

630 sa.Index("trgm_ix_flow_run_name", cls.name, postgresql_using="gin").ddl_if( 

631 dialect="postgresql" 

632 ), 

633 sa.Index( 

634 # index names are at most 63 characters long. 

635 "ix_flow_run__scheduler_deployment_id_auto_scheduled_next_schedu", 

636 cls.deployment_id, 

637 cls.auto_scheduled, 

638 cls.next_scheduled_start_time, 

639 postgresql_where=cls.state_type == schemas.states.StateType.SCHEDULED, 

640 sqlite_where=cls.state_type == schemas.states.StateType.SCHEDULED, 

641 ), 

642 ) 

643 

644 

645_TaskInput = Union[ 1a

646 schemas.core.TaskRunResult, 

647 schemas.core.FlowRunResult, 

648 schemas.core.Parameter, 

649 schemas.core.Constant, 

650] 

651_TaskInputs = dict[str, list[_TaskInput]] 1a

652 

653 

654class TaskRun(Run): 1a

655 """SQLAlchemy model of a task run.""" 

656 

657 flow_run_id: Mapped[Optional[uuid.UUID]] = mapped_column( 1a

658 sa.ForeignKey("flow_run.id", ondelete="cascade"), index=True 

659 ) 

660 

661 task_key: Mapped[str] = mapped_column() 1a

662 dynamic_key: Mapped[str] = mapped_column() 1a

663 cache_key: Mapped[Optional[str]] 1a

664 cache_expiration: Mapped[Optional[DateTime]] 1a

665 task_version: Mapped[Optional[str]] 1a

666 flow_run_run_count: Mapped[int] = mapped_column(server_default="0", default=0) 1a

667 empirical_policy: Mapped[schemas.core.TaskRunPolicy] = mapped_column( 1a

668 Pydantic(schemas.core.TaskRunPolicy), 

669 server_default="{}", 

670 default=schemas.core.TaskRunPolicy, 

671 ) 

672 task_inputs: Mapped[_TaskInputs] = mapped_column( 1a

673 Pydantic(_TaskInputs), server_default="{}", default=dict 

674 ) 

675 tags: Mapped[list[str]] = mapped_column(JSON, server_default="[]", default=list) 1a

676 labels: Mapped[Optional[schemas.core.KeyValueLabels]] = mapped_column(JSON) 1a

677 

678 state_id: Mapped[Optional[uuid.UUID]] = mapped_column( 1a

679 UUID, 

680 index=True, 

681 ) 

682 

683 # -------------------------- relationships 

684 

685 # current states are eagerly loaded unless otherwise specified 

686 _state: Mapped[Optional[TaskRunState]] = relationship( 1a

687 lazy="selectin", 

688 foreign_keys=[state_id], 

689 primaryjoin="TaskRunState.id==TaskRun.state_id", 

690 ) 

691 

692 @hybrid_property 1a

693 def state(self) -> Optional[TaskRunState]: 1a

694 return self._state 

695 

696 @state.inplace.setter 1a

697 def _set_state(self, value: Optional[TaskRunState]) -> None: 1a

698 # because this is a slightly non-standard SQLAlchemy relationship, we 

699 # prefer an explicit setter method to a setter property, because 

700 # user expectations about SQLAlchemy attribute assignment might not be 

701 # met, namely that an unrelated (from SQLAlchemy's perspective) field of 

702 # the provided state is also modified. However, property assignment 

703 # still works because the ORM model's __init__ depends on it. 

704 return self.set_state(value) 

705 

706 def set_state(self, state: Optional[TaskRunState]) -> None: 1a

707 """ 

708 If a state is assigned to this run, populate its run id. 

709 

710 This would normally be handled by the back-populated SQLAlchemy 

711 relationship, but because this is a one-to-one pointer to a 

712 one-to-many relationship, SQLAlchemy can't figure it out. 

713 """ 

714 if state is not None: 

715 state.task_run_id = self.id 

716 self._state = state 

717 

718 flow_run: Mapped[Optional["FlowRun"]] = relationship( 1a

719 back_populates="task_runs", 

720 lazy="raise", 

721 foreign_keys=[flow_run_id], 

722 ) 

723 

724 subflow_run: Mapped["FlowRun"] = relationship( 1a

725 back_populates="parent_task_run", 

726 lazy="raise", 

727 # foreign_keys=["FlowRun.parent_task_run_id"], 

728 primaryjoin="FlowRun.parent_task_run_id==TaskRun.id", 

729 uselist=False, 

730 ) 

731 

732 @declared_attr.directive 1a

733 @classmethod 1a

734 def __table_args__(cls) -> Iterable[sa.Index]: 1a

735 return ( 1a

736 sa.Index( 

737 "uq_task_run__flow_run_id_task_key_dynamic_key", 

738 cls.flow_run_id, 

739 cls.task_key, 

740 cls.dynamic_key, 

741 unique=True, 

742 ), 

743 sa.Index( 

744 "ix_task_run__expected_start_time_desc", 

745 cls.expected_start_time.desc(), 

746 ), 

747 sa.Index( 

748 "ix_task_run__next_scheduled_start_time_asc", 

749 cls.next_scheduled_start_time.asc(), 

750 ), 

751 sa.Index( 

752 "ix_task_run__end_time_desc", 

753 cls.end_time.desc(), 

754 ), 

755 sa.Index( 

756 "ix_task_run__start_time", 

757 cls.start_time, 

758 ), 

759 sa.Index( 

760 "ix_task_run__state_type", 

761 cls.state_type, 

762 ), 

763 sa.Index( 

764 "ix_task_run__state_name", 

765 cls.state_name, 

766 ), 

767 sa.Index( 

768 "ix_task_run__state_timestamp", 

769 cls.state_timestamp, 

770 ), 

771 sa.Index("trgm_ix_task_run_name", cls.name, postgresql_using="gin").ddl_if( 

772 dialect="postgresql" 

773 ), 

774 ) 

775 

776 

777class DeploymentSchedule(Base): 1a

778 deployment_id: Mapped[uuid.UUID] = mapped_column( 1a

779 sa.ForeignKey("deployment.id", ondelete="CASCADE"), index=True 

780 ) 

781 

782 schedule: Mapped[schemas.schedules.SCHEDULE_TYPES] = mapped_column( 1a

783 Pydantic(schemas.schedules.SCHEDULE_TYPES) 

784 ) 

785 active: Mapped[bool] = mapped_column(default=True) 1a

786 max_scheduled_runs: Mapped[Optional[int]] 1a

787 parameters: Mapped[dict[str, Any]] = mapped_column( 1a

788 JSON, server_default="{}", default=dict, nullable=False 

789 ) 

790 slug: Mapped[Optional[str]] = mapped_column(sa.String, nullable=True) 1a

791 

792 @declared_attr.directive 1a

793 @classmethod 1a

794 def __table_args__(cls) -> Iterable[sa.Index]: 1a

795 return ( 1a

796 sa.Index( 

797 "ix_deployment_schedule__deployment_id__slug", 

798 cls.deployment_id, 

799 cls.slug, 

800 unique=True, 

801 ), 

802 sa.Index( 

803 "ix_deployment_schedule__slug", 

804 cls.slug, 

805 unique=False, 

806 ), 

807 ) 

808 

809 

810class Deployment(Base): 1a

811 """SQLAlchemy model of a deployment.""" 

812 

813 name: Mapped[str] 1a

814 version: Mapped[Optional[str]] 1a

815 description: Mapped[Optional[str]] = mapped_column(sa.Text()) 1a

816 work_queue_name: Mapped[Optional[str]] = mapped_column(index=True) 1a

817 infra_overrides: Mapped[dict[str, Any]] = mapped_column( 1a

818 JSON, server_default="{}", default=dict 

819 ) 

820 path: Mapped[Optional[str]] 1a

821 entrypoint: Mapped[Optional[str]] 1a

822 

823 last_polled: Mapped[Optional[DateTime]] 1a

824 status: Mapped[DeploymentStatus] = mapped_column( 1a

825 sa.Enum(DeploymentStatus, name="deployment_status"), 

826 default=DeploymentStatus.NOT_READY, 

827 server_default="NOT_READY", 

828 ) 

829 

830 @declared_attr 1a

831 def job_variables(self) -> Mapped[dict[str, Any]]: 1a

832 return synonym("infra_overrides") 1a

833 

834 flow_id: Mapped[uuid.UUID] = mapped_column( 1a

835 sa.ForeignKey("flow.id", ondelete="CASCADE"), index=True 

836 ) 

837 

838 work_queue_id: Mapped[Optional[uuid.UUID]] = mapped_column( 1a

839 sa.ForeignKey("work_queue.id", ondelete="SET NULL"), index=True 

840 ) 

841 paused: Mapped[bool] = mapped_column(server_default="0", default=False, index=True) 1a

842 

843 schedules: Mapped[list["DeploymentSchedule"]] = relationship( 1a

844 lazy="selectin", order_by=lambda: DeploymentSchedule.updated.desc() 

845 ) 

846 

847 # deprecated in favor of `concurrency_limit_id` FK 

848 _concurrency_limit: Mapped[Optional[int]] = mapped_column(name="concurrency_limit") 1a

849 concurrency_limit_id: Mapped[Optional[uuid.UUID]] = mapped_column( 1a

850 sa.ForeignKey("concurrency_limit_v2.id", ondelete="SET NULL"), 

851 ) 

852 global_concurrency_limit: Mapped[Optional["ConcurrencyLimitV2"]] = relationship( 1a

853 lazy="selectin", 

854 ) 

855 concurrency_options: Mapped[Optional[schemas.core.ConcurrencyOptions]] = ( 1a

856 mapped_column( 

857 Pydantic(schemas.core.ConcurrencyOptions), 

858 server_default=None, 

859 nullable=True, 

860 default=None, 

861 ) 

862 ) 

863 

864 tags: Mapped[list[str]] = mapped_column(JSON, server_default="[]", default=list) 1a

865 labels: Mapped[Optional[schemas.core.KeyValueLabels]] = mapped_column(JSON) 1a

866 parameters: Mapped[dict[str, Any]] = mapped_column( 1a

867 JSON, server_default="{}", default=dict 

868 ) 

869 pull_steps: Mapped[Optional[list[dict[str, Any]]]] = mapped_column( 1a

870 JSON, default=list 

871 ) 

872 parameter_openapi_schema: Mapped[Optional[dict[str, Any]]] = mapped_column( 1a

873 JSON, default=dict 

874 ) 

875 enforce_parameter_schema: Mapped[bool] = mapped_column( 1a

876 default=True, server_default="0" 

877 ) 

878 created_by: Mapped[Optional[schemas.core.CreatedBy]] = mapped_column( 1a

879 Pydantic(schemas.core.CreatedBy) 

880 ) 

881 updated_by: Mapped[Optional[schemas.core.UpdatedBy]] = mapped_column( 1a

882 Pydantic(schemas.core.UpdatedBy) 

883 ) 

884 

885 infrastructure_document_id: Mapped[Optional[uuid.UUID]] = mapped_column( 1a

886 sa.ForeignKey("block_document.id", ondelete="CASCADE"), index=False 

887 ) 

888 

889 storage_document_id: Mapped[Optional[uuid.UUID]] = mapped_column( 1a

890 sa.ForeignKey("block_document.id", ondelete="CASCADE"), 

891 index=False, 

892 ) 

893 

894 flow: Mapped["Flow"] = relationship( 1a

895 "Flow", back_populates="deployments", lazy="raise" 

896 ) 

897 

898 work_queue: Mapped[Optional["WorkQueue"]] = relationship( 1a

899 lazy="selectin", foreign_keys=[work_queue_id] 

900 ) 

901 

902 __table_args__: Any = ( 1a

903 sa.Index( 

904 "uq_deployment__flow_id_name", 

905 "flow_id", 

906 "name", 

907 unique=True, 

908 ), 

909 sa.Index( 

910 "ix_deployment__created", 

911 "created", 

912 ), 

913 sa.Index("trgm_ix_deployment_name", "name", postgresql_using="gin").ddl_if( 

914 dialect="postgresql" 

915 ), 

916 ) 

917 

918 

919class Log(Base): 1a

920 """ 

921 SQLAlchemy model of a logging statement. 

922 """ 

923 

924 name: Mapped[str] 1a

925 level: Mapped[int] = mapped_column(sa.SmallInteger, index=True) 1a

926 flow_run_id: Mapped[Optional[uuid.UUID]] = mapped_column(index=True) 1a

927 task_run_id: Mapped[Optional[uuid.UUID]] = mapped_column(index=True) 1a

928 message: Mapped[str] = mapped_column(sa.Text) 1a

929 

930 # The client-side timestamp of this logged statement. 

931 timestamp: Mapped[DateTime] = mapped_column(index=True) 1a

932 

933 __table_args__: Any = ( 1a

934 sa.Index( 

935 "ix_log__flow_run_id_timestamp", 

936 "flow_run_id", 

937 "timestamp", 

938 ), 

939 ) 

940 

941 

942class ConcurrencyLimit(Base): 1a

943 tag: Mapped[str] 1a

944 concurrency_limit: Mapped[int] 1a

945 active_slots: Mapped[list[str]] = mapped_column( 1a

946 JSON, server_default="[]", default=list 

947 ) 

948 

949 __table_args__: Any = (sa.Index("uq_concurrency_limit__tag", "tag", unique=True),) 1a

950 

951 

952class ConcurrencyLimitV2(Base): 1a

953 active: Mapped[bool] = mapped_column(default=True) 1a

954 name: Mapped[str] 1a

955 limit: Mapped[int] 1a

956 active_slots: Mapped[int] = mapped_column(default=0) 1a

957 denied_slots: Mapped[int] = mapped_column(default=0) 1a

958 

959 slot_decay_per_second: Mapped[float] = mapped_column(default=0.0) 1a

960 avg_slot_occupancy_seconds: Mapped[float] = mapped_column(default=2.0) 1a

961 

962 __table_args__: Any = (sa.UniqueConstraint("name"),) 1a

963 

964 

965class BlockType(Base): 1a

966 name: Mapped[str] 1a

967 slug: Mapped[str] 1a

968 logo_url: Mapped[Optional[str]] 1a

969 documentation_url: Mapped[Optional[str]] 1a

970 description: Mapped[Optional[str]] 1a

971 code_example: Mapped[Optional[str]] 1a

972 is_protected: Mapped[bool] = mapped_column(server_default="0", default=False) 1a

973 

974 __table_args__: Any = ( 1a

975 sa.Index( 

976 "uq_block_type__slug", 

977 "slug", 

978 unique=True, 

979 ), 

980 sa.Index("trgm_ix_block_type_name", "name", postgresql_using="gin").ddl_if( 

981 dialect="postgresql" 

982 ), 

983 ) 

984 

985 

986class BlockSchema(Base): 1a

987 checksum: Mapped[str] 1a

988 fields: Mapped[dict[str, Any]] = mapped_column( 1a

989 JSON, server_default="{}", default=dict 

990 ) 

991 capabilities: Mapped[list[str]] = mapped_column( 1a

992 JSON, server_default="[]", default=list 

993 ) 

994 version: Mapped[str] = mapped_column( 1a

995 server_default=schemas.core.DEFAULT_BLOCK_SCHEMA_VERSION, 

996 ) 

997 

998 block_type_id: Mapped[uuid.UUID] = mapped_column( 1a

999 sa.ForeignKey("block_type.id", ondelete="cascade"), index=True 

1000 ) 

1001 

1002 block_type: Mapped["BlockType"] = relationship(lazy="selectin") 1a

1003 

1004 __table_args__: Any = ( 1a

1005 sa.Index( 

1006 "uq_block_schema__checksum_version", 

1007 "checksum", 

1008 "version", 

1009 unique=True, 

1010 ), 

1011 sa.Index("ix_block_schema__created", "created"), 

1012 sa.Index( 

1013 "ix_block_schema__capabilities", "capabilities", postgresql_using="gin" 

1014 ).ddl_if(dialect="postgresql"), 

1015 ) 

1016 

1017 

1018class BlockSchemaReference(Base): 1a

1019 name: Mapped[str] 1a

1020 

1021 parent_block_schema_id: Mapped[uuid.UUID] = mapped_column( 1a

1022 sa.ForeignKey("block_schema.id", ondelete="cascade") 

1023 ) 

1024 

1025 reference_block_schema_id: Mapped[uuid.UUID] = mapped_column( 1a

1026 sa.ForeignKey("block_schema.id", ondelete="cascade") 

1027 ) 

1028 

1029 

1030class BlockDocument(Base): 1a

1031 name: Mapped[str] = mapped_column(index=True) 1a

1032 data: Mapped[Any] = mapped_column(JSON, server_default="{}", default=dict) 1a

1033 is_anonymous: Mapped[bool] = mapped_column(server_default="0", index=True) 1a

1034 

1035 block_type_name: Mapped[Optional[str]] 1a

1036 

1037 block_type_id: Mapped[uuid.UUID] = mapped_column( 1a

1038 sa.ForeignKey("block_type.id", ondelete="cascade") 

1039 ) 

1040 

1041 block_type: Mapped["BlockType"] = relationship(lazy="selectin") 1a

1042 

1043 block_schema_id: Mapped[uuid.UUID] = mapped_column( 1a

1044 sa.ForeignKey("block_schema.id", ondelete="cascade") 

1045 ) 

1046 

1047 block_schema: Mapped["BlockSchema"] = relationship(lazy="selectin") 1a

1048 

1049 __table_args__: Any = ( 1a

1050 sa.Index( 

1051 "uq_block__type_id_name", 

1052 "block_type_id", 

1053 "name", 

1054 unique=True, 

1055 ), 

1056 sa.Index("ix_block_document__block_type_name__name", "block_type_name", "name"), 

1057 sa.Index("trgm_ix_block_document_name", "name", postgresql_using="gin").ddl_if( 

1058 dialect="postgresql" 

1059 ), 

1060 ) 

1061 

1062 async def encrypt_data(self, session: AsyncSession, data: dict[str, Any]) -> None: 1a

1063 """ 

1064 Store encrypted data on the ORM model 

1065 

1066 Note: will only succeed if the caller has sufficient permission. 

1067 """ 

1068 self.data = await encrypt_fernet(session, data) 

1069 

1070 async def decrypt_data(self, session: AsyncSession) -> dict[str, Any]: 1a

1071 """ 

1072 Retrieve decrypted data from the ORM model. 

1073 

1074 Note: will only succeed if the caller has sufficient permission. 

1075 """ 

1076 return await decrypt_fernet(session, self.data) 

1077 

1078 

1079class BlockDocumentReference(Base): 1a

1080 name: Mapped[str] 1a

1081 

1082 parent_block_document_id: Mapped[uuid.UUID] = mapped_column( 1a

1083 sa.ForeignKey("block_document.id", ondelete="cascade"), 

1084 ) 

1085 

1086 reference_block_document_id: Mapped[uuid.UUID] = mapped_column( 1a

1087 sa.ForeignKey("block_document.id", ondelete="cascade"), 

1088 ) 

1089 

1090 

1091class Configuration(Base): 1a

1092 key: Mapped[str] = mapped_column(index=True) 1a

1093 value: Mapped[dict[str, Any]] = mapped_column(JSON) 1a

1094 

1095 __table_args__: Any = (sa.UniqueConstraint("key"),) 1a

1096 

1097 

1098class SavedSearch(Base): 1a

1099 """SQLAlchemy model of a saved search.""" 

1100 

1101 name: Mapped[str] 1a

1102 filters: Mapped[list[dict[str, Any]]] = mapped_column( 1a

1103 JSON, server_default="[]", default=list 

1104 ) 

1105 

1106 __table_args__: Any = (sa.UniqueConstraint("name"),) 1a

1107 

1108 

1109class WorkQueue(Base): 1a

1110 """SQLAlchemy model of a work queue""" 

1111 

1112 name: Mapped[str] 1a

1113 

1114 filter: Mapped[Optional[schemas.core.QueueFilter]] = mapped_column( 1a

1115 Pydantic(schemas.core.QueueFilter) 

1116 ) 

1117 description: Mapped[str] = mapped_column(default="", server_default="") 1a

1118 is_paused: Mapped[bool] = mapped_column(server_default="0", default=False) 1a

1119 concurrency_limit: Mapped[Optional[int]] 1a

1120 priority: Mapped[int] 1a

1121 

1122 last_polled: Mapped[Optional[DateTime]] 1a

1123 status: Mapped[WorkQueueStatus] = mapped_column( 1a

1124 sa.Enum(WorkQueueStatus, name="work_queue_status"), 

1125 default=WorkQueueStatus.NOT_READY, 

1126 server_default=WorkQueueStatus.NOT_READY, 

1127 ) 

1128 

1129 work_pool_id: Mapped[uuid.UUID] = mapped_column( 1a

1130 sa.ForeignKey("work_pool.id", ondelete="cascade"), index=True 

1131 ) 

1132 

1133 work_pool: Mapped["WorkPool"] = relationship( 1a

1134 lazy="selectin", foreign_keys=[work_pool_id] 

1135 ) 

1136 

1137 __table_args__: ClassVar[Any] = ( 1a

1138 sa.UniqueConstraint("work_pool_id", "name"), 

1139 sa.Index("ix_work_queue__work_pool_id_priority", "work_pool_id", "priority"), 

1140 sa.Index("trgm_ix_work_queue_name", "name", postgresql_using="gin").ddl_if( 

1141 dialect="postgresql" 

1142 ), 

1143 ) 

1144 

1145 

1146class WorkPool(Base): 1a

1147 """SQLAlchemy model of an worker""" 

1148 

1149 name: Mapped[str] 1a

1150 description: Mapped[Optional[str]] 1a

1151 type: Mapped[str] = mapped_column(index=True) 1a

1152 base_job_template: Mapped[dict[str, Any]] = mapped_column( 1a

1153 JSON, server_default="{}", default={} 

1154 ) 

1155 is_paused: Mapped[bool] = mapped_column(server_default="0", default=False) 1a

1156 default_queue_id: Mapped[Optional[uuid.UUID]] = mapped_column( 1a

1157 UUID, 

1158 sa.ForeignKey("work_queue.id", ondelete="RESTRICT", use_alter=True), 

1159 nullable=True, 

1160 ) 

1161 concurrency_limit: Mapped[Optional[int]] 1a

1162 

1163 status: Mapped[WorkPoolStatus] = mapped_column( 1a

1164 sa.Enum(WorkPoolStatus, name="work_pool_status"), 

1165 default=WorkPoolStatus.NOT_READY, 

1166 server_default=WorkPoolStatus.NOT_READY, 

1167 ) 

1168 last_transitioned_status_at: Mapped[Optional[DateTime]] 1a

1169 last_status_event_id: Mapped[Optional[uuid.UUID]] 1a

1170 

1171 storage_configuration: Mapped[schemas.core.WorkPoolStorageConfiguration] = ( 1a

1172 mapped_column( 

1173 Pydantic(schemas.core.WorkPoolStorageConfiguration), 

1174 server_default="{}", 

1175 default=schemas.core.WorkPoolStorageConfiguration, 

1176 nullable=False, 

1177 ) 

1178 ) 

1179 

1180 __table_args__: Any = (sa.UniqueConstraint("name"),) 1a

1181 

1182 

1183class Worker(Base): 1a

1184 """SQLAlchemy model of an worker""" 

1185 

1186 work_pool_id: Mapped[uuid.UUID] = mapped_column( 1a

1187 sa.ForeignKey("work_pool.id", ondelete="cascade"), index=True 

1188 ) 

1189 

1190 name: Mapped[str] 1a

1191 last_heartbeat_time: Mapped[DateTime] = mapped_column( 1a

1192 server_default=sa.func.now(), default=lambda: now("UTC") 

1193 ) 

1194 heartbeat_interval_seconds: Mapped[Optional[int]] 1a

1195 

1196 status: Mapped[WorkerStatus] = mapped_column( 1a

1197 sa.Enum(WorkerStatus, name="worker_status"), 

1198 default=WorkerStatus.OFFLINE, 

1199 server_default=WorkerStatus.OFFLINE, 

1200 ) 

1201 

1202 __table_args__: Any = ( 1a

1203 sa.UniqueConstraint("work_pool_id", "name"), 

1204 sa.Index( 

1205 "ix_worker__work_pool_id_last_heartbeat_time", 

1206 "work_pool_id", 

1207 "last_heartbeat_time", 

1208 ), 

1209 ) 

1210 

1211 

1212class Agent(Base): 1a

1213 """SQLAlchemy model of an agent""" 

1214 

1215 name: Mapped[str] 1a

1216 

1217 work_queue_id: Mapped[uuid.UUID] = mapped_column( 1a

1218 sa.ForeignKey("work_queue.id"), index=True 

1219 ) 

1220 

1221 last_activity_time: Mapped[DateTime] = mapped_column( 1a

1222 server_default=sa.func.now(), default=lambda: now("UTC") 

1223 ) 

1224 

1225 __table_args__: Any = (sa.UniqueConstraint("name"),) 1a

1226 

1227 

1228class Variable(Base): 1a

1229 name: Mapped[str] 1a

1230 value: Mapped[Optional[Any]] = mapped_column(JSON) 1a

1231 tags: Mapped[list[str]] = mapped_column(JSON, server_default="[]", default=list) 1a

1232 

1233 __table_args__: Any = (sa.UniqueConstraint("name"),) 1a

1234 

1235 

1236class FlowRunInput(Base): 1a

1237 flow_run_id: Mapped[uuid.UUID] = mapped_column( 1a

1238 sa.ForeignKey("flow_run.id", ondelete="cascade") 

1239 ) 

1240 

1241 key: Mapped[str] 1a

1242 value: Mapped[str] = mapped_column(sa.Text()) 1a

1243 sender: Mapped[Optional[str]] 1a

1244 

1245 __table_args__: Any = (sa.UniqueConstraint("flow_run_id", "key"),) 1a

1246 

1247 

1248class CsrfToken(Base): 1a

1249 token: Mapped[str] 1a

1250 client: Mapped[str] = mapped_column(unique=True) 1a

1251 expiration: Mapped[DateTime] 1a

1252 

1253 

1254class Automation(Base): 1a

1255 name: Mapped[str] 1a

1256 description: Mapped[str] = mapped_column(default="") 1a

1257 

1258 enabled: Mapped[bool] = mapped_column(server_default="1", default=True) 1a

1259 tags: Mapped[list[str]] = mapped_column(JSON, server_default="[]", default=list) 1a

1260 

1261 trigger: Mapped[ServerTriggerTypes] = mapped_column(Pydantic(ServerTriggerTypes)) 1a

1262 

1263 actions: Mapped[ServerActionTypes] = mapped_column( 1a

1264 Pydantic(list[ServerActionTypes]) 

1265 ) 

1266 actions_on_trigger: Mapped[list[ServerActionTypes]] = mapped_column( 1a

1267 Pydantic(list[ServerActionTypes]), server_default="[]", default=list 

1268 ) 

1269 actions_on_resolve: Mapped[list[ServerActionTypes]] = mapped_column( 1a

1270 Pydantic(list[ServerActionTypes]), server_default="[]", default=list 

1271 ) 

1272 

1273 related_resources: Mapped[list["AutomationRelatedResource"]] = relationship( 1a

1274 "AutomationRelatedResource", back_populates="automation", lazy="raise" 

1275 ) 

1276 

1277 @classmethod 1a

1278 def sort_expression(cls, value: AutomationSort) -> sa.ColumnExpressionArgument[Any]: 1a

1279 """Return an expression used to sort Automations""" 

1280 sort_mapping: dict[AutomationSort, sa.ColumnExpressionArgument[Any]] = { 

1281 AutomationSort.CREATED_DESC: cls.created.desc(), 

1282 AutomationSort.UPDATED_DESC: cls.updated.desc(), 

1283 AutomationSort.NAME_ASC: cls.name.asc(), 

1284 AutomationSort.NAME_DESC: cls.name.desc(), 

1285 } 

1286 return sort_mapping[value] 

1287 

1288 

1289class AutomationBucket(Base): 1a

1290 __table_args__: Any = ( 1a

1291 sa.Index( 

1292 "uq_automation_bucket__automation_id__trigger_id__bucketing_key", 

1293 "automation_id", 

1294 "trigger_id", 

1295 "bucketing_key", 

1296 unique=True, 

1297 ), 

1298 sa.Index( 

1299 "ix_automation_bucket__automation_id__end", 

1300 "automation_id", 

1301 "end", 

1302 ), 

1303 ) 

1304 

1305 automation_id: Mapped[uuid.UUID] = mapped_column( 1a

1306 sa.ForeignKey("automation.id", ondelete="CASCADE") 

1307 ) 

1308 

1309 trigger_id: Mapped[uuid.UUID] 1a

1310 

1311 bucketing_key: Mapped[list[str]] = mapped_column( 1a

1312 JSON, server_default="[]", default=list 

1313 ) 

1314 

1315 last_event: Mapped[Optional[ReceivedEvent]] = mapped_column(Pydantic(ReceivedEvent)) 1a

1316 

1317 start: Mapped[DateTime] 1a

1318 end: Mapped[DateTime] 1a

1319 

1320 count: Mapped[int] 1a

1321 

1322 last_operation: Mapped[Optional[str]] 1a

1323 

1324 triggered_at: Mapped[Optional[DateTime]] 1a

1325 

1326 

1327class AutomationRelatedResource(Base): 1a

1328 __table_args__: Any = ( 1a

1329 sa.Index( 

1330 "uq_automation_related_resource__automation_id__resource_id", 

1331 "automation_id", 

1332 "resource_id", 

1333 unique=True, 

1334 ), 

1335 ) 

1336 

1337 automation_id: Mapped[uuid.UUID] = mapped_column( 1a

1338 sa.ForeignKey("automation.id", ondelete="CASCADE") 

1339 ) 

1340 

1341 resource_id: Mapped[Optional[str]] = mapped_column(index=True) 1a

1342 automation_owned_by_resource: Mapped[bool] = mapped_column( 1a

1343 default=False, server_default="0" 

1344 ) 

1345 

1346 automation: Mapped["Automation"] = relationship( 1a

1347 "Automation", back_populates="related_resources", lazy="raise" 

1348 ) 

1349 

1350 

1351class CompositeTriggerChildFiring(Base): 1a

1352 __table_args__: Any = ( 1a

1353 sa.Index( 

1354 "uq_composite_trigger_child_firing__a_id__pt_id__ct__id", 

1355 "automation_id", 

1356 "parent_trigger_id", 

1357 "child_trigger_id", 

1358 unique=True, 

1359 ), 

1360 ) 

1361 

1362 automation_id: Mapped[uuid.UUID] = mapped_column( 1a

1363 sa.ForeignKey("automation.id", ondelete="CASCADE") 

1364 ) 

1365 

1366 parent_trigger_id: Mapped[uuid.UUID] 1a

1367 

1368 child_trigger_id: Mapped[uuid.UUID] 1a

1369 child_firing_id: Mapped[uuid.UUID] 1a

1370 child_fired_at: Mapped[Optional[DateTime]] 1a

1371 child_firing: Mapped[Firing] = mapped_column(Pydantic(Firing)) 1a

1372 

1373 

1374class AutomationEventFollower(Base): 1a

1375 __table_args__: Any = ( 1a

1376 sa.Index( 

1377 "uq_follower_for_scope", 

1378 "scope", 

1379 "follower_event_id", 

1380 unique=True, 

1381 ), 

1382 # allows lookup on (scope, leader_event_id) to use an index-only instead of full table scan 

1383 sa.Index( 

1384 "ix_ae_follower_scope_leader", 

1385 "scope", 

1386 "leader_event_id", 

1387 ), 

1388 ) 

1389 scope: Mapped[str] = mapped_column(default="", index=True) 1a

1390 leader_event_id: Mapped[uuid.UUID] = mapped_column(index=True) 1a

1391 follower_event_id: Mapped[uuid.UUID] 1a

1392 received: Mapped[DateTime] = mapped_column(index=True) 1a

1393 follower: Mapped[ReceivedEvent] = mapped_column(Pydantic(ReceivedEvent)) 1a

1394 

1395 

1396class Event(Base): 1a

1397 @declared_attr.directive 1a

1398 def __tablename__(cls) -> str: 1a

1399 return "events" 1a

1400 

1401 __table_args__: Any = ( 1a

1402 sa.Index( 

1403 "ix_events__related_resource_ids_gin", 

1404 "related_resource_ids", 

1405 postgresql_using="gin", 

1406 ), 

1407 sa.Index("ix_events__occurred", "occurred"), 

1408 sa.Index("ix_events__event__id", "event", "id"), 

1409 sa.Index( 

1410 "ix_events__event_resource_id_occurred", 

1411 "event", 

1412 "resource_id", 

1413 "occurred", 

1414 ), 

1415 sa.Index("ix_events__occurred_id", "occurred", "id"), 

1416 sa.Index("ix_events__event_occurred_id", "event", "occurred", "id"), 

1417 sa.Index( 

1418 "ix_events__related_gin", 

1419 "related", 

1420 postgresql_using="gin", 

1421 ), 

1422 sa.Index( 

1423 "ix_events__event_occurred", 

1424 "event", 

1425 "occurred", 

1426 ), 

1427 ) 

1428 

1429 occurred: Mapped[DateTime] 1a

1430 event: Mapped[str] = mapped_column(sa.Text()) 1a

1431 resource_id: Mapped[str] = mapped_column(sa.Text()) 1a

1432 resource: Mapped[dict[str, Any]] = mapped_column(JSON()) 1a

1433 related_resource_ids: Mapped[list[str]] = mapped_column( 1a

1434 JSON(), server_default="[]", default=list 

1435 ) 

1436 related: Mapped[list[dict[str, Any]]] = mapped_column( 1a

1437 JSON(), server_default="[]", default=list 

1438 ) 

1439 payload: Mapped[dict[str, Any]] = mapped_column(JSON()) 1a

1440 received: Mapped[DateTime] 1a

1441 recorded: Mapped[DateTime] 1a

1442 follows: Mapped[Optional[uuid.UUID]] 1a

1443 

1444 

1445class EventResource(Base): 1a

1446 @declared_attr.directive 1a

1447 def __tablename__(cls) -> str: 1a

1448 return "event_resources" 1a

1449 

1450 __table_args__: Any = ( 1a

1451 sa.Index( 

1452 "ix_event_resources__resource_id__occurred", 

1453 "resource_id", 

1454 "occurred", 

1455 ), 

1456 ) 

1457 

1458 occurred: Mapped[DateTime] 1a

1459 resource_id: Mapped[str] = mapped_column(sa.Text()) 1a

1460 resource_role: Mapped[str] = mapped_column(sa.Text()) 1a

1461 resource: Mapped[dict[str, Any]] = mapped_column(sa_JSON) 1a

1462 event_id: Mapped[uuid.UUID] 1a

1463 

1464 

1465# These are temporary until we've migrated all the references to the new, 

1466# non-ORM names 

1467 

1468ORMFlow = Flow 1a

1469ORMFlowRunState = FlowRunState 1a

1470ORMTaskRunState = TaskRunState 1a

1471ORMArtifact = Artifact 1a

1472ORMArtifactCollection = ArtifactCollection 1a

1473ORMTaskRunStateCache = TaskRunStateCache 1a

1474ORMRun = Run 1a

1475ORMFlowRun = FlowRun 1a

1476ORMTaskRun = TaskRun 1a

1477ORMDeploymentSchedule = DeploymentSchedule 1a

1478ORMDeployment = Deployment 1a

1479ORMLog = Log 1a

1480ORMConcurrencyLimit = ConcurrencyLimit 1a

1481ORMConcurrencyLimitV2 = ConcurrencyLimitV2 1a

1482ORMBlockType = BlockType 1a

1483ORMBlockSchema = BlockSchema 1a

1484ORMBlockSchemaReference = BlockSchemaReference 1a

1485ORMBlockDocument = BlockDocument 1a

1486ORMBlockDocumentReference = BlockDocumentReference 1a

1487ORMConfiguration = Configuration 1a

1488ORMSavedSearch = SavedSearch 1a

1489ORMWorkQueue = WorkQueue 1a

1490ORMWorkPool = WorkPool 1a

1491ORMWorker = Worker 1a

1492ORMAgent = Agent 1a

1493ORMVariable = Variable 1a

1494ORMFlowRunInput = FlowRunInput 1a

1495ORMCsrfToken = CsrfToken 1a

1496ORMAutomation = Automation 1a

1497ORMAutomationBucket = AutomationBucket 1a

1498ORMAutomationRelatedResource = AutomationRelatedResource 1a

1499ORMCompositeTriggerChildFiring = CompositeTriggerChildFiring 1a

1500ORMAutomationEventFollower = AutomationEventFollower 1a

1501ORMEvent = Event 1a

1502ORMEventResource = EventResource 1a

1503 

1504 

1505_UpsertColumns = Iterable[Union[str, "sa.Column[Any]", roles.DDLConstraintColumnRole]] 1a

1506 

1507 

1508class BaseORMConfiguration(ABC): 1a

1509 """ 

1510 Abstract base class used to inject database-specific ORM configuration into Prefect. 

1511 

1512 Modifications to core Prefect REST API data structures can have unintended consequences. 

1513 Use with caution. 

1514 """ 

1515 

1516 def unique_key(self) -> tuple[Hashable, ...]: 1a

1517 """ 

1518 Returns a key used to determine whether to instantiate a new DB interface. 

1519 """ 

1520 return (self.__class__, Base.metadata) 1abcde

1521 

1522 @property 1a

1523 @abstractmethod 1a

1524 def versions_dir(self) -> Path: 1a

1525 """Directory containing migrations""" 

1526 ... 

1527 

1528 @property 1a

1529 def deployment_unique_upsert_columns(self) -> _UpsertColumns: 1a

1530 """Unique columns for upserting a Deployment""" 

1531 return [Deployment.flow_id, Deployment.name] 

1532 

1533 @property 1a

1534 def concurrency_limit_unique_upsert_columns(self) -> _UpsertColumns: 1a

1535 """Unique columns for upserting a ConcurrencyLimit""" 

1536 return [ConcurrencyLimit.tag] 

1537 

1538 @property 1a

1539 def flow_run_unique_upsert_columns(self) -> _UpsertColumns: 1a

1540 """Unique columns for upserting a FlowRun""" 

1541 return [FlowRun.flow_id, FlowRun.idempotency_key] 

1542 

1543 @property 1a

1544 def block_type_unique_upsert_columns(self) -> _UpsertColumns: 1a

1545 """Unique columns for upserting a BlockType""" 

1546 return [BlockType.slug] 1c

1547 

1548 @property 1a

1549 def artifact_collection_unique_upsert_columns(self) -> _UpsertColumns: 1a

1550 """Unique columns for upserting an ArtifactCollection""" 

1551 return [ArtifactCollection.key] 

1552 

1553 @property 1a

1554 def block_schema_unique_upsert_columns(self) -> _UpsertColumns: 1a

1555 """Unique columns for upserting a BlockSchema""" 

1556 return [BlockSchema.checksum, BlockSchema.version] 1c

1557 

1558 @property 1a

1559 def flow_unique_upsert_columns(self) -> _UpsertColumns: 1a

1560 """Unique columns for upserting a Flow""" 

1561 return [Flow.name] 

1562 

1563 @property 1a

1564 def saved_search_unique_upsert_columns(self) -> _UpsertColumns: 1a

1565 """Unique columns for upserting a SavedSearch""" 

1566 return [SavedSearch.name] 

1567 

1568 @property 1a

1569 def task_run_unique_upsert_columns(self) -> _UpsertColumns: 1a

1570 """Unique columns for upserting a TaskRun""" 

1571 return [ 

1572 TaskRun.flow_run_id, 

1573 TaskRun.task_key, 

1574 TaskRun.dynamic_key, 

1575 ] 

1576 

1577 @property 1a

1578 def block_document_unique_upsert_columns(self) -> _UpsertColumns: 1a

1579 """Unique columns for upserting a BlockDocument""" 

1580 return [BlockDocument.block_type_id, BlockDocument.name] 

1581 

1582 

1583class AsyncPostgresORMConfiguration(BaseORMConfiguration): 1a

1584 """Postgres specific orm configuration""" 

1585 

1586 @property 1a

1587 def versions_dir(self) -> Path: 1a

1588 """Directory containing migrations""" 

1589 import prefect.server.database 

1590 

1591 return ( 

1592 Path(prefect.server.database.__file__).parent 

1593 / "_migrations" 

1594 / "versions" 

1595 / "postgresql" 

1596 ) 

1597 

1598 

1599class AioSqliteORMConfiguration(BaseORMConfiguration): 1a

1600 """SQLite specific orm configuration""" 

1601 

1602 @property 1a

1603 def versions_dir(self) -> Path: 1a

1604 """Directory containing migrations""" 

1605 import prefect.server.database 1b

1606 

1607 return ( 1b

1608 Path(prefect.server.database.__file__).parent 

1609 / "_migrations" 

1610 / "versions" 

1611 / "sqlite" 

1612 )