Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/_engine.py: 16%
43 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""Internal engine utilities"""
3from collections.abc import Callable 1a
4from functools import partial 1a
5from typing import TYPE_CHECKING, Any, Union 1a
6from uuid import uuid4 1a
8from prefect.context import FlowRunContext 1a
9from prefect.flows import Flow 1a
10from prefect.tasks import Task, TaskRunNameCallbackWithParameters 1a
def dynamic_key_for_task_run(
    context: FlowRunContext, task: "Task[..., Any]", stable: bool = True
) -> Union[int, str]:
    """Return the dynamic key to use for the next run of ``task``.

    Args:
        context: Flow run context; its ``detached`` and ``flow_run``
            attributes are read and its ``task_run_dynamic_keys`` mapping
            is read and updated.
        task: The task whose ``task_key`` indexes the mapping.
        stable: When exactly ``False``, a fresh random key is returned and
            the mapping is left untouched.

    Returns:
        A random UUID string when ``stable`` is ``False`` or the context is
        detached; otherwise whatever is stored in the mapping for the task
        after this call's update (the task's ``dynamic_key`` attribute or a
        random UUID string when there is no flow run, else a counter that
        starts at 0 and grows by one on each call).
    """
    # This task is running on remote infrastructure: nothing is recorded.
    if stable is False or context.detached:
        return str(uuid4())

    task_key = task.task_key
    recorded_keys = context.task_run_dynamic_keys

    if context.flow_run is None:
        # This is an autonomous task run: prefer the task's own key.
        recorded_keys[task_key] = getattr(task, "dynamic_key", str(uuid4()))
    elif task_key in recorded_keys:
        # Seen before in this flow run: advance the counter.
        previous = recorded_keys[task_key]
        if TYPE_CHECKING:
            assert isinstance(previous, int)
        recorded_keys[task_key] = previous + 1
    else:
        # First run of this task in the flow run.
        recorded_keys[task_key] = 0

    return recorded_keys[task_key]
def resolve_custom_flow_run_name(
    flow: "Flow[..., Any]", parameters: dict[str, Any]
) -> str:
    """Produce the flow run name from ``flow.flow_run_name``.

    Args:
        flow: Object whose ``flow_run_name`` attribute is either a
            zero-argument callable or a format-string template.
        parameters: Values substituted into the template via
            ``str.format(**parameters)``; not passed to a callable.

    Returns:
        The callable's return value, or the formatted template.

    Raises:
        TypeError: If the callable returns a non-string, or if
            ``flow_run_name`` is neither callable nor a string.
    """
    name_source = flow.flow_run_name

    # Callable: invoke it with no arguments and insist on a string result.
    if callable(name_source):
        flow_run_name = name_source()
        if not TYPE_CHECKING and not isinstance(flow_run_name, str):
            raise TypeError(
                f"Callable {flow.flow_run_name} for 'flow_run_name' returned type"
                f" {type(flow_run_name).__name__} but a string is required."
            )
        return flow_run_name

    # Template string: fill placeholders from the parameters.
    if isinstance(name_source, str):
        return name_source.format(**parameters)

    raise TypeError(
        "Expected string or callable for 'flow_run_name'; got"
        f" {type(flow.flow_run_name).__name__} instead."
    )
def resolve_custom_task_run_name(
    task: "Task[..., Any]", parameters: dict[str, Any]
) -> str:
    """Produce the task run name from ``task.task_run_name``.

    Args:
        task: Object whose ``task_run_name`` attribute is either a callable
            or a format-string template.
        parameters: Values substituted into the template via
            ``str.format(**parameters)``, or handed to the callable as the
            ``parameters`` keyword when
            ``TaskRunNameCallbackWithParameters.is_callback_with_parameters``
            reports that it accepts one.

    Returns:
        The callable's return value, or the formatted template.

    Raises:
        TypeError: If the callable returns a non-string, or if
            ``task_run_name`` is neither callable nor a string.
    """
    name_source = task.task_run_name

    # Deal with the non-callable cases up front.
    if not callable(name_source):
        if isinstance(name_source, str):
            return name_source.format(**parameters)
        raise TypeError(
            "Expected string or callable for 'task_run_name'; got"
            f" {type(task.task_run_name).__name__} instead."
        )

    # Pass the whole parameters dict only to callbacks that accept a
    # 'parameters' kwarg; otherwise call with no arguments.
    wants_parameters = TaskRunNameCallbackWithParameters.is_callback_with_parameters(
        name_source
    )
    task_run_name = (
        name_source(parameters=parameters) if wants_parameters else name_source()
    )

    if not TYPE_CHECKING and not isinstance(task_run_name, str):
        raise TypeError(
            f"Callable {task.task_run_name} for 'task_run_name' returned type"
            f" {type(task_run_name).__name__} but a string is required."
        )
    return task_run_name
def get_hook_name(hook: Callable[..., Any]) -> str:
    """Return a human-readable name for a hook callable.

    Resolution order:
      1. ``hook.__name__`` when the attribute exists.
      2. For a ``functools.partial``, the name of the wrapped callable,
         resolved with these same rules.
      3. Otherwise the name of the hook's class.

    Args:
        hook: The callable to name.

    Returns:
        The resolved name.
    """
    if hasattr(hook, "__name__"):
        return hook.__name__
    if isinstance(hook, partial):
        # The wrapped callable may itself lack ``__name__`` (e.g. an instance
        # of a callable class); recursing falls back to its class name
        # instead of raising AttributeError.
        return get_hook_name(hook.func)
    return hook.__class__.__name__