Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/csrf_token.py: 50%
26 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1import secrets 1a
2from datetime import datetime, timezone 1a
3from typing import Optional 1a
5import sqlalchemy as sa 1a
6from sqlalchemy.ext.asyncio import AsyncSession 1a
8from prefect import settings 1a
9from prefect.server.database import PrefectDBInterface, db_injector 1a
10from prefect.server.schemas import core 1a
@db_injector
async def create_or_update_csrf_token(
    db: PrefectDBInterface,
    session: AsyncSession,
    client: str,
) -> core.CsrfToken:
    """Issue a fresh CSRF token for a client, replacing any existing one.

    A new random token and expiration are written with an upsert that
    conflicts on the client column, so a client that already has a row gets
    its token and expiration overwritten instead of a second row.

    Args:
        db (PrefectDBInterface): The database interface
        session (AsyncSession): The database session
        client (str): The client identifier

    Returns:
        core.CsrfToken: The CSRF token
    """
    # Expiration is "now" in UTC plus the configured token lifetime.
    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + settings.PREFECT_SERVER_CSRF_TOKEN_EXPIRATION.value()

    # 32 random bytes, hex encoded.
    new_token = secrets.token_hex(32)

    upsert_stmt = (
        db.queries.insert(db.CsrfToken)
        .values(client=client, token=new_token, expiration=expires_at)
        .on_conflict_do_update(
            index_elements=[db.CsrfToken.client],
            set_={"token": new_token, "expiration": expires_at},
        )
    )
    await session.execute(upsert_stmt)

    # Read the row back so the caller gets the stored token object. The read
    # only returns unexpired tokens, so the row written above is expected to
    # be found.
    stored_token = await read_token_for_client(session=session, client=client)
    assert stored_token

    return stored_token
@db_injector
async def read_token_for_client(
    db: PrefectDBInterface,
    session: AsyncSession,
    client: str,
) -> Optional[core.CsrfToken]:
    """Look up the unexpired CSRF token belonging to a client.

    Args:
        db (PrefectDBInterface): The database interface
        session (AsyncSession): The database session
        client (str): The client identifier

    Returns:
        Optional[core.CsrfToken]: The CSRF token, or None when the client has
            no token or its token's expiration is not in the future.
    """
    # Only rows for this client whose expiration is still ahead of the
    # current UTC time are considered.
    still_valid = sa.and_(
        db.CsrfToken.expiration > datetime.now(timezone.utc),
        db.CsrfToken.client == client,
    )
    query = sa.select(db.CsrfToken).where(still_valid)

    result = await session.execute(query)
    row = result.scalar_one_or_none()

    # Convert the ORM row into the schema model when one was found.
    return (
        None
        if row is None
        else core.CsrfToken.model_validate(row, from_attributes=True)
    )
@db_injector
async def delete_expired_tokens(db: PrefectDBInterface, session: AsyncSession) -> int:
    """Remove every CSRF token whose expiration has already passed.

    Args:
        db (PrefectDBInterface): The database interface
        session (AsyncSession): The database session

    Returns:
        int: The number of tokens deleted
    """
    # A token counts as expired when its expiration is strictly before the
    # current UTC time.
    cutoff = datetime.now(timezone.utc)
    purge_stmt = sa.delete(db.CsrfToken).where(db.CsrfToken.expiration < cutoff)

    outcome = await session.execute(purge_stmt)
    return outcome.rowcount