Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/orm_models.py: 94%
550 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1import datetime 1a
2import uuid 1a
3from abc import ABC, abstractmethod 1a
4from collections.abc import Hashable, Iterable 1a
5from pathlib import Path 1a
6from typing import TYPE_CHECKING, Any, ClassVar, Optional, Union 1a
8import sqlalchemy as sa 1a
9from sqlalchemy import FetchedValue 1a
10from sqlalchemy.dialects import postgresql 1a
11from sqlalchemy.ext.asyncio import AsyncSession 1a
12from sqlalchemy.ext.hybrid import hybrid_property 1a
13from sqlalchemy.orm import ( 1a
14 DeclarativeBase,
15 Mapped,
16 declared_attr,
17 mapped_column,
18 registry,
19 relationship,
20 synonym,
21)
22from sqlalchemy.orm.decl_api import registry as RegistryType 1a
23from sqlalchemy.sql import roles 1a
24from sqlalchemy.sql.functions import coalesce 1a
26import prefect.server.schemas as schemas 1a
27from prefect.server.events.actions import ServerActionTypes 1a
28from prefect.server.events.schemas.automations import ( 1a
29 AutomationSort,
30 Firing,
31 ServerTriggerTypes,
32)
33from prefect.server.events.schemas.events import ReceivedEvent 1a
34from prefect.server.schemas.statuses import ( 1a
35 DeploymentStatus,
36 WorkerStatus,
37 WorkPoolStatus,
38 WorkQueueStatus,
39)
40from prefect.server.utilities.database import ( 1a
41 CAMEL_TO_SNAKE,
42 JSON,
43 UUID,
44 GenerateUUID,
45 Pydantic,
46 Timestamp,
47)
48from prefect.server.utilities.encryption import decrypt_fernet, encrypt_fernet 1a
49from prefect.types._datetime import DateTime, now 1a
50from prefect.utilities.names import generate_slug 1a
# for 'plain JSON' columns, use the postgresql variant (which comes with an
# extra operator) and fall back to the generic JSON variant for SQLite
sa_JSON: postgresql.JSON = postgresql.JSON().with_variant(sa.JSON(), "sqlite")
class Base(DeclarativeBase):
    """
    Base SQLAlchemy model that automatically infers the table name
    and provides ID, created, and updated columns
    """

    registry: ClassVar[RegistryType] = registry(
        metadata=sa.schema.MetaData(
            # define naming conventions for our Base class to use
            # sqlalchemy will use the following templated strings
            # to generate the names of indices, constraints, and keys
            #
            # we offset the table name with two underscores (__) to
            # help differentiate, for example, between "flow_run.state_type"
            # and "flow_run_state.type".
            #
            # more information on this templating and available
            # customization can be found here
            # https://docs.sqlalchemy.org/en/14/core/metadata.html#sqlalchemy.schema.MetaData
            #
            # this also allows us to avoid having to specify names explicitly
            # when using sa.ForeignKey.use_alter = True
            # https://docs.sqlalchemy.org/en/14/core/constraints.html
            naming_convention={
                "ix": "ix_%(table_name)s__%(column_0_N_name)s",
                "uq": "uq_%(table_name)s__%(column_0_N_name)s",
                "ck": "ck_%(table_name)s__%(constraint_name)s",
                "fk": "fk_%(table_name)s__%(column_0_N_name)s__%(referred_table_name)s",
                "pk": "pk_%(table_name)s",
            }
        ),
        # map Python annotation types to the project's custom column types so
        # `Mapped[uuid.UUID]` / `Mapped[DateTime]` columns need no explicit type
        type_annotation_map={
            uuid.UUID: UUID,
            DateTime: Timestamp,
        },
    )

    # required in order to access columns with server defaults
    # or SQL expression defaults, subsequent to a flush, without
    # triggering an expired load
    #
    # this allows us to load attributes with a server default after
    # an INSERT, for example
    #
    # https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html#preventing-implicit-io-when-using-asyncsession
    __mapper_args__: dict[str, Any] = {"eager_defaults": True}

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(id={self.id})"

    @declared_attr.directive
    def __tablename__(cls) -> str:
        """
        By default, turn the model's camel-case class name
        into a snake-case table name. Override by providing
        an explicit `__tablename__` class property.
        """
        return CAMEL_TO_SNAKE.sub("_", cls.__name__).lower()

    # primary key: generated server-side when possible, client-side otherwise
    id: Mapped[uuid.UUID] = mapped_column(
        primary_key=True,
        server_default=GenerateUUID(),
        default=uuid.uuid4,
    )

    created: Mapped[DateTime] = mapped_column(
        server_default=sa.func.now(), default=lambda: now("UTC")
    )

    # onupdate is only called when statements are actually issued
    # against the database. until COMMIT is issued, this column
    # will not be updated
    updated: Mapped[DateTime] = mapped_column(
        index=True,
        server_default=sa.func.now(),
        default=lambda: now("UTC"),
        onupdate=sa.func.now(),
        server_onupdate=FetchedValue(),
    )
class Flow(Base):
    """SQLAlchemy mixin of a flow."""

    name: Mapped[str]
    tags: Mapped[list[str]] = mapped_column(JSON, server_default="[]", default=list)
    labels: Mapped[Optional[schemas.core.KeyValueLabels]] = mapped_column(JSON)

    # relationships are lazy="raise" to force callers to load them explicitly
    flow_runs: Mapped[list["FlowRun"]] = relationship(
        back_populates="flow", lazy="raise"
    )
    deployments: Mapped[list["Deployment"]] = relationship(
        back_populates="flow", lazy="raise"
    )

    __table_args__: Any = (
        sa.UniqueConstraint("name"),
        sa.Index("ix_flow__created", "created"),
        # trigram index for fast fuzzy name search; only created on postgres
        sa.Index("trgm_ix_flow_name", "name", postgresql_using="gin").ddl_if(
            dialect="postgresql"
        ),
    )
class FlowRunState(Base):
    """SQLAlchemy mixin of a flow run state."""

    flow_run_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("flow_run.id", ondelete="cascade")
    )

    type: Mapped[schemas.states.StateType] = mapped_column(
        sa.Enum(schemas.states.StateType, name="state_type"), index=True
    )
    timestamp: Mapped[DateTime] = mapped_column(
        server_default=sa.func.now(), default=lambda: now("UTC")
    )
    name: Mapped[str] = mapped_column(index=True)
    message: Mapped[Optional[str]]
    state_details: Mapped[schemas.states.StateDetails] = mapped_column(
        Pydantic(schemas.states.StateDetails),
        server_default="{}",
        default=schemas.states.StateDetails,
    )
    # legacy inline result storage; see the `data` hybrid property below
    _data: Mapped[Optional[Any]] = mapped_column(JSON, name="data")

    result_artifact_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        sa.ForeignKey("artifact.id", ondelete="SET NULL", use_alter=True),
        index=True,
    )

    _result_artifact: Mapped[Optional["Artifact"]] = relationship(
        lazy="selectin",
        foreign_keys=[result_artifact_id],
        primaryjoin="Artifact.id==FlowRunState.result_artifact_id",
    )

    @hybrid_property
    def data(self) -> Optional[Any]:
        """Return the state's result data, preferring inline data over the
        linked result artifact."""
        if self._data:
            # ensures backwards compatibility for results stored on state objects
            return self._data
        if not self.result_artifact_id:
            # do not try to load the relationship if there's no artifact id
            return None
        if TYPE_CHECKING:
            assert self._result_artifact is not None
        return self._result_artifact.data

    flow_run: Mapped["FlowRun"] = relationship(lazy="raise", foreign_keys=[flow_run_id])

    def as_state(self) -> schemas.states.State:
        """Convert this ORM row into a `schemas.states.State` object."""
        return schemas.states.State.model_validate(self, from_attributes=True)

    @declared_attr.directive
    @classmethod
    def __table_args__(cls) -> Iterable[sa.Index]:
        # unique compound index also serves lookups of a run's latest state
        return (
            sa.Index(
                "uq_flow_run_state__flow_run_id_timestamp_desc",
                cls.flow_run_id,
                cls.timestamp.desc(),
                unique=True,
            ),
        )
class TaskRunState(Base):
    """SQLAlchemy model of a task run state."""

    # this column isn't explicitly indexed because it is included in
    # the unique compound index on (task_run_id, timestamp)
    task_run_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("task_run.id", ondelete="cascade")
    )

    type: Mapped[schemas.states.StateType] = mapped_column(
        sa.Enum(schemas.states.StateType, name="state_type"), index=True
    )
    timestamp: Mapped[DateTime] = mapped_column(
        server_default=sa.func.now(), default=lambda: now("UTC")
    )
    name: Mapped[str] = mapped_column(index=True)
    message: Mapped[Optional[str]]
    state_details: Mapped[schemas.states.StateDetails] = mapped_column(
        Pydantic(schemas.states.StateDetails),
        server_default="{}",
        default=schemas.states.StateDetails,
    )
    # legacy inline result storage; see the `data` hybrid property below
    _data: Mapped[Optional[Any]] = mapped_column(JSON, name="data")

    result_artifact_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        sa.ForeignKey("artifact.id", ondelete="SET NULL", use_alter=True), index=True
    )

    _result_artifact: Mapped[Optional["Artifact"]] = relationship(
        lazy="selectin",
        foreign_keys=[result_artifact_id],
        primaryjoin="Artifact.id==TaskRunState.result_artifact_id",
    )

    @hybrid_property
    def data(self) -> Optional[Any]:
        """Return the state's result data, preferring inline data over the
        linked result artifact."""
        if self._data:
            # ensures backwards compatibility for results stored on state objects
            return self._data
        if not self.result_artifact_id:
            # do not try to load the relationship if there's no artifact id
            return None
        if TYPE_CHECKING:
            assert self._result_artifact is not None
        return self._result_artifact.data

    task_run: Mapped["TaskRun"] = relationship(lazy="raise", foreign_keys=[task_run_id])

    def as_state(self) -> schemas.states.State:
        """Convert this ORM row into a `schemas.states.State` object."""
        return schemas.states.State.model_validate(self, from_attributes=True)

    @declared_attr.directive
    @classmethod
    def __table_args__(cls) -> Iterable[sa.Index]:
        # unique compound index also serves lookups of a run's latest state
        return (
            sa.Index(
                "uq_task_run_state__task_run_id_timestamp_desc",
                cls.task_run_id,
                cls.timestamp.desc(),
                unique=True,
            ),
        )
class Artifact(Base):
    """
    SQLAlchemy model of artifacts.
    """

    key: Mapped[Optional[str]] = mapped_column(index=True)

    task_run_id: Mapped[Optional[uuid.UUID]] = mapped_column(index=True)

    flow_run_id: Mapped[Optional[uuid.UUID]] = mapped_column(index=True)

    type: Mapped[Optional[str]]
    data: Mapped[Optional[Any]] = mapped_column(sa_JSON)
    description: Mapped[Optional[str]]

    # Suffixed with underscore as attribute name 'metadata' is reserved for the MetaData instance when using a declarative base class.
    metadata_: Mapped[Optional[dict[str, str]]] = mapped_column(sa_JSON)

    @declared_attr.directive
    @classmethod
    def __table_args__(cls) -> Iterable[sa.Index]:
        return (
            sa.Index(
                "ix_artifact__key",
                cls.key,
            ),
            # covering index (postgres) to serve "latest artifact per key"
            # queries without touching the heap
            sa.Index(
                "ix_artifact__key_created_desc",
                cls.key,
                cls.created.desc(),
                postgresql_include=[
                    "id",
                    "updated",
                    "type",
                    "task_run_id",
                    "flow_run_id",
                ],
            ),
        )
class ArtifactCollection(Base):
    """SQLAlchemy model tracking the latest artifact for each artifact key."""

    key: Mapped[str]

    # id of the most recent Artifact row with this key
    latest_id: Mapped[uuid.UUID]

    task_run_id: Mapped[Optional[uuid.UUID]]

    flow_run_id: Mapped[Optional[uuid.UUID]]

    type: Mapped[Optional[str]]
    data: Mapped[Optional[Any]] = mapped_column(sa_JSON)
    description: Mapped[Optional[str]]
    metadata_: Mapped[Optional[dict[str, str]]] = mapped_column(sa_JSON)

    __table_args__: Any = (
        sa.UniqueConstraint("key"),
        sa.Index(
            "ix_artifact_collection__key_latest_id",
            "key",
            "latest_id",
        ),
    )
class TaskRunStateCache(Base):
    """
    SQLAlchemy model of a task run state cache.
    """

    cache_key: Mapped[str] = mapped_column()
    cache_expiration: Mapped[Optional[DateTime]]
    task_run_state_id: Mapped[uuid.UUID]

    @declared_attr.directive
    @classmethod
    def __table_args__(cls) -> Iterable[sa.Index]:
        # supports "most recent cache entry for a key" lookups
        return (
            sa.Index(
                "ix_task_run_state_cache__cache_key_created_desc",
                cls.cache_key,
                cls.created.desc(),
            ),
        )
class Run(Base):
    """
    Common columns and logic for FlowRun and TaskRun models
    """

    # abstract: no table is created for this class itself
    __abstract__ = True

    name: Mapped[str] = mapped_column(default=lambda: generate_slug(2), index=True)
    state_type: Mapped[Optional[schemas.states.StateType]] = mapped_column(
        sa.Enum(schemas.states.StateType, name="state_type")
    )
    state_name: Mapped[Optional[str]]
    state_timestamp: Mapped[Optional[DateTime]]
    run_count: Mapped[int] = mapped_column(server_default="0", default=0)
    expected_start_time: Mapped[Optional[DateTime]]
    next_scheduled_start_time: Mapped[Optional[DateTime]]
    start_time: Mapped[Optional[DateTime]]
    end_time: Mapped[Optional[DateTime]]
    total_run_time: Mapped[datetime.timedelta] = mapped_column(
        server_default="0", default=datetime.timedelta(0)
    )

    @hybrid_property
    def estimated_run_time(self) -> datetime.timedelta:
        """Total run time is incremented in the database whenever a RUNNING
        state is exited. To give up-to-date estimates, we estimate incremental
        run time for any runs currently in a RUNNING state."""
        if self.state_type and self.state_type == schemas.states.StateType.RUNNING:
            if TYPE_CHECKING:
                assert self.state_timestamp is not None
            return self.total_run_time + (now("UTC") - self.state_timestamp)
        else:
            return self.total_run_time

    @estimated_run_time.inplace.expression
    @classmethod
    def _estimated_run_time_expression(cls) -> sa.Label[datetime.timedelta]:
        # SQL equivalent of `estimated_run_time`; interval_add/date_diff are
        # dialect-aware helpers from prefect.server.utilities.database
        return (
            sa.select(
                sa.case(
                    (
                        cls.state_type == schemas.states.StateType.RUNNING,
                        sa.func.interval_add(
                            cls.total_run_time,
                            sa.func.date_diff(sa.func.now(), cls.state_timestamp),
                        ),
                    ),
                    else_=cls.total_run_time,
                )
            )
            # add a correlate statement so this can reuse the `FROM` clause
            # of any parent query
            .correlate(cls)
            .label("estimated_run_time")
        )

    @hybrid_property
    def estimated_start_time_delta(self) -> datetime.timedelta:
        """The delta to the expected start time (or "lateness") is computed as
        the difference between the actual start time and expected start time. To
        give up-to-date estimates, we estimate lateness for any runs that don't
        have a start time and are not in a final state and were expected to
        start already."""
        if (
            self.start_time
            and self.expected_start_time is not None
            and self.start_time > (self.expected_start_time)
        ):
            return self.start_time - self.expected_start_time
        elif (
            self.start_time is None
            and self.expected_start_time
            and self.expected_start_time < now("UTC")
            and self.state_type not in schemas.states.TERMINAL_STATES
        ):
            return now("UTC") - self.expected_start_time
        else:
            return datetime.timedelta(0)

    @estimated_start_time_delta.inplace.expression
    @classmethod
    def _estimated_start_time_delta_expression(
        cls,
    ) -> sa.SQLColumnExpression[datetime.timedelta]:
        # SQL equivalent of `estimated_start_time_delta`
        return sa.case(
            (
                cls.start_time > cls.expected_start_time,
                sa.func.date_diff(cls.start_time, cls.expected_start_time),
            ),
            (
                sa.and_(
                    cls.start_time.is_(None),
                    cls.state_type.not_in(schemas.states.TERMINAL_STATES),
                    cls.expected_start_time < sa.func.now(),
                ),
                sa.func.date_diff(sa.func.now(), cls.expected_start_time),
            ),
            else_=datetime.timedelta(0),
        )
class FlowRun(Run):
    """SQLAlchemy model of a flow run."""

    flow_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("flow.id", ondelete="cascade"), index=True
    )

    # NOTE: plain column, not a ForeignKey to deployment.id
    deployment_id: Mapped[Optional[uuid.UUID]] = mapped_column()
    work_queue_name: Mapped[Optional[str]] = mapped_column(index=True)
    flow_version: Mapped[Optional[str]] = mapped_column(index=True)
    deployment_version: Mapped[Optional[str]] = mapped_column(index=True)
    parameters: Mapped[dict[str, Any]] = mapped_column(
        JSON, server_default="{}", default=dict
    )
    idempotency_key: Mapped[Optional[str]] = mapped_column()
    context: Mapped[dict[str, Any]] = mapped_column(
        JSON, server_default="{}", default=dict
    )
    empirical_policy: Mapped[schemas.core.FlowRunPolicy] = mapped_column(
        Pydantic(schemas.core.FlowRunPolicy),
        server_default="{}",
        default=schemas.core.FlowRunPolicy,
    )
    tags: Mapped[list[str]] = mapped_column(JSON, server_default="[]", default=list)
    labels: Mapped[Optional[schemas.core.KeyValueLabels]] = mapped_column(JSON)

    created_by: Mapped[Optional[schemas.core.CreatedBy]] = mapped_column(
        Pydantic(schemas.core.CreatedBy)
    )

    infrastructure_pid: Mapped[Optional[str]]
    job_variables: Mapped[Optional[dict[str, Any]]] = mapped_column(
        JSON, server_default="{}", default=dict
    )

    infrastructure_document_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        sa.ForeignKey("block_document.id", ondelete="CASCADE"), index=True
    )

    parent_task_run_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        sa.ForeignKey("task_run.id", ondelete="SET NULL", use_alter=True), index=True
    )

    auto_scheduled: Mapped[bool] = mapped_column(server_default="0", default=False)

    # TODO remove this foreign key for significant delete performance gains
    state_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        sa.ForeignKey("flow_run_state.id", ondelete="SET NULL", use_alter=True),
        index=True,
    )

    work_queue_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        sa.ForeignKey("work_queue.id", ondelete="SET NULL"), index=True
    )

    # -------------------------- relationships

    # current states are eagerly loaded unless otherwise specified
    _state: Mapped[Optional["FlowRunState"]] = relationship(
        lazy="selectin",
        foreign_keys=[state_id],
        primaryjoin="FlowRunState.id==FlowRun.state_id",
    )

    @hybrid_property
    def state(self) -> Optional[FlowRunState]:
        """The run's current state (the row pointed to by `state_id`)."""
        return self._state

    @state.inplace.setter
    def _set_state(self, value: Optional[FlowRunState]) -> None:
        # because this is a slightly non-standard SQLAlchemy relationship, we
        # prefer an explicit setter method to a setter property, because
        # user expectations about SQLAlchemy attribute assignment might not be
        # met, namely that an unrelated (from SQLAlchemy's perspective) field of
        # the provided state is also modified. However, property assignment
        # still works because the ORM model's __init__ depends on it.
        return self.set_state(value)

    def set_state(self, state: Optional[FlowRunState]) -> None:
        """
        If a state is assigned to this run, populate its run id.

        This would normally be handled by the back-populated SQLAlchemy
        relationship, but because this is a one-to-one pointer to a
        one-to-many relationship, SQLAlchemy can't figure it out.
        """
        if state is not None:
            state.flow_run_id = self.id
        self._state = state

    flow: Mapped["Flow"] = relationship(back_populates="flow_runs", lazy="raise")

    task_runs: Mapped[list["TaskRun"]] = relationship(
        back_populates="flow_run",
        lazy="raise",
        # foreign_keys=lambda: [flow_run_id],
        primaryjoin="TaskRun.flow_run_id==FlowRun.id",
    )

    parent_task_run: Mapped[Optional["TaskRun"]] = relationship(
        back_populates="subflow_run",
        lazy="raise",
        foreign_keys=[parent_task_run_id],
    )

    work_queue: Mapped[Optional["WorkQueue"]] = relationship(
        lazy="selectin", foreign_keys=[work_queue_id]
    )

    @declared_attr.directive
    @classmethod
    def __table_args__(cls) -> Iterable[sa.Index]:
        return (
            sa.Index(
                "uq_flow_run__flow_id_idempotency_key",
                cls.flow_id,
                cls.idempotency_key,
                unique=True,
            ),
            sa.Index(
                "ix_flow_run__coalesce_start_time_expected_start_time_desc",
                coalesce(cls.start_time, cls.expected_start_time).desc(),
            ),
            sa.Index(
                "ix_flow_run__coalesce_start_time_expected_start_time_asc",
                coalesce(cls.start_time, cls.expected_start_time).asc(),
            ),
            sa.Index(
                "ix_flow_run__expected_start_time_desc",
                cls.expected_start_time.desc(),
            ),
            sa.Index(
                "ix_flow_run__next_scheduled_start_time_asc",
                cls.next_scheduled_start_time.asc(),
            ),
            sa.Index(
                "ix_flow_run__end_time_desc",
                cls.end_time.desc(),
            ),
            sa.Index(
                "ix_flow_run__start_time",
                cls.start_time,
            ),
            sa.Index(
                "ix_flow_run__state_type",
                cls.state_type,
            ),
            sa.Index(
                "ix_flow_run__state_name",
                cls.state_name,
            ),
            sa.Index(
                "ix_flow_run__state_timestamp",
                cls.state_timestamp,
            ),
            sa.Index("trgm_ix_flow_run_name", cls.name, postgresql_using="gin").ddl_if(
                dialect="postgresql"
            ),
            sa.Index(
                # index names are at most 63 characters long.
                "ix_flow_run__scheduler_deployment_id_auto_scheduled_next_schedu",
                cls.deployment_id,
                cls.auto_scheduled,
                cls.next_scheduled_start_time,
                postgresql_where=cls.state_type == schemas.states.StateType.SCHEDULED,
                sqlite_where=cls.state_type == schemas.states.StateType.SCHEDULED,
            ),
        )
# the kinds of upstream inputs a task run can receive, keyed by parameter name
_TaskInput = Union[
    schemas.core.TaskRunResult,
    schemas.core.FlowRunResult,
    schemas.core.Parameter,
    schemas.core.Constant,
]
_TaskInputs = dict[str, list[_TaskInput]]
class TaskRun(Run):
    """SQLAlchemy model of a task run."""

    flow_run_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        sa.ForeignKey("flow_run.id", ondelete="cascade"), index=True
    )

    task_key: Mapped[str] = mapped_column()
    dynamic_key: Mapped[str] = mapped_column()
    cache_key: Mapped[Optional[str]]
    cache_expiration: Mapped[Optional[DateTime]]
    task_version: Mapped[Optional[str]]
    flow_run_run_count: Mapped[int] = mapped_column(server_default="0", default=0)
    empirical_policy: Mapped[schemas.core.TaskRunPolicy] = mapped_column(
        Pydantic(schemas.core.TaskRunPolicy),
        server_default="{}",
        default=schemas.core.TaskRunPolicy,
    )
    task_inputs: Mapped[_TaskInputs] = mapped_column(
        Pydantic(_TaskInputs), server_default="{}", default=dict
    )
    tags: Mapped[list[str]] = mapped_column(JSON, server_default="[]", default=list)
    labels: Mapped[Optional[schemas.core.KeyValueLabels]] = mapped_column(JSON)

    # NOTE: unlike FlowRun.state_id, this is a plain UUID column (no FK)
    state_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID,
        index=True,
    )

    # -------------------------- relationships

    # current states are eagerly loaded unless otherwise specified
    _state: Mapped[Optional[TaskRunState]] = relationship(
        lazy="selectin",
        foreign_keys=[state_id],
        primaryjoin="TaskRunState.id==TaskRun.state_id",
    )

    @hybrid_property
    def state(self) -> Optional[TaskRunState]:
        """The run's current state (the row pointed to by `state_id`)."""
        return self._state

    @state.inplace.setter
    def _set_state(self, value: Optional[TaskRunState]) -> None:
        # because this is a slightly non-standard SQLAlchemy relationship, we
        # prefer an explicit setter method to a setter property, because
        # user expectations about SQLAlchemy attribute assignment might not be
        # met, namely that an unrelated (from SQLAlchemy's perspective) field of
        # the provided state is also modified. However, property assignment
        # still works because the ORM model's __init__ depends on it.
        return self.set_state(value)

    def set_state(self, state: Optional[TaskRunState]) -> None:
        """
        If a state is assigned to this run, populate its run id.

        This would normally be handled by the back-populated SQLAlchemy
        relationship, but because this is a one-to-one pointer to a
        one-to-many relationship, SQLAlchemy can't figure it out.
        """
        if state is not None:
            state.task_run_id = self.id
        self._state = state

    flow_run: Mapped[Optional["FlowRun"]] = relationship(
        back_populates="task_runs",
        lazy="raise",
        foreign_keys=[flow_run_id],
    )

    subflow_run: Mapped["FlowRun"] = relationship(
        back_populates="parent_task_run",
        lazy="raise",
        # foreign_keys=["FlowRun.parent_task_run_id"],
        primaryjoin="FlowRun.parent_task_run_id==TaskRun.id",
        uselist=False,
    )

    @declared_attr.directive
    @classmethod
    def __table_args__(cls) -> Iterable[sa.Index]:
        return (
            sa.Index(
                "uq_task_run__flow_run_id_task_key_dynamic_key",
                cls.flow_run_id,
                cls.task_key,
                cls.dynamic_key,
                unique=True,
            ),
            sa.Index(
                "ix_task_run__expected_start_time_desc",
                cls.expected_start_time.desc(),
            ),
            sa.Index(
                "ix_task_run__next_scheduled_start_time_asc",
                cls.next_scheduled_start_time.asc(),
            ),
            sa.Index(
                "ix_task_run__end_time_desc",
                cls.end_time.desc(),
            ),
            sa.Index(
                "ix_task_run__start_time",
                cls.start_time,
            ),
            sa.Index(
                "ix_task_run__state_type",
                cls.state_type,
            ),
            sa.Index(
                "ix_task_run__state_name",
                cls.state_name,
            ),
            sa.Index(
                "ix_task_run__state_timestamp",
                cls.state_timestamp,
            ),
            sa.Index("trgm_ix_task_run_name", cls.name, postgresql_using="gin").ddl_if(
                dialect="postgresql"
            ),
        )
class DeploymentSchedule(Base):
    """SQLAlchemy model of a schedule attached to a deployment."""

    deployment_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("deployment.id", ondelete="CASCADE"), index=True
    )

    schedule: Mapped[schemas.schedules.SCHEDULE_TYPES] = mapped_column(
        Pydantic(schemas.schedules.SCHEDULE_TYPES)
    )
    active: Mapped[bool] = mapped_column(default=True)
    max_scheduled_runs: Mapped[Optional[int]]
    parameters: Mapped[dict[str, Any]] = mapped_column(
        JSON, server_default="{}", default=dict, nullable=False
    )
    slug: Mapped[Optional[str]] = mapped_column(sa.String, nullable=True)

    @declared_attr.directive
    @classmethod
    def __table_args__(cls) -> Iterable[sa.Index]:
        # a slug must be unique within a deployment, but may repeat globally
        return (
            sa.Index(
                "ix_deployment_schedule__deployment_id__slug",
                cls.deployment_id,
                cls.slug,
                unique=True,
            ),
            sa.Index(
                "ix_deployment_schedule__slug",
                cls.slug,
                unique=False,
            ),
        )
class Deployment(Base):
    """SQLAlchemy model of a deployment."""

    name: Mapped[str]
    version: Mapped[Optional[str]]
    description: Mapped[Optional[str]] = mapped_column(sa.Text())
    work_queue_name: Mapped[Optional[str]] = mapped_column(index=True)
    infra_overrides: Mapped[dict[str, Any]] = mapped_column(
        JSON, server_default="{}", default=dict
    )
    path: Mapped[Optional[str]]
    entrypoint: Mapped[Optional[str]]

    last_polled: Mapped[Optional[DateTime]]
    status: Mapped[DeploymentStatus] = mapped_column(
        sa.Enum(DeploymentStatus, name="deployment_status"),
        default=DeploymentStatus.NOT_READY,
        server_default="NOT_READY",
    )

    @declared_attr
    def job_variables(self) -> Mapped[dict[str, Any]]:
        # alias for the legacy `infra_overrides` column
        return synonym("infra_overrides")

    flow_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("flow.id", ondelete="CASCADE"), index=True
    )

    work_queue_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        sa.ForeignKey("work_queue.id", ondelete="SET NULL"), index=True
    )
    paused: Mapped[bool] = mapped_column(server_default="0", default=False, index=True)

    schedules: Mapped[list["DeploymentSchedule"]] = relationship(
        lazy="selectin", order_by=lambda: DeploymentSchedule.updated.desc()
    )

    # deprecated in favor of `concurrency_limit_id` FK
    _concurrency_limit: Mapped[Optional[int]] = mapped_column(name="concurrency_limit")
    concurrency_limit_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        sa.ForeignKey("concurrency_limit_v2.id", ondelete="SET NULL"),
    )
    global_concurrency_limit: Mapped[Optional["ConcurrencyLimitV2"]] = relationship(
        lazy="selectin",
    )
    concurrency_options: Mapped[Optional[schemas.core.ConcurrencyOptions]] = (
        mapped_column(
            Pydantic(schemas.core.ConcurrencyOptions),
            server_default=None,
            nullable=True,
            default=None,
        )
    )

    tags: Mapped[list[str]] = mapped_column(JSON, server_default="[]", default=list)
    labels: Mapped[Optional[schemas.core.KeyValueLabels]] = mapped_column(JSON)
    parameters: Mapped[dict[str, Any]] = mapped_column(
        JSON, server_default="{}", default=dict
    )
    pull_steps: Mapped[Optional[list[dict[str, Any]]]] = mapped_column(
        JSON, default=list
    )
    parameter_openapi_schema: Mapped[Optional[dict[str, Any]]] = mapped_column(
        JSON, default=dict
    )
    enforce_parameter_schema: Mapped[bool] = mapped_column(
        default=True, server_default="0"
    )
    created_by: Mapped[Optional[schemas.core.CreatedBy]] = mapped_column(
        Pydantic(schemas.core.CreatedBy)
    )
    updated_by: Mapped[Optional[schemas.core.UpdatedBy]] = mapped_column(
        Pydantic(schemas.core.UpdatedBy)
    )

    infrastructure_document_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        sa.ForeignKey("block_document.id", ondelete="CASCADE"), index=False
    )

    storage_document_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        sa.ForeignKey("block_document.id", ondelete="CASCADE"),
        index=False,
    )

    flow: Mapped["Flow"] = relationship(
        "Flow", back_populates="deployments", lazy="raise"
    )

    work_queue: Mapped[Optional["WorkQueue"]] = relationship(
        lazy="selectin", foreign_keys=[work_queue_id]
    )

    __table_args__: Any = (
        sa.Index(
            "uq_deployment__flow_id_name",
            "flow_id",
            "name",
            unique=True,
        ),
        sa.Index(
            "ix_deployment__created",
            "created",
        ),
        sa.Index("trgm_ix_deployment_name", "name", postgresql_using="gin").ddl_if(
            dialect="postgresql"
        ),
    )
class Log(Base):
    """
    SQLAlchemy model of a logging statement.
    """

    name: Mapped[str]
    level: Mapped[int] = mapped_column(sa.SmallInteger, index=True)
    flow_run_id: Mapped[Optional[uuid.UUID]] = mapped_column(index=True)
    task_run_id: Mapped[Optional[uuid.UUID]] = mapped_column(index=True)
    message: Mapped[str] = mapped_column(sa.Text)

    # The client-side timestamp of this logged statement.
    timestamp: Mapped[DateTime] = mapped_column(index=True)

    __table_args__: Any = (
        sa.Index(
            "ix_log__flow_run_id_timestamp",
            "flow_run_id",
            "timestamp",
        ),
    )
class ConcurrencyLimit(Base):
    """SQLAlchemy model of a (v1) tag-based concurrency limit."""

    tag: Mapped[str]
    concurrency_limit: Mapped[int]
    # ids of task runs currently occupying slots for this tag
    active_slots: Mapped[list[str]] = mapped_column(
        JSON, server_default="[]", default=list
    )

    __table_args__: Any = (sa.Index("uq_concurrency_limit__tag", "tag", unique=True),)
class ConcurrencyLimitV2(Base):
    """SQLAlchemy model of a (v2) named concurrency limit."""

    active: Mapped[bool] = mapped_column(default=True)
    name: Mapped[str]
    limit: Mapped[int]
    active_slots: Mapped[int] = mapped_column(default=0)
    denied_slots: Mapped[int] = mapped_column(default=0)

    slot_decay_per_second: Mapped[float] = mapped_column(default=0.0)
    avg_slot_occupancy_seconds: Mapped[float] = mapped_column(default=2.0)

    __table_args__: Any = (sa.UniqueConstraint("name"),)
class BlockType(Base):
    """SQLAlchemy model of a block type."""

    name: Mapped[str]
    slug: Mapped[str]
    logo_url: Mapped[Optional[str]]
    documentation_url: Mapped[Optional[str]]
    description: Mapped[Optional[str]]
    code_example: Mapped[Optional[str]]
    is_protected: Mapped[bool] = mapped_column(server_default="0", default=False)

    __table_args__: Any = (
        sa.Index(
            "uq_block_type__slug",
            "slug",
            unique=True,
        ),
        sa.Index("trgm_ix_block_type_name", "name", postgresql_using="gin").ddl_if(
            dialect="postgresql"
        ),
    )
class BlockSchema(Base):
    """SQLAlchemy model of a block schema."""

    # Content hash of the schema; unique together with `version` (index below).
    checksum: Mapped[str]
    fields: Mapped[dict[str, Any]] = mapped_column(
        JSON, server_default="{}", default=dict
    )
    capabilities: Mapped[list[str]] = mapped_column(
        JSON, server_default="[]", default=list
    )
    version: Mapped[str] = mapped_column(
        server_default=schemas.core.DEFAULT_BLOCK_SCHEMA_VERSION,
    )

    # Deleting a block type cascades to its schemas.
    block_type_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("block_type.id", ondelete="cascade"), index=True
    )

    block_type: Mapped["BlockType"] = relationship(lazy="selectin")

    __table_args__: Any = (
        sa.Index(
            "uq_block_schema__checksum_version",
            "checksum",
            "version",
            unique=True,
        ),
        sa.Index("ix_block_schema__created", "created"),
        # GIN index enables containment queries on the JSON capabilities list; Postgres only.
        sa.Index(
            "ix_block_schema__capabilities", "capabilities", postgresql_using="gin"
        ).ddl_if(dialect="postgresql"),
    )
class BlockSchemaReference(Base):
    """SQLAlchemy model of a parent/child reference between two block schemas."""

    name: Mapped[str]

    parent_block_schema_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("block_schema.id", ondelete="cascade")
    )

    reference_block_schema_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("block_schema.id", ondelete="cascade")
    )
class BlockDocument(Base):
    """SQLAlchemy model of a block document (a stored, possibly encrypted, block value)."""

    name: Mapped[str] = mapped_column(index=True)
    # Stored (encrypted) payload; see encrypt_data/decrypt_data below.
    data: Mapped[Any] = mapped_column(JSON, server_default="{}", default=dict)
    is_anonymous: Mapped[bool] = mapped_column(server_default="0", index=True)

    # Denormalized copy of the block type's name (see composite index below).
    block_type_name: Mapped[Optional[str]]

    block_type_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("block_type.id", ondelete="cascade")
    )

    block_type: Mapped["BlockType"] = relationship(lazy="selectin")

    block_schema_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("block_schema.id", ondelete="cascade")
    )

    block_schema: Mapped["BlockSchema"] = relationship(lazy="selectin")

    __table_args__: Any = (
        sa.Index(
            "uq_block__type_id_name",
            "block_type_id",
            "name",
            unique=True,
        ),
        sa.Index("ix_block_document__block_type_name__name", "block_type_name", "name"),
        # Trigram GIN index for fuzzy name search; Postgres only.
        sa.Index("trgm_ix_block_document_name", "name", postgresql_using="gin").ddl_if(
            dialect="postgresql"
        ),
    )

    async def encrypt_data(self, session: AsyncSession, data: dict[str, Any]) -> None:
        """
        Store encrypted data on the ORM model

        Note: will only succeed if the caller has sufficient permission.
        """
        # Fernet-encrypts `data` with the session-scoped key and replaces `self.data`.
        self.data = await encrypt_fernet(session, data)

    async def decrypt_data(self, session: AsyncSession) -> dict[str, Any]:
        """
        Retrieve decrypted data from the ORM model.

        Note: will only succeed if the caller has sufficient permission.
        """
        return await decrypt_fernet(session, self.data)
class BlockDocumentReference(Base):
    """SQLAlchemy model of a parent/child reference between two block documents."""

    name: Mapped[str]

    parent_block_document_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("block_document.id", ondelete="cascade"),
    )

    reference_block_document_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("block_document.id", ondelete="cascade"),
    )
class Configuration(Base):
    """SQLAlchemy model of a server configuration key/value entry."""

    key: Mapped[str] = mapped_column(index=True)
    value: Mapped[dict[str, Any]] = mapped_column(JSON)

    __table_args__: Any = (sa.UniqueConstraint("key"),)
class SavedSearch(Base):
    """SQLAlchemy model of a saved search."""

    name: Mapped[str]
    # Filter clauses stored as a JSON list of dicts.
    filters: Mapped[list[dict[str, Any]]] = mapped_column(
        JSON, server_default="[]", default=list
    )

    __table_args__: Any = (sa.UniqueConstraint("name"),)
class WorkQueue(Base):
    """SQLAlchemy model of a work queue"""

    name: Mapped[str]

    filter: Mapped[Optional[schemas.core.QueueFilter]] = mapped_column(
        Pydantic(schemas.core.QueueFilter)
    )
    description: Mapped[str] = mapped_column(default="", server_default="")
    is_paused: Mapped[bool] = mapped_column(server_default="0", default=False)
    concurrency_limit: Mapped[Optional[int]]
    # Lower/higher ordering semantics enforced elsewhere — see the
    # (work_pool_id, priority) index below used for polling order.
    priority: Mapped[int]

    last_polled: Mapped[Optional[DateTime]]
    status: Mapped[WorkQueueStatus] = mapped_column(
        sa.Enum(WorkQueueStatus, name="work_queue_status"),
        default=WorkQueueStatus.NOT_READY,
        server_default=WorkQueueStatus.NOT_READY,
    )

    # Deleting a work pool cascades to its queues.
    work_pool_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("work_pool.id", ondelete="cascade"), index=True
    )

    work_pool: Mapped["WorkPool"] = relationship(
        lazy="selectin", foreign_keys=[work_pool_id]
    )

    __table_args__: ClassVar[Any] = (
        # Queue names are unique within a pool (not globally).
        sa.UniqueConstraint("work_pool_id", "name"),
        sa.Index("ix_work_queue__work_pool_id_priority", "work_pool_id", "priority"),
        # Trigram GIN index for fuzzy name search; Postgres only.
        sa.Index("trgm_ix_work_queue_name", "name", postgresql_using="gin").ddl_if(
            dialect="postgresql"
        ),
    )
class WorkPool(Base):
    """SQLAlchemy model of a work pool."""

    name: Mapped[str]
    description: Mapped[Optional[str]]
    # Infrastructure/worker type string for this pool.
    type: Mapped[str] = mapped_column(index=True)
    # Use the `dict` factory rather than a shared `{}` literal so each row gets
    # its own default instance (consistent with the other JSON columns here).
    base_job_template: Mapped[dict[str, Any]] = mapped_column(
        JSON, server_default="{}", default=dict
    )
    is_paused: Mapped[bool] = mapped_column(server_default="0", default=False)
    # FK to this pool's default queue; `use_alter` defers the constraint to
    # break the circular dependency between work_pool and work_queue DDL, and
    # RESTRICT prevents deleting a queue while a pool still points at it.
    default_queue_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID,
        sa.ForeignKey("work_queue.id", ondelete="RESTRICT", use_alter=True),
        nullable=True,
    )
    concurrency_limit: Mapped[Optional[int]]

    status: Mapped[WorkPoolStatus] = mapped_column(
        sa.Enum(WorkPoolStatus, name="work_pool_status"),
        default=WorkPoolStatus.NOT_READY,
        server_default=WorkPoolStatus.NOT_READY,
    )
    last_transitioned_status_at: Mapped[Optional[DateTime]]
    last_status_event_id: Mapped[Optional[uuid.UUID]]

    # Pydantic-serialized storage configuration; the model class itself serves
    # as the default factory.
    storage_configuration: Mapped[schemas.core.WorkPoolStorageConfiguration] = (
        mapped_column(
            Pydantic(schemas.core.WorkPoolStorageConfiguration),
            server_default="{}",
            default=schemas.core.WorkPoolStorageConfiguration,
            nullable=False,
        )
    )

    __table_args__: Any = (sa.UniqueConstraint("name"),)
class Worker(Base):
    """SQLAlchemy model of a worker"""

    # Deleting a work pool cascades to its workers.
    work_pool_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("work_pool.id", ondelete="cascade"), index=True
    )

    name: Mapped[str]
    # Python-side default uses UTC now; server default uses the DB clock.
    last_heartbeat_time: Mapped[DateTime] = mapped_column(
        server_default=sa.func.now(), default=lambda: now("UTC")
    )
    heartbeat_interval_seconds: Mapped[Optional[int]]

    status: Mapped[WorkerStatus] = mapped_column(
        sa.Enum(WorkerStatus, name="worker_status"),
        default=WorkerStatus.OFFLINE,
        server_default=WorkerStatus.OFFLINE,
    )

    __table_args__: Any = (
        # Worker names are unique within a pool.
        sa.UniqueConstraint("work_pool_id", "name"),
        # Supports querying a pool's workers ordered by heartbeat recency.
        sa.Index(
            "ix_worker__work_pool_id_last_heartbeat_time",
            "work_pool_id",
            "last_heartbeat_time",
        ),
    )
class Agent(Base):
    """SQLAlchemy model of an agent"""

    name: Mapped[str]

    # NOTE: no ondelete behavior specified, unlike most FKs in this module.
    work_queue_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("work_queue.id"), index=True
    )

    # Python-side default uses UTC now; server default uses the DB clock.
    last_activity_time: Mapped[DateTime] = mapped_column(
        server_default=sa.func.now(), default=lambda: now("UTC")
    )

    __table_args__: Any = (sa.UniqueConstraint("name"),)
class Variable(Base):
    """SQLAlchemy model of a variable (named JSON value)."""

    name: Mapped[str]
    value: Mapped[Optional[Any]] = mapped_column(JSON)
    tags: Mapped[list[str]] = mapped_column(JSON, server_default="[]", default=list)

    __table_args__: Any = (sa.UniqueConstraint("name"),)
class FlowRunInput(Base):
    """SQLAlchemy model of an input sent to a flow run."""

    flow_run_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("flow_run.id", ondelete="cascade")
    )

    key: Mapped[str]
    # Raw value stored as text (serialization handled by callers).
    value: Mapped[str] = mapped_column(sa.Text())
    sender: Mapped[Optional[str]]

    # One value per (run, key).
    __table_args__: Any = (sa.UniqueConstraint("flow_run_id", "key"),)
class CsrfToken(Base):
    """SQLAlchemy model of a per-client CSRF token with an expiration time."""

    token: Mapped[str]
    # One active token per client.
    client: Mapped[str] = mapped_column(unique=True)
    expiration: Mapped[DateTime]
class Automation(Base):
    """SQLAlchemy model of an automation (trigger plus actions)."""

    name: Mapped[str]
    description: Mapped[str] = mapped_column(default="")

    enabled: Mapped[bool] = mapped_column(server_default="1", default=True)
    tags: Mapped[list[str]] = mapped_column(JSON, server_default="[]", default=list)

    trigger: Mapped[ServerTriggerTypes] = mapped_column(Pydantic(ServerTriggerTypes))

    # NOTE(review): the column stores a list (Pydantic(list[ServerActionTypes]));
    # annotation corrected to list[...] to match the actual stored shape.
    actions: Mapped[list[ServerActionTypes]] = mapped_column(
        Pydantic(list[ServerActionTypes])
    )
    actions_on_trigger: Mapped[list[ServerActionTypes]] = mapped_column(
        Pydantic(list[ServerActionTypes]), server_default="[]", default=list
    )
    actions_on_resolve: Mapped[list[ServerActionTypes]] = mapped_column(
        Pydantic(list[ServerActionTypes]), server_default="[]", default=list
    )

    # lazy="raise": accessing this without an explicit eager load is an error,
    # preventing accidental lazy loads in async sessions.
    related_resources: Mapped[list["AutomationRelatedResource"]] = relationship(
        "AutomationRelatedResource", back_populates="automation", lazy="raise"
    )

    @classmethod
    def sort_expression(cls, value: AutomationSort) -> sa.ColumnExpressionArgument[Any]:
        """Return an expression used to sort Automations"""
        sort_mapping: dict[AutomationSort, sa.ColumnExpressionArgument[Any]] = {
            AutomationSort.CREATED_DESC: cls.created.desc(),
            AutomationSort.UPDATED_DESC: cls.updated.desc(),
            AutomationSort.NAME_ASC: cls.name.asc(),
            AutomationSort.NAME_DESC: cls.name.desc(),
        }
        # Raises KeyError for an unrecognized sort value.
        return sort_mapping[value]
class AutomationBucket(Base):
    """SQLAlchemy model of a time bucket used to evaluate automation triggers."""

    __table_args__: Any = (
        # One bucket per (automation, trigger, bucketing key).
        sa.Index(
            "uq_automation_bucket__automation_id__trigger_id__bucketing_key",
            "automation_id",
            "trigger_id",
            "bucketing_key",
            unique=True,
        ),
        # Supports expiring/querying buckets by their end time.
        sa.Index(
            "ix_automation_bucket__automation_id__end",
            "automation_id",
            "end",
        ),
    )

    automation_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("automation.id", ondelete="CASCADE")
    )

    trigger_id: Mapped[uuid.UUID]

    # Key values the trigger groups events by — TODO confirm exact semantics.
    bucketing_key: Mapped[list[str]] = mapped_column(
        JSON, server_default="[]", default=list
    )

    last_event: Mapped[Optional[ReceivedEvent]] = mapped_column(Pydantic(ReceivedEvent))

    # Bucket time window.
    start: Mapped[DateTime]
    end: Mapped[DateTime]

    count: Mapped[int]

    last_operation: Mapped[Optional[str]]

    triggered_at: Mapped[Optional[DateTime]]
class AutomationRelatedResource(Base):
    """SQLAlchemy model linking an automation to a related resource."""

    __table_args__: Any = (
        # Each resource is linked to an automation at most once.
        sa.Index(
            "uq_automation_related_resource__automation_id__resource_id",
            "automation_id",
            "resource_id",
            unique=True,
        ),
    )

    automation_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("automation.id", ondelete="CASCADE")
    )

    resource_id: Mapped[Optional[str]] = mapped_column(index=True)
    # True when the resource owns the automation's lifecycle — TODO confirm.
    automation_owned_by_resource: Mapped[bool] = mapped_column(
        default=False, server_default="0"
    )

    # lazy="raise" forbids implicit lazy loading (see Automation.related_resources).
    automation: Mapped["Automation"] = relationship(
        "Automation", back_populates="related_resources", lazy="raise"
    )
class CompositeTriggerChildFiring(Base):
    """SQLAlchemy model recording a child trigger's firing within a composite trigger."""

    __table_args__: Any = (
        # Latest firing per (automation, parent trigger, child trigger).
        sa.Index(
            "uq_composite_trigger_child_firing__a_id__pt_id__ct__id",
            "automation_id",
            "parent_trigger_id",
            "child_trigger_id",
            unique=True,
        ),
    )

    automation_id: Mapped[uuid.UUID] = mapped_column(
        sa.ForeignKey("automation.id", ondelete="CASCADE")
    )

    parent_trigger_id: Mapped[uuid.UUID]

    child_trigger_id: Mapped[uuid.UUID]
    child_firing_id: Mapped[uuid.UUID]
    child_fired_at: Mapped[Optional[DateTime]]
    # Full Firing payload, Pydantic-serialized.
    child_firing: Mapped[Firing] = mapped_column(Pydantic(Firing))
class AutomationEventFollower(Base):
    """SQLAlchemy model of an event waiting on its leader event (event ordering)."""

    __table_args__: Any = (
        sa.Index(
            "uq_follower_for_scope",
            "scope",
            "follower_event_id",
            unique=True,
        ),
        # allows lookup on (scope, leader_event_id) to use an index-only instead of full table scan
        sa.Index(
            "ix_ae_follower_scope_leader",
            "scope",
            "leader_event_id",
        ),
    )
    scope: Mapped[str] = mapped_column(default="", index=True)
    leader_event_id: Mapped[uuid.UUID] = mapped_column(index=True)
    follower_event_id: Mapped[uuid.UUID]
    received: Mapped[DateTime] = mapped_column(index=True)
    # Full follower event payload, Pydantic-serialized.
    follower: Mapped[ReceivedEvent] = mapped_column(Pydantic(ReceivedEvent))
class Event(Base):
    """SQLAlchemy model of a received event."""

    @declared_attr.directive
    def __tablename__(cls) -> str:
        # Explicit plural name, overriding the default naming convention.
        return "events"

    # Indexes tuned for the event query patterns (by occurrence time, by event
    # name, by resource, and GIN indexes for JSON containment on Postgres).
    __table_args__: Any = (
        sa.Index(
            "ix_events__related_resource_ids_gin",
            "related_resource_ids",
            postgresql_using="gin",
        ),
        sa.Index("ix_events__occurred", "occurred"),
        sa.Index("ix_events__event__id", "event", "id"),
        sa.Index(
            "ix_events__event_resource_id_occurred",
            "event",
            "resource_id",
            "occurred",
        ),
        sa.Index("ix_events__occurred_id", "occurred", "id"),
        sa.Index("ix_events__event_occurred_id", "event", "occurred", "id"),
        sa.Index(
            "ix_events__related_gin",
            "related",
            postgresql_using="gin",
        ),
        sa.Index(
            "ix_events__event_occurred",
            "event",
            "occurred",
        ),
    )

    # When the event happened (client side).
    occurred: Mapped[DateTime]
    # Event name, e.g. a dotted event type string.
    event: Mapped[str] = mapped_column(sa.Text())
    resource_id: Mapped[str] = mapped_column(sa.Text())
    resource: Mapped[dict[str, Any]] = mapped_column(JSON())
    related_resource_ids: Mapped[list[str]] = mapped_column(
        JSON(), server_default="[]", default=list
    )
    related: Mapped[list[dict[str, Any]]] = mapped_column(
        JSON(), server_default="[]", default=list
    )
    payload: Mapped[dict[str, Any]] = mapped_column(JSON())
    # received = when the server accepted it; recorded = when it was persisted.
    received: Mapped[DateTime]
    recorded: Mapped[DateTime]
    # ID of the event this one follows, if ordered after another event.
    follows: Mapped[Optional[uuid.UUID]]
class EventResource(Base):
    """SQLAlchemy model of a resource involved in an event (denormalized for lookup)."""

    @declared_attr.directive
    def __tablename__(cls) -> str:
        # Explicit plural name, overriding the default naming convention.
        return "event_resources"

    __table_args__: Any = (
        # Supports querying a resource's event history in time order.
        sa.Index(
            "ix_event_resources__resource_id__occurred",
            "resource_id",
            "occurred",
        ),
    )

    occurred: Mapped[DateTime]
    resource_id: Mapped[str] = mapped_column(sa.Text())
    resource_role: Mapped[str] = mapped_column(sa.Text())
    # Uses plain JSON (postgres variant with sqlite fallback), unlike the
    # custom JSON type used elsewhere in this module.
    resource: Mapped[dict[str, Any]] = mapped_column(sa_JSON)
    event_id: Mapped[uuid.UUID]
# These are temporary until we've migrated all the references to the new,
# non-ORM names
ORMFlow = Flow
ORMFlowRunState = FlowRunState
ORMTaskRunState = TaskRunState
ORMArtifact = Artifact
ORMArtifactCollection = ArtifactCollection
ORMTaskRunStateCache = TaskRunStateCache
ORMRun = Run
ORMFlowRun = FlowRun
ORMTaskRun = TaskRun
ORMDeploymentSchedule = DeploymentSchedule
ORMDeployment = Deployment
ORMLog = Log
ORMConcurrencyLimit = ConcurrencyLimit
ORMConcurrencyLimitV2 = ConcurrencyLimitV2
ORMBlockType = BlockType
ORMBlockSchema = BlockSchema
ORMBlockSchemaReference = BlockSchemaReference
ORMBlockDocument = BlockDocument
ORMBlockDocumentReference = BlockDocumentReference
ORMConfiguration = Configuration
ORMSavedSearch = SavedSearch
ORMWorkQueue = WorkQueue
ORMWorkPool = WorkPool
ORMWorker = Worker
ORMAgent = Agent
ORMVariable = Variable
ORMFlowRunInput = FlowRunInput
ORMCsrfToken = CsrfToken
ORMAutomation = Automation
ORMAutomationBucket = AutomationBucket
ORMAutomationRelatedResource = AutomationRelatedResource
ORMCompositeTriggerChildFiring = CompositeTriggerChildFiring
ORMAutomationEventFollower = AutomationEventFollower
ORMEvent = Event
ORMEventResource = EventResource


# Column specifiers accepted by the *_unique_upsert_columns properties:
# attribute names, Column objects, or DDL constraint column roles.
_UpsertColumns = Iterable[Union[str, "sa.Column[Any]", roles.DDLConstraintColumnRole]]
class BaseORMConfiguration(ABC):
    """
    Abstract base class used to inject database-specific ORM configuration into Prefect.

    Modifications to core Prefect REST API data structures can have unintended consequences.
    Use with caution.
    """

    def unique_key(self) -> tuple[Hashable, ...]:
        """
        Returns a key used to determine whether to instantiate a new DB interface.
        """
        # Keyed on the concrete class plus the shared metadata object, so each
        # (dialect, metadata) pairing gets its own interface.
        return (self.__class__, Base.metadata)

    @property
    @abstractmethod
    def versions_dir(self) -> Path:
        """Directory containing migrations"""
        ...

    # NOTE: each property below must mirror the unique constraint/index
    # declared on the corresponding model, since they drive ON CONFLICT
    # upsert targets.

    @property
    def deployment_unique_upsert_columns(self) -> _UpsertColumns:
        """Unique columns for upserting a Deployment"""
        return [Deployment.flow_id, Deployment.name]

    @property
    def concurrency_limit_unique_upsert_columns(self) -> _UpsertColumns:
        """Unique columns for upserting a ConcurrencyLimit"""
        return [ConcurrencyLimit.tag]

    @property
    def flow_run_unique_upsert_columns(self) -> _UpsertColumns:
        """Unique columns for upserting a FlowRun"""
        return [FlowRun.flow_id, FlowRun.idempotency_key]

    @property
    def block_type_unique_upsert_columns(self) -> _UpsertColumns:
        """Unique columns for upserting a BlockType"""
        return [BlockType.slug]

    @property
    def artifact_collection_unique_upsert_columns(self) -> _UpsertColumns:
        """Unique columns for upserting an ArtifactCollection"""
        return [ArtifactCollection.key]

    @property
    def block_schema_unique_upsert_columns(self) -> _UpsertColumns:
        """Unique columns for upserting a BlockSchema"""
        return [BlockSchema.checksum, BlockSchema.version]

    @property
    def flow_unique_upsert_columns(self) -> _UpsertColumns:
        """Unique columns for upserting a Flow"""
        return [Flow.name]

    @property
    def saved_search_unique_upsert_columns(self) -> _UpsertColumns:
        """Unique columns for upserting a SavedSearch"""
        return [SavedSearch.name]

    @property
    def task_run_unique_upsert_columns(self) -> _UpsertColumns:
        """Unique columns for upserting a TaskRun"""
        return [
            TaskRun.flow_run_id,
            TaskRun.task_key,
            TaskRun.dynamic_key,
        ]

    @property
    def block_document_unique_upsert_columns(self) -> _UpsertColumns:
        """Unique columns for upserting a BlockDocument"""
        return [BlockDocument.block_type_id, BlockDocument.name]
class AsyncPostgresORMConfiguration(BaseORMConfiguration):
    """Postgres specific orm configuration"""

    @property
    def versions_dir(self) -> Path:
        """Directory containing migrations"""
        # Imported lazily to avoid a circular import at module load time.
        import prefect.server.database

        database_dir = Path(prefect.server.database.__file__).parent
        return database_dir.joinpath("_migrations", "versions", "postgresql")
class AioSqliteORMConfiguration(BaseORMConfiguration):
    """SQLite specific orm configuration"""

    @property
    def versions_dir(self) -> Path:
        """Directory containing migrations"""
        # Imported lazily to avoid a circular import at module load time.
        import prefect.server.database

        database_dir = Path(prefect.server.database.__file__).parent
        return database_dir.joinpath("_migrations", "versions", "sqlite")