Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/concurrency_limits.py: 0%
42 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from __future__ import annotations
3import uuid
5from typing_extensions import Self
7from prefect.cli.transfer._exceptions import TransferSkipped
8from prefect.cli.transfer._migratable_resources.base import (
9 MigratableProtocol,
10 MigratableResource,
11)
12from prefect.client.orchestration import get_client
13from prefect.client.schemas.actions import (
14 GlobalConcurrencyLimitCreate,
15)
16from prefect.client.schemas.responses import (
17 GlobalConcurrencyLimitResponse,
18)
19from prefect.exceptions import (
20 ObjectAlreadyExists,
21)
class MigratableGlobalConcurrencyLimit(
    MigratableResource[GlobalConcurrencyLimitResponse]
):
    """Migratable wrapper around a ``GlobalConcurrencyLimitResponse``.

    Holds the source limit and, once ``migrate`` has run, the matching limit
    read back by name through the client returned by ``get_client()``.
    Wrappers are cached per source id, so ``construct`` returns the same
    object when called again with a limit carrying the same id.
    """

    # Wrappers built so far, keyed by the id of the wrapped source limit.
    _instances: dict[uuid.UUID, Self] = {}

    def __init__(self, global_concurrency_limit: GlobalConcurrencyLimitResponse):
        """Store the source limit; the destination is unknown until migrated."""
        self.source_global_concurrency_limit = global_concurrency_limit
        self.destination_global_concurrency_limit: (
            GlobalConcurrencyLimitResponse | None
        ) = None

    @property
    def source_id(self) -> uuid.UUID:
        """Id of the wrapped source limit."""
        source = self.source_global_concurrency_limit
        return source.id

    @property
    def destination_id(self) -> uuid.UUID | None:
        """Id of the destination limit, or ``None`` while none is recorded."""
        destination = self.destination_global_concurrency_limit
        if not destination:
            return None
        return destination.id

    @classmethod
    async def construct(cls, obj: GlobalConcurrencyLimitResponse) -> Self:
        """Return the cached wrapper for ``obj.id``, creating it on first use."""
        existing = cls._instances.get(obj.id)
        if existing is not None:
            return existing
        created = cls(obj)
        cls._instances[obj.id] = created
        return created

    @classmethod
    async def get_instance(
        cls, id: uuid.UUID
    ) -> "MigratableResource[GlobalConcurrencyLimitResponse] | None":
        """Look up a previously constructed wrapper; ``None`` if not cached."""
        return cls._instances.get(id)

    async def get_dependencies(self) -> "list[MigratableProtocol]":
        """Return an empty list: this resource declares no dependencies."""
        return []

    async def migrate(self) -> None:
        """Create the limit through the client and record the stored result.

        A limit is created from the source's ``name``, ``limit``, ``active``
        and ``active_slots``; the limit is then read back by name and kept as
        ``destination_global_concurrency_limit``.

        Raises:
            TransferSkipped: when the client reports ``ObjectAlreadyExists``.
                The existing limit is still read back and recorded before
                this is raised.
        """
        async with get_client() as client:
            source = self.source_global_concurrency_limit
            try:
                payload = GlobalConcurrencyLimitCreate(
                    name=source.name,
                    limit=source.limit,
                    active=source.active,
                    active_slots=source.active_slots,
                )
                await client.create_global_concurrency_limit(
                    concurrency_limit=payload,
                )
                # Read the limit back by name so the destination id is known.
                fetched = await client.read_global_concurrency_limit_by_name(
                    source.name
                )
                self.destination_global_concurrency_limit = fetched
            except ObjectAlreadyExists:
                # Record the pre-existing limit before signalling the skip.
                present = await client.read_global_concurrency_limit_by_name(
                    source.name
                )
                self.destination_global_concurrency_limit = present
                raise TransferSkipped("Already exists")