Coverage for /usr/local/lib/python3.12/site-packages/prefect/runtime/task_run.py: 19%

67 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Access attributes of the current task run dynamically. 

3 

4Note that if a task run cannot be discovered, all attributes will return empty values. 

5 

6You can mock the runtime attributes for testing purposes by setting environment variables 

7prefixed with `PREFECT__RUNTIME__TASK_RUN`. 

8 

9Available attributes: 

10 - `id`: the task run's unique ID 

11 - `name`: the name of the task run 

12 - `tags`: the task run's set of tags 

13 - `parameters`: the parameters the task was called with 

14 - `run_count`: the number of times this task run has been run 

15 - `task_name`: the name of the task 

16""" 

17 

18from __future__ import annotations 1a

19 

20import os 1a

21from typing import Any, Callable 1a

22 

23from prefect.context import TaskRunContext 1a

24from prefect.settings import get_current_settings 1a

25 

26__all__ = [ 1a

27 "id", 

28 "tags", 

29 "name", 

30 "parameters", 

31 "run_count", 

32 "task_name", 

33 "api_url", 

34 "ui_url", 

35] 

36 

37 

38type_cast: dict[ 1a

39 type[bool] | type[int] | type[float] | type[str] | type[None], Callable[[Any], Any] 

40] = { 

41 bool: lambda x: x.lower() == "true", 

42 int: int, 

43 float: float, 

44 str: str, 

45 # for optional defined attributes, when real value is NoneType, use str 

46 type(None): str, 

47} 

48 

49 

50def __getattr__(name: str) -> Any: 1a

51 """ 

52 Attribute accessor for this submodule; note that imports also work with this: 

53 

54 from prefect.runtime.task_run import id 

55 """ 

56 

57 func = FIELDS.get(name) 

58 

59 # if `name` is an attribute but it is mocked through environment variable, the mocked type will be str, 

60 # which might be different from original one. For consistency, cast env var to the same type 

61 env_key = f"PREFECT__RUNTIME__TASK_RUN__{name.upper()}" 

62 

63 if func is None: 

64 if env_key in os.environ: 

65 return os.environ[env_key] 

66 else: 

67 raise AttributeError(f"{__name__} has no attribute {name!r}") 

68 

69 real_value = func() 

70 if env_key in os.environ: 

71 mocked_value = os.environ[env_key] 

72 # cast `mocked_value` to the same type as `real_value` 

73 try: 

74 cast_func = type_cast[type(real_value)] 

75 return cast_func(mocked_value) 

76 except KeyError: 

77 raise ValueError( 

78 "This runtime context attribute cannot be mocked using an" 

79 " environment variable. Please use monkeypatch instead." 

80 ) 

81 else: 

82 return real_value 

83 

84 

85def __dir__() -> list[str]: 1a

86 return sorted(__all__) 

87 

88 

89def get_id() -> str | None: 1a

90 task_run_ctx = TaskRunContext.get() 

91 if task_run_ctx is not None: 

92 return str(task_run_ctx.task_run.id) 

93 

94 

95def get_tags() -> list[str]: 1a

96 task_run_ctx = TaskRunContext.get() 

97 if task_run_ctx is None: 

98 return [] 

99 else: 

100 return task_run_ctx.task_run.tags 

101 

102 

103def get_run_count() -> int: 1a

104 task_run_ctx = TaskRunContext.get() 

105 if task_run_ctx is None: 

106 return 0 

107 else: 

108 return task_run_ctx.task_run.run_count 

109 

110 

111def get_name() -> str | None: 1a

112 task_run_ctx = TaskRunContext.get() 

113 if task_run_ctx is None: 

114 return None 

115 else: 

116 return task_run_ctx.task_run.name 

117 

118 

119def get_task_name() -> str | None: 1a

120 task_run_ctx = TaskRunContext.get() 

121 if task_run_ctx is None: 

122 return None 

123 else: 

124 return task_run_ctx.task.name 

125 

126 

127def get_parameters() -> dict[str, Any]: 1a

128 task_run_ctx = TaskRunContext.get() 

129 if task_run_ctx is not None: 

130 return task_run_ctx.parameters 

131 else: 

132 return {} 

133 

134 

135def get_task_run_api_url() -> str | None: 1a

136 if (api_url := get_current_settings().api.url) is None: 

137 return None 

138 if (task_run_id := get_id()) is None: 

139 return None 

140 return f"{api_url}/runs/task-run/{task_run_id}" 

141 

142 

143def get_task_run_ui_url() -> str | None: 1a

144 if (ui_url := get_current_settings().ui_url) is None: 

145 return None 

146 if (task_run_id := get_id()) is None: 

147 return None 

148 return f"{ui_url}/runs/task-run/{task_run_id}" 

149 

150 

151FIELDS: dict[str, Callable[[], Any | None]] = { 1a

152 "id": get_id, 

153 "tags": get_tags, 

154 "name": get_name, 

155 "parameters": get_parameters, 

156 "run_count": get_run_count, 

157 "task_name": get_task_name, 

158 "api_url": get_task_run_api_url, 

159 "ui_url": get_task_run_ui_url, 

160}