Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/utilities/encryption.py: 32%

27 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Encryption utilities 

3""" 

4 

5import json 1a

6import os 1a

7from collections.abc import Mapping 1a

8from typing import Any 1a

9 

10from cryptography.fernet import Fernet 1a

11from sqlalchemy.ext.asyncio import AsyncSession 1a

12 

13from prefect.server import schemas 1a

14 

15 

16async def _get_fernet_encryption(session: AsyncSession) -> Fernet: 1a

17 from prefect.server.models import configuration 

18 

19 environment_key = os.getenv( 

20 "PREFECT_SERVER_ENCRYPTION_KEY", 

21 # Deprecated. Use the `PREFECT_SERVER_ENCRYPTION_KEY` instead. 

22 os.getenv("ORION_ENCRYPTION_KEY"), 

23 ) 

24 if environment_key: 

25 return Fernet(environment_key.encode()) 

26 

27 configured_key = await configuration.read_configuration(session, "ENCRYPTION_KEY") 

28 

29 if configured_key is None: 

30 encryption_key = Fernet.generate_key() 

31 configured_key = schemas.core.Configuration( 

32 key="ENCRYPTION_KEY", value={"fernet_key": encryption_key.decode()} 

33 ) 

34 await configuration.write_configuration(session, configured_key) 

35 else: 

36 encryption_key = configured_key.value["fernet_key"].encode() 

37 return Fernet(encryption_key) 

38 

39 

40async def encrypt_fernet(session: AsyncSession, data: Mapping[str, Any]) -> str: 1a

41 fernet = await _get_fernet_encryption(session) 

42 byte_blob = json.dumps(data).encode() 

43 return fernet.encrypt(byte_blob).decode() 

44 

45 

46async def decrypt_fernet(session: AsyncSession, data: str) -> dict[str, Any]: 1a

47 fernet = await _get_fernet_encryption(session) 

48 byte_blob = data.encode() 

49 return json.loads(fernet.decrypt(byte_blob).decode())