Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/__init__.py: 33%
65 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from typing import TYPE_CHECKING, overload 1a
2from prefect.cli.transfer._migratable_resources.base import ( 1a
3 MigratableProtocol,
4 MigratableType,
5)
8if TYPE_CHECKING: 8 ↛ 9line 8 didn't jump to line 9 because the condition on line 8 was never true1a
9 from prefect.client.schemas.objects import (
10 BlockDocument,
11 BlockSchema,
12 BlockType,
13 Flow,
14 WorkPool,
15 WorkQueue,
16 Variable,
17 )
18 from prefect.client.schemas.responses import (
19 DeploymentResponse,
20 GlobalConcurrencyLimitResponse,
21 )
22 from prefect.events.schemas.automations import Automation
24 from prefect.cli.transfer._migratable_resources.blocks import (
25 MigratableBlockDocument,
26 MigratableBlockSchema,
27 MigratableBlockType,
28 )
29 from prefect.cli.transfer._migratable_resources.concurrency_limits import (
30 MigratableGlobalConcurrencyLimit,
31 )
32 from prefect.cli.transfer._migratable_resources.deployments import (
33 MigratableDeployment,
34 )
35 from prefect.cli.transfer._migratable_resources.flows import MigratableFlow
36 from prefect.cli.transfer._migratable_resources.variables import MigratableVariable
37 from prefect.cli.transfer._migratable_resources.work_pools import MigratableWorkPool
38 from prefect.cli.transfer._migratable_resources.work_queues import (
39 MigratableWorkQueue,
40 )
41 from prefect.cli.transfer._migratable_resources.automations import (
42 MigratableAutomation,
43 )
46@overload 1a
47async def construct_migratable_resource(obj: "WorkPool") -> "MigratableWorkPool": ... 47 ↛ exitline 47 didn't return from function 'construct_migratable_resource' because 1a
48@overload 1a
49async def construct_migratable_resource(obj: "WorkQueue") -> "MigratableWorkQueue": ... 49 ↛ exitline 49 didn't return from function 'construct_migratable_resource' because 1a
50@overload 1a
51async def construct_migratable_resource( 51 ↛ exitline 51 didn't return from function 'construct_migratable_resource' because 1a
52 obj: "DeploymentResponse",
53) -> "MigratableDeployment": ...
54@overload 1a
55async def construct_migratable_resource(obj: "Flow") -> "MigratableFlow": ... 55 ↛ exitline 55 didn't return from function 'construct_migratable_resource' because 1a
56@overload 1a
57async def construct_migratable_resource(obj: "BlockType") -> "MigratableBlockType": ... 57 ↛ exitline 57 didn't return from function 'construct_migratable_resource' because 1a
58@overload 1a
59async def construct_migratable_resource( 59 ↛ exitline 59 didn't return from function 'construct_migratable_resource' because 1a
60 obj: "BlockSchema",
61) -> "MigratableBlockSchema": ...
62@overload 1a
63async def construct_migratable_resource( 63 ↛ exitline 63 didn't return from function 'construct_migratable_resource' because 1a
64 obj: "BlockDocument",
65) -> "MigratableBlockDocument": ...
66@overload 1a
67async def construct_migratable_resource( 67 ↛ exitline 67 didn't return from function 'construct_migratable_resource' because 1a
68 obj: "Automation",
69) -> "MigratableAutomation": ...
70@overload 1a
71async def construct_migratable_resource( 71 ↛ exitline 71 didn't return from function 'construct_migratable_resource' because 1a
72 obj: "GlobalConcurrencyLimitResponse",
73) -> "MigratableGlobalConcurrencyLimit": ...
74@overload 1a
75async def construct_migratable_resource(obj: "Variable") -> "MigratableVariable": ... 75 ↛ exitline 75 didn't return from function 'construct_migratable_resource' because 1a
78async def construct_migratable_resource( 1a
79 obj: MigratableType,
80) -> MigratableProtocol:
81 from prefect.client.schemas.objects import (
82 BlockDocument,
83 BlockSchema,
84 BlockType,
85 Flow,
86 WorkPool,
87 WorkQueue,
88 )
89 from prefect.client.schemas.responses import (
90 DeploymentResponse,
91 GlobalConcurrencyLimitResponse,
92 )
93 from prefect.events.schemas.automations import Automation
95 from prefect.cli.transfer._migratable_resources.blocks import (
96 MigratableBlockDocument,
97 MigratableBlockSchema,
98 MigratableBlockType,
99 )
100 from prefect.cli.transfer._migratable_resources.concurrency_limits import (
101 MigratableGlobalConcurrencyLimit,
102 )
103 from prefect.cli.transfer._migratable_resources.deployments import (
104 MigratableDeployment,
105 )
106 from prefect.cli.transfer._migratable_resources.flows import MigratableFlow
107 from prefect.cli.transfer._migratable_resources.variables import MigratableVariable
108 from prefect.cli.transfer._migratable_resources.work_pools import MigratableWorkPool
109 from prefect.cli.transfer._migratable_resources.work_queues import (
110 MigratableWorkQueue,
111 )
112 from prefect.cli.transfer._migratable_resources.automations import (
113 MigratableAutomation,
114 )
116 if isinstance(obj, WorkPool):
117 return await MigratableWorkPool.construct(obj)
118 elif isinstance(obj, WorkQueue):
119 return await MigratableWorkQueue.construct(obj)
120 elif isinstance(obj, DeploymentResponse):
121 return await MigratableDeployment.construct(obj)
122 elif isinstance(obj, Flow):
123 return await MigratableFlow.construct(obj)
124 elif isinstance(obj, BlockType):
125 return await MigratableBlockType.construct(obj)
126 elif isinstance(obj, BlockSchema):
127 return await MigratableBlockSchema.construct(obj)
128 elif isinstance(obj, BlockDocument):
129 return await MigratableBlockDocument.construct(obj)
130 elif isinstance(obj, Automation):
131 return await MigratableAutomation.construct(obj)
132 elif isinstance(obj, GlobalConcurrencyLimitResponse):
133 return await MigratableGlobalConcurrencyLimit.construct(obj)
134 else:
135 return await MigratableVariable.construct(obj)