Coverage for polar/organization_access_token/tasks.py: 67%

9 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1import uuid 1a

2from datetime import UTC, datetime 1a

3 

4from polar.worker import AsyncSessionMaker, TaskPriority, actor 1a

5 

6from .repository import OrganizationAccessTokenRepository 1a

7 

8 

@actor(
    actor_name="organization_access_token.record_usage",
    priority=TaskPriority.LOW,
)
async def record_usage(
    organization_access_token_id: uuid.UUID,
    last_used_at: float,
) -> None:
    """Record when an organization access token was last used.

    Opens a session from ``AsyncSessionMaker`` and delegates the update to
    ``OrganizationAccessTokenRepository.record_usage``.

    Args:
        organization_access_token_id: Identifier of the token whose usage
            is being recorded.
        last_used_at: Moment of use as a POSIX timestamp; it is converted
            to a timezone-aware UTC ``datetime`` before being handed to
            the repository.
    """
    async with AsyncSessionMaker() as db_session:
        token_repository = OrganizationAccessTokenRepository.from_session(
            db_session
        )
        # The task receives a plain float; the repository is given an
        # aware datetime in UTC instead.
        used_at = datetime.fromtimestamp(last_used_at, tz=UTC)
        await token_repository.record_usage(organization_access_token_id, used_at)