Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/flow_run_input.py: 45%
29 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1import uuid 1a
2from typing import List, Optional 1a
4import sqlalchemy as sa 1a
5from sqlalchemy.ext.asyncio import AsyncSession 1a
7import prefect.server.schemas as schemas 1a
8from prefect.server.database import PrefectDBInterface, db_injector 1a
11@db_injector 1a
12async def create_flow_run_input( 1a
13 db: PrefectDBInterface,
14 session: AsyncSession,
15 flow_run_input: schemas.core.FlowRunInput,
16) -> schemas.core.FlowRunInput:
17 model = db.FlowRunInput(**flow_run_input.model_dump())
18 session.add(model)
19 await session.flush()
21 return schemas.core.FlowRunInput.model_validate(model, from_attributes=True)
24@db_injector 1a
25async def filter_flow_run_input( 1a
26 db: PrefectDBInterface,
27 session: AsyncSession,
28 flow_run_id: uuid.UUID,
29 prefix: str,
30 limit: int,
31 exclude_keys: List[str],
32) -> List[schemas.core.FlowRunInput]:
33 query = (
34 sa.select(db.FlowRunInput)
35 .where(
36 sa.and_(
37 db.FlowRunInput.flow_run_id == flow_run_id,
38 db.FlowRunInput.key.like(prefix + "%"),
39 db.FlowRunInput.key.not_in(exclude_keys),
40 )
41 )
42 .order_by(db.FlowRunInput.created)
43 .limit(limit)
44 )
46 result = await session.execute(query)
47 return [
48 schemas.core.FlowRunInput.model_validate(model, from_attributes=True)
49 for model in result.scalars().all()
50 ]
53@db_injector 1a
54async def read_flow_run_input( 1a
55 db: PrefectDBInterface,
56 session: AsyncSession,
57 flow_run_id: uuid.UUID,
58 key: str,
59) -> Optional[schemas.core.FlowRunInput]:
60 query = sa.select(db.FlowRunInput).where(
61 sa.and_(
62 db.FlowRunInput.flow_run_id == flow_run_id,
63 db.FlowRunInput.key == key,
64 )
65 )
67 result = await session.execute(query)
68 model = result.scalar()
69 if model:
70 return schemas.core.FlowRunInput.model_validate(model, from_attributes=True)
72 return None
75@db_injector 1a
76async def delete_flow_run_input( 1a
77 db: PrefectDBInterface,
78 session: AsyncSession,
79 flow_run_id: uuid.UUID,
80 key: str,
81) -> bool:
82 result = await session.execute(
83 sa.delete(db.FlowRunInput).where(
84 sa.and_(
85 db.FlowRunInput.flow_run_id == flow_run_id,
86 db.FlowRunInput.key == key,
87 )
88 )
89 )
91 return result.rowcount > 0