Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/openai/openai.py: 35%

93 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 17:29 +0000

1import base64 1a

2import inspect 1a

3import json 1a

4import os 1a

5from abc import ABC, abstractmethod 1a

6from pathlib import Path 1a

7from textwrap import dedent 1a

8 

9from openai import NOT_GIVEN, AsyncOpenAI 1a

10from openai.types.chat import ChatCompletion 1a

11from pydantic import BaseModel, field_validator 1a

12 

13from mealie.core.config import get_app_settings 1a

14from mealie.pkgs import img 1a

15 

16from .._base_service import BaseService 1a

17 

18 

19class OpenAIDataInjection(BaseModel): 1a

20 description: str 1a

21 value: str 1a

22 

23 @field_validator("value", mode="before") 1a

24 def parse_value(cls, value): 1a

25 if not value: 

26 raise ValueError("Value cannot be empty") 

27 if isinstance(value, str): 

28 return value 

29 

30 # convert Pydantic models to JSON 

31 if isinstance(value, BaseModel): 

32 return value.model_dump_json() 

33 

34 # convert Pydantic types to their JSON schema definition 

35 if inspect.isclass(value) and issubclass(value, BaseModel): 

36 value = value.model_json_schema() 

37 

38 # attempt to convert object to JSON 

39 try: 

40 return json.dumps(value, separators=(",", ":")) 

41 except TypeError: 

42 return value 

43 

44 

45class OpenAIImageBase(BaseModel, ABC): 1a

46 @abstractmethod 1a

47 def get_image_url(self) -> str: ... 47 ↛ exitline 47 didn't return from function 'get_image_url' because 1a

48 

49 def build_message(self) -> dict: 1a

50 return { 

51 "type": "image_url", 

52 "image_url": {"url": self.get_image_url()}, 

53 } 

54 

55 

56class OpenAIImageExternal(OpenAIImageBase): 1a

57 url: str 1a

58 

59 def get_image_url(self) -> str: 1a

60 return self.url 

61 

62 

63class OpenAILocalImage(OpenAIImageBase): 1a

64 filename: str 1a

65 path: Path 1a

66 

67 def get_image_url(self) -> str: 1a

68 image = img.PillowMinifier.to_jpg( 

69 self.path, dest=self.path.parent.joinpath(f"{self.filename}-min-original.jpg") 

70 ) 

71 with open(image, "rb") as f: 

72 b64content = base64.b64encode(f.read()).decode("utf-8") 

73 return f"data:image/jpeg;base64,{b64content}" 

74 

75 

76class OpenAIService(BaseService): 1a

77 PROMPTS_DIR = Path(os.path.dirname(os.path.abspath(__file__))) / "prompts" 1a

78 

79 def __init__(self) -> None: 1a

80 settings = get_app_settings() 1bcd

81 if not settings.OPENAI_ENABLED: 81 ↛ 84line 81 didn't jump to line 84 because the condition on line 81 was always true1bcd

82 raise ValueError("OpenAI is not enabled") 1bcd

83 

84 self.model = settings.OPENAI_MODEL 

85 self.workers = settings.OPENAI_WORKERS 

86 self.send_db_data = settings.OPENAI_SEND_DATABASE_DATA 

87 self.enable_image_services = settings.OPENAI_ENABLE_IMAGE_SERVICES 

88 

89 self.get_client = lambda: AsyncOpenAI( 

90 base_url=settings.OPENAI_BASE_URL, 

91 api_key=settings.OPENAI_API_KEY, 

92 timeout=settings.OPENAI_REQUEST_TIMEOUT, 

93 default_headers=settings.OPENAI_CUSTOM_HEADERS, 

94 default_query=settings.OPENAI_CUSTOM_PARAMS, 

95 ) 

96 

97 super().__init__() 

98 

99 @classmethod 1a

100 def get_prompt(cls, name: str, data_injections: list[OpenAIDataInjection] | None = None) -> str: 1a

101 """ 

102 Load stored prompt and inject data into it. 

103 

104 Access prompts with dot notation. 

105 For example, to access `prompts/recipes/parse-recipe-ingredients.txt`, use 

106 `recipes.parse-recipe-ingredients` 

107 """ 

108 

109 if not name: 

110 raise ValueError("Prompt name cannot be empty") 

111 

112 tree = name.split(".") 

113 prompt_dir = os.path.join(cls.PROMPTS_DIR, *tree[:-1], tree[-1] + ".txt") 

114 try: 

115 with open(prompt_dir) as f: 

116 content = f.read() 

117 except OSError as e: 

118 raise OSError(f"Unable to load prompt {name}") from e 

119 

120 if not data_injections: 

121 return content 

122 

123 content_parts = [content] 

124 for data_injection in data_injections: 

125 content_parts.append( 

126 dedent( 

127 f""" 

128 ### 

129 {data_injection.description} 

130 --- 

131 

132 {data_injection.value} 

133 """ 

134 ) 

135 ) 

136 return "\n".join(content_parts) 

137 

138 async def _get_raw_response(self, prompt: str, content: list[dict], force_json_response=True) -> ChatCompletion: 1a

139 client = self.get_client() 

140 return await client.chat.completions.create( 

141 messages=[ 

142 { 

143 "role": "system", 

144 "content": prompt, 

145 }, 

146 { 

147 "role": "user", 

148 "content": content, 

149 }, 

150 ], 

151 model=self.model, 

152 response_format={"type": "json_object"} if force_json_response else NOT_GIVEN, 

153 ) 

154 

155 async def get_response( 1a

156 self, 

157 prompt: str, 

158 message: str, 

159 *, 

160 images: list[OpenAIImageBase] | None = None, 

161 force_json_response=True, 

162 ) -> str | None: 

163 """Send data to OpenAI and return the response message content""" 

164 if images and not self.enable_image_services: 

165 self.logger.warning("OpenAI image services are disabled, ignoring images") 

166 images = None 

167 

168 try: 

169 user_messages = [{"type": "text", "text": message}] 

170 for image in images or []: 

171 user_messages.append(image.build_message()) 

172 

173 response = await self._get_raw_response(prompt, user_messages, force_json_response) 

174 if not response.choices: 

175 return None 

176 return response.choices[0].message.content 

177 except Exception as e: 

178 raise Exception(f"OpenAI Request Failed. {e.__class__.__name__}: {e}") from e