Coverage for polar/kit/hook.py: 44%

14 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 15:52 +0000

1from collections.abc import Callable, Coroutine 1a

2from typing import Any 1a

3 

4type HookFunc[T] = Callable[[T], Coroutine[Any, Any, Any]] 1a

5 

6 

7class Hook[T]: 1a

8 hooks: list[HookFunc[T]] 

9 

10 def __init__(self) -> None: 1a

11 self.hooks = [] 1a

12 

13 def add(self, fun: HookFunc[T]) -> None: 1a

14 if fun in self.hooks: 

15 raise Exception(f"fun is already registered! fun={fun} hooks={self.hooks}") 

16 

17 self.hooks.append(fun) 

18 

19 async def call(self, payload: T) -> None: 1a

20 for fn in self.hooks: 

21 await fn(payload)