Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/csrf_token.py: 50%

26 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1import secrets 1a

2from datetime import datetime, timezone 1a

3from typing import Optional 1a

4 

5import sqlalchemy as sa 1a

6from sqlalchemy.ext.asyncio import AsyncSession 1a

7 

8from prefect import settings 1a

9from prefect.server.database import PrefectDBInterface, db_injector 1a

10from prefect.server.schemas import core 1a

11 

12 

@db_injector
async def create_or_update_csrf_token(
    db: PrefectDBInterface,
    session: AsyncSession,
    client: str,
) -> core.CsrfToken:
    """Issue a fresh CSRF token for a client, replacing any existing one.

    A new random token and a new expiration are written with a single
    upsert keyed on the client column, then the stored row is read back.

    Args:
        db (PrefectDBInterface): The database interface. The call to
            `read_token_for_client` below omits it, so it is presumably
            supplied by the `db_injector` decorator.
        session (AsyncSession): The database session
        client (str): The client identifier

    Returns:
        core.CsrfToken: The CSRF token that was just stored

    Raises:
        AssertionError: If the token cannot be read back after the upsert
            (`read_token_for_client` only returns unexpired tokens).
    """
    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + settings.PREFECT_SERVER_CSRF_TOKEN_EXPIRATION.value()

    # token_hex(32) draws 32 random bytes and renders them as 64 hex digits.
    new_token = secrets.token_hex(32)

    # The same values are used for a fresh insert and for the conflict update.
    refreshed = {"token": new_token, "expiration": expires_at}

    upsert_stmt = (
        db.queries.insert(db.CsrfToken)
        .values(client=client, **refreshed)
        .on_conflict_do_update(
            index_elements=[db.CsrfToken.client],
            set_=refreshed,
        )
    )
    await session.execute(upsert_stmt)

    # Read the row back so the caller receives the stored token object.
    csrf_token = await read_token_for_client(session=session, client=client)
    assert csrf_token

    return csrf_token

54 

55 

@db_injector
async def read_token_for_client(
    db: PrefectDBInterface,
    session: AsyncSession,
    client: str,
) -> Optional[core.CsrfToken]:
    """Look up the unexpired CSRF token belonging to a client.

    Args:
        db (PrefectDBInterface): The database interface (presumably
            supplied by the `db_injector` decorator)
        session (AsyncSession): The database session
        client (str): The client identifier

    Returns:
        Optional[core.CsrfToken]: The CSRF token, or None when the client
            has no token or its token's expiration is not in the future.
    """
    # Only rows whose expiration is strictly after "now" (UTC) are returned.
    still_valid = db.CsrfToken.expiration > datetime.now(timezone.utc)
    owned_by_client = db.CsrfToken.client == client

    query = sa.select(db.CsrfToken).where(sa.and_(still_valid, owned_by_client))
    result = await session.execute(query)
    row = result.scalar_one_or_none()

    return (
        None
        if row is None
        else core.CsrfToken.model_validate(row, from_attributes=True)
    )

87 

88 

@db_injector
async def delete_expired_tokens(db: PrefectDBInterface, session: AsyncSession) -> int:
    """Remove every CSRF token whose expiration has already passed.

    Args:
        db (PrefectDBInterface): The database interface (presumably
            supplied by the `db_injector` decorator)
        session (AsyncSession): The database session

    Returns:
        int: The row count reported for the delete statement, i.e. the
            number of tokens deleted
    """
    # Tokens expiring strictly before the current UTC time are removed.
    cutoff = datetime.now(timezone.utc)
    purge_stmt = sa.delete(db.CsrfToken).where(db.CsrfToken.expiration < cutoff)

    outcome = await session.execute(purge_stmt)
    return outcome.rowcount