Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/_engine.py: 16%

43 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1"""Internal engine utilities""" 

2 

3from collections.abc import Callable 1a

4from functools import partial 1a

5from typing import TYPE_CHECKING, Any, Union 1a

6from uuid import uuid4 1a

7 

8from prefect.context import FlowRunContext 1a

9from prefect.flows import Flow 1a

10from prefect.tasks import Task, TaskRunNameCallbackWithParameters 1a

11 

12 

def dynamic_key_for_task_run(
    context: FlowRunContext, task: "Task[..., Any]", stable: bool = True
) -> Union[int, str]:
    """Produce the dynamic key for the next run of ``task`` in ``context``.

    Args:
        context: Flow run context; its ``task_run_dynamic_keys`` mapping is
            read and updated in place, keyed by ``task.task_key``.
        task: The task whose run is being keyed.
        stable: When ``False``, a random UUID string is returned and the
            context mapping is left untouched.

    Returns:
        A UUID string when ``stable`` is ``False`` or ``context.detached`` is
        truthy. Otherwise the value stored in
        ``context.task_run_dynamic_keys`` for this task after the update:
        ``task.dynamic_key`` (or a fresh UUID string if the task has no such
        attribute) when ``context.flow_run`` is ``None``, else a counter that
        starts at 0 and increases by one on every call for the same task key.
    """
    # This task is running on remote infrastructure: nothing is recorded.
    if stable is False or context.detached:
        return str(uuid4())

    known_keys = context.task_run_dynamic_keys
    task_key = task.task_key

    if context.flow_run is None:
        # This is an autonomous task run.
        known_keys[task_key] = getattr(task, "dynamic_key", str(uuid4()))
    elif task_key in known_keys:
        previous = known_keys[task_key]
        if TYPE_CHECKING:
            assert isinstance(previous, int)
        known_keys[task_key] = previous + 1
    else:
        # First time this task key is seen in the context.
        known_keys[task_key] = 0

    return known_keys[task_key]

34 

35 

def resolve_custom_flow_run_name(
    flow: "Flow[..., Any]", parameters: dict[str, Any]
) -> str:
    """Compute the flow run name configured on ``flow.flow_run_name``.

    Args:
        flow: Object whose ``flow_run_name`` is either a zero-argument
            callable returning a string or a ``str.format`` template.
        parameters: Values substituted into the template by keyword; not
            passed to a callable ``flow_run_name``.

    Returns:
        The resolved flow run name.

    Raises:
        TypeError: If the callable returns a non-string, or if
            ``flow_run_name`` is neither a callable nor a string.
    """
    name_spec = flow.flow_run_name

    if callable(name_spec):
        resolved = name_spec()
        # The runtime check is skipped only while static type checking.
        if not TYPE_CHECKING and not isinstance(resolved, str):
            raise TypeError(
                f"Callable {name_spec} for 'flow_run_name' returned type"
                f" {type(resolved).__name__} but a string is required."
            )
        return resolved

    if isinstance(name_spec, str):
        return name_spec.format(**parameters)

    raise TypeError(
        "Expected string or callable for 'flow_run_name'; got"
        f" {type(name_spec).__name__} instead."
    )

56 

57 

def resolve_custom_task_run_name(
    task: "Task[..., Any]", parameters: dict[str, Any]
) -> str:
    """Compute the task run name configured on ``task.task_run_name``.

    Args:
        task: Object whose ``task_run_name`` is either a callable returning a
            string or a ``str.format`` template.
        parameters: Values substituted into the template by keyword, or
            handed to the callable as ``parameters=`` when
            ``TaskRunNameCallbackWithParameters.is_callback_with_parameters``
            accepts it.

    Returns:
        The resolved task run name.

    Raises:
        TypeError: If the callable returns a non-string, or if
            ``task_run_name`` is neither a callable nor a string.
    """
    name_spec = task.task_run_name

    if callable(name_spec):
        wants_parameters = (
            TaskRunNameCallbackWithParameters.is_callback_with_parameters(
                name_spec
            )
        )
        if wants_parameters:
            # The callable accepts a 'parameters' kwarg: pass the whole dict.
            resolved = name_spec(parameters=parameters)
        else:
            # It doesn't expect parameters: call it without arguments.
            resolved = name_spec()

        # The runtime check is skipped only while static type checking.
        if not TYPE_CHECKING and not isinstance(resolved, str):
            raise TypeError(
                f"Callable {name_spec} for 'task_run_name' returned type"
                f" {type(resolved).__name__} but a string is required."
            )
        return resolved

    if isinstance(name_spec, str):
        return name_spec.format(**parameters)

    raise TypeError(
        "Expected string or callable for 'task_run_name'; got"
        f" {type(name_spec).__name__} instead."
    )

86 

87 

def get_hook_name(hook: Callable[..., Any]) -> str:
    """Return a human-readable name for ``hook``.

    Resolution order:
      1. ``hook.__name__`` when the attribute exists.
      2. For a ``functools.partial``, the name of the wrapped callable,
         resolved with these same rules.
      3. The name of ``hook``'s class.

    Args:
        hook: Any callable, including partials and callable instances.

    Returns:
        The resolved name.
    """
    if hasattr(hook, "__name__"):
        return hook.__name__
    if isinstance(hook, partial):
        # The wrapped callable may itself lack ``__name__`` (e.g. an instance
        # of a callable class); recursing falls back to its class name rather
        # than raising AttributeError on ``hook.func.__name__``.
        return get_hook_name(hook.func)
    return hook.__class__.__name__