Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/task_run_states.py: 62%

16 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Functions for interacting with task run state ORM objects. 

3Intended for internal use by the Prefect REST API. 

4""" 

5 

6from typing import Sequence, Union 1a

7from uuid import UUID 1a

8 

9from sqlalchemy import delete, select 1a

10from sqlalchemy.ext.asyncio import AsyncSession 1a

11 

12from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1a

13 

14 

@db_injector
async def read_task_run_state(
    db: PrefectDBInterface, session: AsyncSession, task_run_state_id: UUID
) -> Union[orm_models.TaskRunState, None]:
    """
    Fetch a single task run state by its id.

    Args:
        db: the Prefect database interface; only its `TaskRunState` model is
            used here (presumably supplied by `db_injector` when the caller
            does not pass it -- confirm against the decorator)
        session: A database session
        task_run_state_id: a task run state id

    Returns:
        Union[orm_models.TaskRunState, None]: whatever `session.get` yields
            for the given id; per the return annotation this may be None
    """
    # Resolve the ORM model from the database interface, then do a
    # primary-key style lookup through the session.
    state_model = db.TaskRunState
    task_run_state = await session.get(state_model, task_run_state_id)
    return task_run_state

31 

32 

@db_injector
async def read_task_run_states(
    db: PrefectDBInterface, session: AsyncSession, task_run_id: UUID
) -> Sequence[orm_models.TaskRunState]:
    """
    Fetch every state recorded for one task run, ordered by timestamp.

    Args:
        db: the Prefect database interface; only its `TaskRunState` model is
            used here (presumably supplied by `db_injector` when the caller
            does not pass it -- confirm against the decorator)
        session: A database session
        task_run_id: the task run id

    Returns:
        Sequence[orm_models.TaskRunState]: the task run states whose
            `task_run_id` matches, sorted by `timestamp`
    """
    state_model = db.TaskRunState

    # Restrict to the requested task run, then sort by the state timestamp.
    statement = select(state_model).filter_by(task_run_id=task_run_id)
    statement = statement.order_by(state_model.timestamp)

    rows = await session.execute(statement)
    # Unwrap single-entity rows into ORM objects and de-duplicate them.
    return rows.scalars().unique().all()

55 

56 

@db_injector
async def delete_task_run_state(
    db: PrefectDBInterface, session: AsyncSession, task_run_state_id: UUID
) -> bool:
    """
    Delete a task run state by id.

    Decorated with `db_injector` like the read functions in this module, so
    that all three helpers share the same calling convention for the leading
    `db` parameter.

    Args:
        db: the Prefect database interface; only its `TaskRunState` model is
            used here (presumably supplied by `db_injector` when the caller
            does not pass it -- confirm against the decorator)
        session: A database session
        task_run_state_id: a task run state id

    Returns:
        bool: whether or not the task run state was deleted
    """
    state_model = db.TaskRunState

    # Issue a single DELETE filtered on the state's id.
    result = await session.execute(
        delete(state_model).where(state_model.id == task_run_state_id)
    )
    # A positive rowcount means a matching row existed and was removed.
    return result.rowcount > 0