Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/ui/task_runs.py: 45%
58 statements
from datetime import datetime
from typing import TYPE_CHECKING, List, Optional
from uuid import UUID

import sqlalchemy as sa
from fastapi import Depends, HTTPException, Path
from pydantic import Field, model_serializer

import prefect.server.schemas as schemas
from prefect._internal.compatibility.starlette import status
from prefect.logging import get_logger
from prefect.server import models
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.schemas.bases import PrefectBaseModel
from prefect.server.utilities.server import PrefectRouter
from prefect.types._datetime import end_of_period, now

if TYPE_CHECKING:
    import logging

logger: "logging.Logger" = get_logger("server.api.ui.task_runs")

router: PrefectRouter = PrefectRouter(prefix="/ui/task_runs", tags=["Task Runs", "UI"])
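
# Terminal states treated as failures on the dashboard; any other terminal
# state counts toward the "completed" total below.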
FAILED_STATES = [schemas.states.StateType.CRASHED, schemas.states.StateType.FAILED]


class TaskRunCount(PrefectBaseModel):
    completed: int = Field(
        default=..., description="The number of completed task runs."
    )
    failed: int = Field(default=..., description="The number of failed task runs.")
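
    # Serialize counts as plain ints; depending on the backend, SQL SUM()
    # aggregates may come back as Decimal or float rather than int.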
    @model_serializer
    def ser_model(self) -> dict[str, int]:
        return {
            "completed": int(self.completed),
            "failed": int(self.failed),
        }


@router.post("/dashboard/counts")
async def read_dashboard_task_run_counts(
    task_runs: schemas.filters.TaskRunFilter,
    flows: Optional[schemas.filters.FlowFilter] = None,
    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
    deployments: Optional[schemas.filters.DeploymentFilter] = None,
    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
    work_queues: Optional[schemas.filters.WorkQueueFilter] = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[TaskRunCount]:
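    """
    Return task run counts for the UI dashboard, bucketed over the requested
    time window: 20 buckets, each counting failed vs. completed terminal task
    runs. Requires `task_runs.start_time.after_`.
    """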
    if task_runs.start_time is None or task_runs.start_time.after_ is None:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="task_runs.start_time.after_ is required",
        )

    # We only care about task runs that are in a terminal state; all others
    # should be ignored.
    task_runs.state = schemas.filters.TaskRunFilterState(
        type=schemas.filters.TaskRunFilterStateType(
            any_=list(schemas.states.TERMINAL_STATES)
        )
    )

    bucket_count = 20
    start_time = task_runs.start_time.after_.replace(microsecond=0, second=0)
    end_time = (
        end_of_period(task_runs.start_time.before_, "minute")
        if task_runs.start_time.before_
        else end_of_period(now("UTC"), "minute")
    )
    window = end_time - start_time
    delta = window / bucket_count
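    # Each bucket covers window / bucket_count of time: e.g. a 10-hour window
    # yields twenty 30-minute buckets.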

    async with db.session_context(begin_transaction=False) as session:
        # Gather the raw counts. The counts are divided into buckets of time
        # and each bucket contains the number of successful and failed task
        # runs.
        # SQLAlchemy doesn't play nicely with our DateTime type so we convert it
        # to a datetime object.
        start_datetime = datetime(
            start_time.year,
            start_time.month,
            start_time.day,
            start_time.hour,
            start_time.minute,
            start_time.second,
            start_time.microsecond,
            start_time.tzinfo,
        )
        bucket_expression = sa.func.floor(
            sa.func.date_diff_seconds(db.TaskRun.start_time, start_datetime)
            / delta.total_seconds()
        ).label("bucket")
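        # Bucket index = seconds elapsed between the window start and the task
        # run's start time, divided by the bucket width, floored.
        # (`date_diff_seconds` is a dialect-aware helper registered by
        # Prefect's database utilities, not a built-in SQL function.)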

        raw_counts = (
            (
                await models.task_runs._apply_task_run_filters(
                    db,
                    sa.select(
                        bucket_expression,
                        sa.func.min(db.TaskRun.end_time).label("oldest"),
                        sa.func.sum(
                            sa.case(
                                (
                                    db.TaskRun.state_type.in_(FAILED_STATES),
                                    1,
                                ),
                                else_=0,
                            )
                        ).label("failed_count"),
                        sa.func.sum(
                            sa.case(
                                (
                                    db.TaskRun.state_type.notin_(FAILED_STATES),
                                    1,
                                ),
                                else_=0,
                            )
                        ).label("successful_count"),
                    ),
                    flow_filter=flows,
                    flow_run_filter=flow_runs,
                    task_run_filter=task_runs,
                    deployment_filter=deployments,
                    work_pool_filter=work_pools,
                    work_queue_filter=work_queues,
                )
            )
            .group_by("bucket", db.TaskRun.start_time)
            .subquery()
        )
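        # Note: the inner query groups by both "bucket" and TaskRun.start_time,
        # so it yields one row per distinct start time within each bucket; the
        # query below collapses those rows into a single row per bucket.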

        # Aggregate the raw counts by bucket
        query = (
            sa.select(
                raw_counts.c.bucket.label("bucket"),
                sa.func.min(raw_counts.c.oldest).label("oldest"),
                sa.func.sum(raw_counts.c.failed_count).label("failed_count"),
                sa.func.sum(raw_counts.c.successful_count).label("successful_count"),
            )
            .select_from(raw_counts)
            .group_by(raw_counts.c.bucket)
            .order_by(sa.asc("oldest"))
        )

        result = await session.execute(query)

        # Ensure that all buckets of time are present in the result even if no
        # matching task runs occurred during the given time period.
        buckets = [TaskRunCount(completed=0, failed=0) for _ in range(bucket_count)]

        for row in result:
            index = int(row.bucket)
            buckets[index].completed = row.successful_count
            buckets[index].failed = row.failed_count

        return buckets
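
# Example request for the endpoint above (illustrative values; field names
# follow the filter schemas used in the signature):
#
#   POST /ui/task_runs/dashboard/counts
#   {
#       "task_runs": {
#           "start_time": {
#               "after_": "2025-12-04T00:00:00Z",
#               "before_": "2025-12-05T00:00:00Z"
#           }
#       }
#   }
#
# The response is a JSON array of 20 {"completed": ..., "failed": ...}
# objects, one per bucket, ordered oldest to newest.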


@router.post("/count")
async def read_task_run_counts_by_state(
    flows: Optional[schemas.filters.FlowFilter] = None,
    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
    task_runs: Optional[schemas.filters.TaskRunFilter] = None,
    deployments: Optional[schemas.filters.DeploymentFilter] = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.states.CountByState:
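    """
    Count task runs grouped by state type, optionally narrowed by the given
    flow, flow run, task run, and deployment filters.
    """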
    async with db.session_context(begin_transaction=False) as session:
        return await models.task_runs.count_task_runs_by_state(
            session=session,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
        )


@router.get("/{id:uuid}")
async def read_task_run_with_flow_run_name(
    task_run_id: UUID = Path(..., description="The task run id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.ui.UITaskRun:
    """
    Get a task run by id.
    """
    async with db.session_context() as session:
        task_run = await models.task_runs.read_task_run_with_flow_run_name(
            session=session, task_run_id=task_run_id
        )

        if not task_run:
            raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Task not found")

        return schemas.ui.UITaskRun.model_validate(task_run)