Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/utilities/encryption.py: 32%
27 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Encryption utilities
3"""
5import json 1a
6import os 1a
7from collections.abc import Mapping 1a
8from typing import Any 1a
10from cryptography.fernet import Fernet 1a
11from sqlalchemy.ext.asyncio import AsyncSession 1a
13from prefect.server import schemas 1a
async def _get_fernet_encryption(session: AsyncSession) -> Fernet:
    """
    Build the `Fernet` instance used to encrypt and decrypt server data.

    Key sources are consulted in this order:

    1. The `PREFECT_SERVER_ENCRYPTION_KEY` environment variable, falling back
       to the deprecated `ORION_ENCRYPTION_KEY` variable when the former is
       not set at all.
    2. The `ENCRYPTION_KEY` configuration entry read through `session`.
    3. A freshly generated key, which is written back as the
       `ENCRYPTION_KEY` configuration entry before being used.

    Args:
        session: database session passed to the configuration read/write
            helpers.

    Returns:
        A `Fernet` object constructed from the selected key.
    """
    # Kept as a function-level import, as in the original.
    # NOTE(review): presumably this avoids a circular import -- confirm.
    from prefect.server.models import configuration

    # Deprecated. Use the `PREFECT_SERVER_ENCRYPTION_KEY` instead.
    legacy_key = os.getenv("ORION_ENCRYPTION_KEY")
    env_key = os.getenv("PREFECT_SERVER_ENCRYPTION_KEY", legacy_key)
    if env_key:
        return Fernet(env_key.encode())

    stored = await configuration.read_configuration(session, "ENCRYPTION_KEY")
    if stored is not None:
        return Fernet(stored.value["fernet_key"].encode())

    # Nothing configured yet: generate a key and store it through the
    # configuration model before returning.
    fresh_key = Fernet.generate_key()
    await configuration.write_configuration(
        session,
        schemas.core.Configuration(
            key="ENCRYPTION_KEY", value={"fernet_key": fresh_key.decode()}
        ),
    )
    return Fernet(fresh_key)
async def encrypt_fernet(session: AsyncSession, data: Mapping[str, Any]) -> str:
    """
    Encrypt a mapping with the server's Fernet key.

    The mapping is serialized with `json.dumps`, encoded to bytes, and
    encrypted with the `Fernet` instance obtained for `session`.

    Args:
        session: database session forwarded to `_get_fernet_encryption`.
        data: mapping to encrypt; it must be serializable by `json.dumps`.

    Returns:
        The Fernet token decoded to `str`.
    """
    cipher = await _get_fernet_encryption(session)
    plaintext = json.dumps(data).encode()
    token = cipher.encrypt(plaintext)
    return token.decode()
async def decrypt_fernet(session: AsyncSession, data: str) -> dict[str, Any]:
    """
    Decrypt a Fernet token and parse its JSON payload.

    Args:
        session: database session forwarded to `_get_fernet_encryption`.
        data: Fernet token as a string.

    Returns:
        The result of `json.loads` applied to the decrypted payload.
    """
    cipher = await _get_fernet_encryption(session)
    plaintext = cipher.decrypt(data.encode())
    return json.loads(plaintext.decode())