Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/saved_searches.py: 44%

35 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Functions for interacting with saved search ORM objects. 

3Intended for internal use by the Prefect REST API. 

4""" 

5 

6from typing import Optional, Sequence, Union 1a

7from uuid import UUID 1a

8 

9import sqlalchemy as sa 1a

10from sqlalchemy import delete, select 1a

11from sqlalchemy.ext.asyncio import AsyncSession 1a

12 

13import prefect.server.schemas as schemas 1a

14from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1a

15 

16 

@db_injector
async def create_saved_search(
    db: PrefectDBInterface,
    session: AsyncSession,
    saved_search: schemas.core.SavedSearch,
) -> orm_models.SavedSearch:
    """
    Insert a SavedSearch, or update the existing row on a uniqueness conflict.

    The insert targets the columns listed in
    `db.orm.saved_search_unique_upsert_columns` (presumably the name column;
    confirm against the ORM configuration). When a conflict occurs, the
    existing row is updated with the values produced by
    `saved_search.model_dump_for_orm(include={"filters"})`; other columns of
    the existing row are not part of the update.

    Args:
        db (PrefectDBInterface): database interface supplying the SavedSearch
            ORM model and the dialect-specific insert helper
        session (AsyncSession): a database session
        saved_search (schemas.core.SavedSearch): a SavedSearch model

    Returns:
        orm_models.SavedSearch: the row whose name matches `saved_search.name`,
            as read back after the upsert
    """
    # Only explicitly-set fields are sent with the INSERT.
    insert_values = saved_search.model_dump_for_orm(exclude_unset=True)
    # Values applied to the existing row when the insert conflicts.
    conflict_values = saved_search.model_dump_for_orm(include={"filters"})

    upsert = (
        db.queries.insert(db.SavedSearch)
        .values(**insert_values)
        .on_conflict_do_update(
            index_elements=db.orm.saved_search_unique_upsert_columns,
            set_=conflict_values,
        )
    )
    await session.execute(upsert)

    # Read the row back by name. `populate_existing=True` asks SQLAlchemy to
    # overwrite any instance already present in the session with the values
    # just fetched from the database.
    by_name = sa.select(db.SavedSearch).where(
        db.SavedSearch.name == saved_search.name,
    )
    refreshed = by_name.execution_options(populate_existing=True)

    # `scalar_one()` expects exactly one matching row.
    return (await session.execute(refreshed)).scalar_one()

58 

59 

@db_injector
async def read_saved_search(
    db: PrefectDBInterface, session: AsyncSession, saved_search_id: UUID
) -> Union[orm_models.SavedSearch, None]:
    """
    Look up a single SavedSearch by its primary key.

    Args:
        db (PrefectDBInterface): database interface supplying the SavedSearch
            ORM model
        session (AsyncSession): a database session
        saved_search_id (UUID): id of the SavedSearch to load

    Returns:
        Union[orm_models.SavedSearch, None]: the matching SavedSearch, or
            None when `session.get` finds no row for that id
    """
    saved_search = await session.get(db.SavedSearch, saved_search_id)
    return saved_search

76 

77 

@db_injector
async def read_saved_search_by_name(
    db: PrefectDBInterface, session: AsyncSession, name: str
) -> Union[orm_models.SavedSearch, None]:
    """
    Look up a single SavedSearch by its name.

    Args:
        db (PrefectDBInterface): database interface supplying the SavedSearch
            ORM model
        session (AsyncSession): a database session
        name (str): name of the SavedSearch to load

    Returns:
        Union[orm_models.SavedSearch, None]: the first SavedSearch with that
            name, or None when the query returns no rows
    """
    # The query is capped at one row before taking the scalar result.
    by_name = select(db.SavedSearch).where(db.SavedSearch.name == name)
    first_match = by_name.limit(1)
    return (await session.execute(first_match)).scalar()

96 

97 

@db_injector
async def read_saved_searches(
    db: PrefectDBInterface,
    session: AsyncSession,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
) -> Sequence[orm_models.SavedSearch]:
    """
    List SavedSearches ordered by name, with optional paging.

    Args:
        db (PrefectDBInterface): database interface supplying the SavedSearch
            ORM model
        session (AsyncSession): a database session
        offset (Optional[int]): number of rows to skip; no OFFSET clause is
            added when None
        limit (Optional[int]): maximum number of rows to return; no LIMIT
            clause is added when None

    Returns:
        Sequence[orm_models.SavedSearch]: the de-duplicated SavedSearches
            returned by the query
    """
    stmt = select(db.SavedSearch).order_by(db.SavedSearch.name)

    # Paging clauses are attached only for arguments the caller supplied;
    # an explicit 0 still counts as supplied.
    stmt = stmt if offset is None else stmt.offset(offset)
    stmt = stmt if limit is None else stmt.limit(limit)

    rows = await session.execute(stmt)
    return rows.scalars().unique().all()

126 

127 

@db_injector
async def delete_saved_search(
    db: PrefectDBInterface, session: AsyncSession, saved_search_id: UUID
) -> bool:
    """
    Delete the SavedSearch with the given id.

    Args:
        db (PrefectDBInterface): database interface supplying the SavedSearch
            ORM model
        session (AsyncSession): a database session
        saved_search_id (UUID): id of the SavedSearch to delete

    Returns:
        bool: True when the DELETE statement reported at least one affected
            row, False otherwise
    """
    removal = delete(db.SavedSearch).where(db.SavedSearch.id == saved_search_id)
    outcome = await session.execute(removal)

    # The reported row count tells whether a matching row was removed.
    return outcome.rowcount > 0