| """ | |
| This file contains the transformation logic for the Gemini realtime API. | |
| """ | |
| import json | |
| import os | |
| import uuid | |
| from typing import Any, Dict, List, Optional, Union, cast | |
| from litellm import verbose_logger | |
| from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj | |
| from litellm.llms.base_llm.realtime.transformation import BaseRealtimeConfig | |
| from litellm.llms.vertex_ai.gemini.vertex_and_google_ai_studio_gemini import ( | |
| VertexGeminiConfig, | |
| ) | |
| from litellm.responses.litellm_completion_transformation.transformation import ( | |
| LiteLLMCompletionResponsesConfig, | |
| ) | |
| from litellm.types.llms.gemini import ( | |
| AutomaticActivityDetection, | |
| BidiGenerateContentRealtimeInput, | |
| BidiGenerateContentRealtimeInputConfig, | |
| BidiGenerateContentServerContent, | |
| BidiGenerateContentServerMessage, | |
| BidiGenerateContentSetup, | |
| ) | |
| from litellm.types.llms.openai import ( | |
| OpenAIRealtimeContentPartDone, | |
| OpenAIRealtimeConversationItemCreated, | |
| OpenAIRealtimeDoneEvent, | |
| OpenAIRealtimeEvents, | |
| OpenAIRealtimeEventTypes, | |
| OpenAIRealtimeOutputItemDone, | |
| OpenAIRealtimeResponseAudioDone, | |
| OpenAIRealtimeResponseContentPartAdded, | |
| OpenAIRealtimeResponseDelta, | |
| OpenAIRealtimeResponseDoneObject, | |
| OpenAIRealtimeResponseTextDone, | |
| OpenAIRealtimeStreamResponseBaseObject, | |
| OpenAIRealtimeStreamResponseOutputItemAdded, | |
| OpenAIRealtimeStreamSession, | |
| OpenAIRealtimeStreamSessionEvents, | |
| OpenAIRealtimeTurnDetection, | |
| ) | |
| from litellm.types.llms.vertex_ai import ( | |
| GeminiResponseModalities, | |
| HttpxBlobType, | |
| HttpxContentType, | |
| ) | |
| from litellm.types.realtime import ( | |
| ALL_DELTA_TYPES, | |
| RealtimeModalityResponseTransformOutput, | |
| RealtimeResponseTransformInput, | |
| RealtimeResponseTypedDict, | |
| ) | |
| from litellm.utils import get_empty_usage | |
| from ..common_utils import encode_unserializable_types | |
| MAP_GEMINI_FIELD_TO_OPENAI_EVENT: Dict[str, OpenAIRealtimeEventTypes] = { | |
| "setupComplete": OpenAIRealtimeEventTypes.SESSION_CREATED, | |
| "serverContent.generationComplete": OpenAIRealtimeEventTypes.RESPONSE_TEXT_DONE, | |
| "serverContent.turnComplete": OpenAIRealtimeEventTypes.RESPONSE_DONE, | |
| "serverContent.interrupted": OpenAIRealtimeEventTypes.RESPONSE_DONE, | |
| } | |


class GeminiRealtimeConfig(BaseRealtimeConfig):
    def validate_environment(
        self, headers: dict, model: str, api_key: Optional[str] = None
    ) -> dict:
        return headers

    def get_complete_url(
        self, api_base: Optional[str], model: str, api_key: Optional[str] = None
    ) -> str:
| """ | |
| Example output: | |
| "BACKEND_WS_URL = "wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1beta.GenerativeService.BidiGenerateContent""; | |
| """ | |
        if api_base is None:
            api_base = "wss://generativelanguage.googleapis.com"
        if api_key is None:
            api_key = os.environ.get("GEMINI_API_KEY")
        if api_key is None:
            raise ValueError("api_key is required for Gemini API calls")
        api_base = api_base.replace("https://", "wss://")
        api_base = api_base.replace("http://", "ws://")
        return f"{api_base}/ws/google.ai.generativelanguage.v1beta.GenerativeService.BidiGenerateContent?key={api_key}"

    def map_model_turn_event(
        self, model_turn: HttpxContentType
    ) -> OpenAIRealtimeEventTypes:
        """
        Map the model turn event to the OpenAI realtime events.

        Returns either:
        - response.text.delta - model_turn: {"parts": [{"text": "..."}]}
        - response.audio.delta - model_turn: {"parts": [{"inlineData": {"mimeType": "audio/pcm", "data": "..."}}]}

        Assumes parts is a single element list.
        """
        if "parts" in model_turn:
            parts = model_turn["parts"]
            if len(parts) != 1:
                verbose_logger.warning(
                    f"Realtime: Expected 1 part, got {len(parts)} for Gemini model turn event."
                )
            part = parts[0]
            if "text" in part:
                return OpenAIRealtimeEventTypes.RESPONSE_TEXT_DELTA
            elif "inlineData" in part:
                return OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DELTA
            else:
                raise ValueError(f"Unexpected part type: {part}")
        raise ValueError(f"Unexpected model turn event, no 'parts' key: {model_turn}")

    def map_generation_complete_event(
        self, delta_type: Optional[ALL_DELTA_TYPES]
    ) -> OpenAIRealtimeEventTypes:
        if delta_type == "text":
            return OpenAIRealtimeEventTypes.RESPONSE_TEXT_DONE
        elif delta_type == "audio":
            return OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DONE
        else:
            raise ValueError(f"Unexpected delta type: {delta_type}")

    def get_audio_mime_type(self, input_audio_format: str = "pcm16") -> str:
        mime_types = {
            "pcm16": "audio/pcm",
            "g711_ulaw": "audio/pcmu",
            "g711_alaw": "audio/pcma",
        }
        return mime_types.get(input_audio_format, "application/octet-stream")

    def map_automatic_turn_detection(
        self, value: OpenAIRealtimeTurnDetection
    ) -> AutomaticActivityDetection:
        automatic_activity_detection = AutomaticActivityDetection()
        if "create_response" in value and isinstance(value["create_response"], bool):
            automatic_activity_detection["disabled"] = not value["create_response"]
        else:
            automatic_activity_detection["disabled"] = True
        if "prefix_padding_ms" in value and isinstance(value["prefix_padding_ms"], int):
            automatic_activity_detection["prefixPaddingMs"] = value["prefix_padding_ms"]
        if "silence_duration_ms" in value and isinstance(
            value["silence_duration_ms"], int
        ):
            automatic_activity_detection["silenceDurationMs"] = value[
                "silence_duration_ms"
            ]
        return automatic_activity_detection
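    # Illustrative sketch (hypothetical values): an OpenAI-style turn_detection config
    #   {"type": "server_vad", "create_response": True, "silence_duration_ms": 500}
    # maps to AutomaticActivityDetection(disabled=False, silenceDurationMs=500).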

    def get_supported_openai_params(self, model: str) -> List[str]:
        return [
            "instructions",
            "temperature",
            "max_response_output_tokens",
            "modalities",
            "tools",
            "input_audio_transcription",
            "turn_detection",
        ]

    def map_openai_params(
        self, optional_params: dict, non_default_params: dict
    ) -> dict:
        if "generationConfig" not in optional_params:
            optional_params["generationConfig"] = {}
        for key, value in non_default_params.items():
            if key == "instructions":
                optional_params["systemInstruction"] = HttpxContentType(
                    role="user", parts=[{"text": value}]
                )
            elif key == "temperature":
                optional_params["generationConfig"]["temperature"] = value
            elif key == "max_response_output_tokens":
                optional_params["generationConfig"]["maxOutputTokens"] = value
            elif key == "modalities":
                optional_params["generationConfig"]["responseModalities"] = [
                    modality.upper() for modality in cast(List[str], value)
                ]
| elif key == "tools": | |
| from litellm.llms.vertex_ai.gemini.vertex_and_google_ai_studio_gemini import ( | |
| VertexGeminiConfig, | |
| ) | |
| vertex_gemini_config = VertexGeminiConfig() | |
| vertex_gemini_config._map_function(value) | |
| optional_params["generationConfig"][ | |
| "tools" | |
| ] = vertex_gemini_config._map_function(value) | |
| elif key == "input_audio_transcription" and value is not None: | |
| optional_params["inputAudioTranscription"] = {} | |
| elif key == "turn_detection": | |
| value_typed = cast(OpenAIRealtimeTurnDetection, value) | |
| transformed_audio_activity_config = self.map_automatic_turn_detection( | |
| value_typed | |
| ) | |
| if ( | |
| len(transformed_audio_activity_config) > 0 | |
| ): # if the config is not empty, add it to the optional params | |
| optional_params[ | |
| "realtimeInputConfig" | |
| ] = BidiGenerateContentRealtimeInputConfig( | |
| automaticActivityDetection=transformed_audio_activity_config | |
| ) | |
| if len(optional_params["generationConfig"]) == 0: | |
| optional_params.pop("generationConfig") | |
| return optional_params | |
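    # Sketch (hypothetical values): map_openai_params({}, {"temperature": 0.7, "modalities": ["text"]})
    # would return {"generationConfig": {"temperature": 0.7, "responseModalities": ["TEXT"]}}.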

    def transform_realtime_request(
        self,
        message: str,
        model: str,
        session_configuration_request: Optional[str] = None,
    ) -> List[str]:
        realtime_input_dict: BidiGenerateContentRealtimeInput = {}
        try:
            json_message = json.loads(message)
        except json.JSONDecodeError:
            if isinstance(message, bytes):
                message_str = message.decode("utf-8", errors="replace")
            else:
                message_str = str(message)
            raise ValueError(f"Invalid JSON message: {message_str}")
        ## HANDLE SESSION UPDATE ##
        messages: List[str] = []
        if "type" in json_message and json_message["type"] == "session.update":
            client_session_configuration_request = self.map_openai_params(
                optional_params={}, non_default_params=json_message["session"]
            )
            client_session_configuration_request["model"] = f"models/{model}"
            messages.append(
                json.dumps(
                    {
                        "setup": client_session_configuration_request,
                    }
                )
            )
            # a session.update frame maps only to a Gemini 'setup' frame; without
            # returning here, the raw JSON would also be forwarded as text input below
            return messages
        # elif session_configuration_request is None:
        #     default_session_configuration_request = self.session_configuration_request(model)
        #     messages.append(default_session_configuration_request)
        ## HANDLE INPUT AUDIO BUFFER ##
        if (
            "type" in json_message
            and json_message["type"] == "input_audio_buffer.append"
        ):
            realtime_input_dict["audio"] = HttpxBlobType(
                mimeType=self.get_audio_mime_type(), data=json_message["audio"]
            )
        else:
            realtime_input_dict["text"] = message
        if len(realtime_input_dict) != 1:
            raise ValueError(
                f"Only one argument can be set, got {len(realtime_input_dict)}:"
                f" {list(realtime_input_dict.keys())}"
            )
        realtime_input_dict = cast(
            BidiGenerateContentRealtimeInput,
            encode_unserializable_types(cast(Dict[str, object], realtime_input_dict)),
        )
        messages.append(json.dumps({"realtime_input": realtime_input_dict}))
        return messages
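    # Sketch (hypothetical payloads): an OpenAI client frame
    #   {"type": "input_audio_buffer.append", "audio": "<base64 pcm16>"}
    # becomes the single Gemini frame
    #   {"realtime_input": {"audio": {"mimeType": "audio/pcm", "data": "<base64 pcm16>"}}}.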

    def transform_session_created_event(
        self,
        model: str,
        logging_session_id: str,
        session_configuration_request: Optional[str] = None,
    ) -> OpenAIRealtimeStreamSessionEvents:
        if session_configuration_request:
            session_configuration_request_dict: BidiGenerateContentSetup = json.loads(
                session_configuration_request
            ).get("setup", {})
        else:
            session_configuration_request_dict = {}
        _model = session_configuration_request_dict.get("model") or model
        generation_config = (
            session_configuration_request_dict.get("generationConfig", {}) or {}
        )
        gemini_modalities = generation_config.get("responseModalities", ["TEXT"])
        _modalities = [
            modality.lower() for modality in cast(List[str], gemini_modalities)
        ]
        _system_instruction = session_configuration_request_dict.get(
            "systemInstruction"
        )
        session = OpenAIRealtimeStreamSession(
            id=logging_session_id,
            modalities=_modalities,
        )
        if _system_instruction is not None and isinstance(_system_instruction, str):
            session["instructions"] = _system_instruction
        if _model is not None and isinstance(_model, str):
            # remove the 'models/' prefix; str.strip would strip characters, not the prefix
            if _model.startswith("models/"):
                _model = _model[len("models/") :]
            session["model"] = _model  # keep it consistent with how openai returns the model name
        return OpenAIRealtimeStreamSessionEvents(
            type="session.created",
            session=session,
            event_id=str(uuid.uuid4()),
        )
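    # Sketch (hypothetical ids): with a setup of {"model": "models/gemini-2.0-flash-live-001"},
    # this yields {"type": "session.created", "event_id": "<uuid>",
    # "session": {"id": "<trace-id>", "modalities": ["text"], "model": "gemini-2.0-flash-live-001"}}.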

    def _is_new_content_delta(
        self,
        previous_messages: Optional[List[OpenAIRealtimeEvents]] = None,
    ) -> bool:
        if previous_messages is None or len(previous_messages) == 0:
            return True
        if "type" in previous_messages[-1] and previous_messages[-1]["type"].endswith(
            "delta"
        ):
            return False
        return True

    def return_new_content_delta_events(
        self,
        response_id: str,
        output_item_id: str,
        conversation_id: str,
        delta_type: ALL_DELTA_TYPES,
        session_configuration_request: Optional[str] = None,
    ) -> List[OpenAIRealtimeEvents]:
        if session_configuration_request is None:
            raise ValueError(
                "session_configuration_request is required for Gemini API calls"
            )
        session_configuration_request_dict: BidiGenerateContentSetup = json.loads(
            session_configuration_request
        ).get("setup", {})
        generation_config = session_configuration_request_dict.get(
            "generationConfig", {}
        )
        gemini_modalities = generation_config.get("responseModalities", ["TEXT"])
        _modalities = [
            modality.lower() for modality in cast(List[str], gemini_modalities)
        ]
        _temperature = generation_config.get("temperature")
        _max_output_tokens = generation_config.get("maxOutputTokens")
        response_items: List[OpenAIRealtimeEvents] = []
        ## - return response.created
        response_created = OpenAIRealtimeStreamResponseBaseObject(
            type="response.created",
            event_id="event_{}".format(uuid.uuid4()),
            response={
                "object": "realtime.response",
                "id": response_id,
                "status": "in_progress",
                "output": [],
                "conversation_id": conversation_id,
                "modalities": _modalities,
                "temperature": _temperature,
                "max_output_tokens": _max_output_tokens,
            },
        )
        response_items.append(response_created)
        ## - return response.output_item.added - adds 'item_id', same for all subsequent events
        response_output_item_added = OpenAIRealtimeStreamResponseOutputItemAdded(
            type="response.output_item.added",
            response_id=response_id,
            output_index=0,
            item={
                "id": output_item_id,
                "object": "realtime.item",
                "type": "message",
                "status": "in_progress",
                "role": "assistant",
                "content": [],
            },
        )
        response_items.append(response_output_item_added)
        ## - return conversation.item.created
        conversation_item_created = OpenAIRealtimeConversationItemCreated(
            type="conversation.item.created",
            event_id="event_{}".format(uuid.uuid4()),
            item={
                "id": output_item_id,
                "object": "realtime.item",
                "type": "message",
                "status": "in_progress",
                "role": "assistant",
                "content": [],
            },
        )
        response_items.append(conversation_item_created)
        ## - return response.content_part.added
        response_content_part_added = OpenAIRealtimeResponseContentPartAdded(
            type="response.content_part.added",
            content_index=0,
            output_index=0,
            event_id="event_{}".format(uuid.uuid4()),
            item_id=output_item_id,
            part={
                "type": "text",
                "text": "",
            }
            if delta_type == "text"
            else {
                "type": "audio",
                "transcript": "",
            },
            response_id=response_id,
        )
        response_items.append(response_content_part_added)
        return response_items

    def transform_content_delta_events(
        self,
        message: BidiGenerateContentServerContent,
        output_item_id: str,
        response_id: str,
        delta_type: ALL_DELTA_TYPES,
    ) -> OpenAIRealtimeResponseDelta:
        delta = ""
        try:
            if "modelTurn" in message and "parts" in message["modelTurn"]:
                for part in message["modelTurn"]["parts"]:
                    if "text" in part:
                        delta += part["text"]
                    elif "inlineData" in part:
                        delta += part["inlineData"]["data"]
        except Exception as e:
            raise ValueError(
                f"Error transforming content delta events: {e}, got message: {message}"
            )
        return OpenAIRealtimeResponseDelta(
            type="response.text.delta"
            if delta_type == "text"
            else "response.audio.delta",
            content_index=0,
            event_id="event_{}".format(uuid.uuid4()),
            item_id=output_item_id,
            output_index=0,
            response_id=response_id,
            delta=delta,
        )

    def transform_content_done_event(
        self,
        delta_chunks: Optional[List[OpenAIRealtimeResponseDelta]],
        current_output_item_id: Optional[str],
        current_response_id: Optional[str],
        delta_type: ALL_DELTA_TYPES,
    ) -> Union[OpenAIRealtimeResponseTextDone, OpenAIRealtimeResponseAudioDone]:
        if delta_chunks:
            delta = "".join([delta_chunk["delta"] for delta_chunk in delta_chunks])
        else:
            delta = ""
        if current_output_item_id is None or current_response_id is None:
            raise ValueError(
                "current_output_item_id and current_response_id cannot be None for a 'done' event."
            )
        if delta_type == "text":
            return OpenAIRealtimeResponseTextDone(
                type="response.text.done",
                content_index=0,
                event_id="event_{}".format(uuid.uuid4()),
                item_id=current_output_item_id,
                output_index=0,
                response_id=current_response_id,
                text=delta,
            )
        elif delta_type == "audio":
            return OpenAIRealtimeResponseAudioDone(
                type="response.audio.done",
                content_index=0,
                event_id="event_{}".format(uuid.uuid4()),
                item_id=current_output_item_id,
                output_index=0,
                response_id=current_response_id,
            )
        else:
            # without this branch the function would silently return None for an
            # unexpected delta_type, violating its declared return type
            raise ValueError(f"Unexpected delta type: {delta_type}")

    def return_additional_content_done_events(
        self,
        current_output_item_id: Optional[str],
        current_response_id: Optional[str],
        delta_done_event: Union[
            OpenAIRealtimeResponseTextDone, OpenAIRealtimeResponseAudioDone
        ],
        delta_type: ALL_DELTA_TYPES,
    ) -> List[OpenAIRealtimeEvents]:
        """
        - return response.content_part.done
        - return response.output_item.done
        """
        if current_output_item_id is None or current_response_id is None:
            raise ValueError(
                "current_output_item_id and current_response_id cannot be None for a 'done' event."
            )
        returned_items: List[OpenAIRealtimeEvents] = []
        delta_done_event_text = cast(Optional[str], delta_done_event.get("text"))
        # response.content_part.done
        response_content_part_done = OpenAIRealtimeContentPartDone(
            type="response.content_part.done",
            content_index=0,
            event_id="event_{}".format(uuid.uuid4()),
            item_id=current_output_item_id,
            output_index=0,
            part={"type": "text", "text": delta_done_event_text}
            if delta_done_event_text and delta_type == "text"
            else {
                "type": "audio",
                "transcript": "",  # gemini doesn't return transcript for audio
            },
            response_id=current_response_id,
        )
        returned_items.append(response_content_part_done)
        # response.output_item.done
        response_output_item_done = OpenAIRealtimeOutputItemDone(
            type="response.output_item.done",
            event_id="event_{}".format(uuid.uuid4()),
            output_index=0,
            response_id=current_response_id,
            item={
                "id": current_output_item_id,
                "object": "realtime.item",
                "type": "message",
                "status": "completed",
                "role": "assistant",
                "content": [
                    {"type": "text", "text": delta_done_event_text}
                    if delta_done_event_text and delta_type == "text"
                    else {
                        "type": "audio",
                        "transcript": "",
                    }
                ],
            },
        )
        returned_items.append(response_output_item_done)
        return returned_items

    @staticmethod
    def get_nested_value(obj: dict, path: str) -> Any:
        # static: it takes no self and is called as
        # GeminiRealtimeConfig.get_nested_value(...) elsewhere in this class
        keys = path.split(".")
        current = obj
        for key in keys:
            if isinstance(current, dict) and key in current:
                current = current[key]
            else:
                return None
        return current
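    # e.g. get_nested_value({"serverContent": {"turnComplete": True}}, "serverContent.turnComplete")
    # returns True, while a missing segment anywhere along the path returns None.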

    def update_current_delta_chunks(
        self,
        transformed_message: Union[OpenAIRealtimeEvents, List[OpenAIRealtimeEvents]],
        current_delta_chunks: Optional[List[OpenAIRealtimeResponseDelta]],
    ) -> Optional[List[OpenAIRealtimeResponseDelta]]:
        try:
            if isinstance(transformed_message, list):
                current_delta_chunks = []
                any_delta_chunk = False
                for event in transformed_message:
                    if event["type"] == "response.text.delta":
                        current_delta_chunks.append(
                            cast(OpenAIRealtimeResponseDelta, event)
                        )
                        any_delta_chunk = True
                if not any_delta_chunk:
                    current_delta_chunks = (
                        None  # reset current_delta_chunks if no delta chunks
                    )
            else:
                if (
                    transformed_message["type"] == "response.text.delta"
                ):  # ONLY ACCUMULATE TEXT DELTA CHUNKS - AUDIO WILL CAUSE SERVER MEMORY ISSUES
                    if current_delta_chunks is None:
                        current_delta_chunks = []
                    current_delta_chunks.append(
                        cast(OpenAIRealtimeResponseDelta, transformed_message)
                    )
                else:
                    current_delta_chunks = None
            return current_delta_chunks
        except Exception as e:
            raise ValueError(
                f"Error updating current delta chunks: {e}, got transformed_message: {transformed_message}"
            )

    def update_current_item_chunks(
        self,
        transformed_message: Union[OpenAIRealtimeEvents, List[OpenAIRealtimeEvents]],
        current_item_chunks: Optional[List[OpenAIRealtimeOutputItemDone]],
    ) -> Optional[List[OpenAIRealtimeOutputItemDone]]:
        try:
            if isinstance(transformed_message, list):
                current_item_chunks = []
                any_item_chunk = False
                for event in transformed_message:
                    if event["type"] == "response.output_item.done":
                        current_item_chunks.append(
                            cast(OpenAIRealtimeOutputItemDone, event)
                        )
                        any_item_chunk = True
                if not any_item_chunk:
                    current_item_chunks = (
                        None  # reset current_item_chunks if no item chunks
                    )
            else:
                if transformed_message["type"] == "response.output_item.done":
                    if current_item_chunks is None:
                        current_item_chunks = []
                    current_item_chunks.append(
                        cast(OpenAIRealtimeOutputItemDone, transformed_message)
                    )
                else:
                    current_item_chunks = None
            return current_item_chunks
        except Exception as e:
            raise ValueError(
                f"Error updating current item chunks: {e}, got transformed_message: {transformed_message}"
            )

    def transform_response_done_event(
        self,
        message: BidiGenerateContentServerMessage,
        current_response_id: Optional[str],
        current_conversation_id: Optional[str],
        output_items: Optional[List[OpenAIRealtimeOutputItemDone]],
        session_configuration_request: Optional[str] = None,
    ) -> OpenAIRealtimeDoneEvent:
        if current_conversation_id is None or current_response_id is None:
            raise ValueError(
                f"current_conversation_id and current_response_id must both be set for a 'done' event. Got=current_conversation_id: {current_conversation_id}, current_response_id: {current_response_id}"
            )
        if session_configuration_request:
            session_configuration_request_dict: BidiGenerateContentSetup = json.loads(
                session_configuration_request
            ).get("setup", {})
        else:
            session_configuration_request_dict = {}
        generation_config = session_configuration_request_dict.get(
            "generationConfig", {}
        )
        temperature = generation_config.get("temperature")
        # camelCase key, matching how map_openai_params writes it into the setup config
        max_output_tokens = generation_config.get("maxOutputTokens")
        gemini_modalities = generation_config.get("responseModalities", ["TEXT"])
        _modalities = [
            modality.lower() for modality in cast(List[str], gemini_modalities)
        ]
| if "usageMetadata" in message: | |
| _chat_completion_usage = VertexGeminiConfig._calculate_usage( | |
| completion_response=message, | |
| ) | |
| else: | |
| _chat_completion_usage = get_empty_usage() | |
| responses_api_usage = LiteLLMCompletionResponsesConfig._transform_chat_completion_usage_to_responses_usage( | |
| _chat_completion_usage, | |
| ) | |
| response_done_event = OpenAIRealtimeDoneEvent( | |
| type="response.done", | |
| event_id="event_{}".format(uuid.uuid4()), | |
| response=OpenAIRealtimeResponseDoneObject( | |
| object="realtime.response", | |
| id=current_response_id, | |
| status="completed", | |
| output=[output_item["item"] for output_item in output_items] | |
| if output_items | |
| else [], | |
| conversation_id=current_conversation_id, | |
| modalities=_modalities, | |
| usage=responses_api_usage.model_dump(), | |
| ), | |
| ) | |
| if temperature is not None: | |
| response_done_event["response"]["temperature"] = temperature | |
| if max_output_tokens is not None: | |
| response_done_event["response"]["max_output_tokens"] = max_output_tokens | |
| return response_done_event | |

    def handle_openai_modality_event(
        self,
        openai_event: OpenAIRealtimeEventTypes,
        json_message: dict,
        realtime_response_transform_input: RealtimeResponseTransformInput,
        delta_type: ALL_DELTA_TYPES,
    ) -> RealtimeModalityResponseTransformOutput:
        current_output_item_id = realtime_response_transform_input[
            "current_output_item_id"
        ]
        current_response_id = realtime_response_transform_input["current_response_id"]
        current_conversation_id = realtime_response_transform_input[
            "current_conversation_id"
        ]
        current_delta_chunks = realtime_response_transform_input["current_delta_chunks"]
        session_configuration_request = realtime_response_transform_input[
            "session_configuration_request"
        ]
        returned_message: List[OpenAIRealtimeEvents] = []
        if (
            openai_event == OpenAIRealtimeEventTypes.RESPONSE_TEXT_DELTA
            or openai_event == OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DELTA
        ):
            current_response_id = current_response_id or "resp_{}".format(uuid.uuid4())
            if not current_output_item_id:
                # send the list of standard 'new' content.delta events
                current_output_item_id = "item_{}".format(uuid.uuid4())
                current_conversation_id = current_conversation_id or "conv_{}".format(
                    uuid.uuid4()
                )
                returned_message = self.return_new_content_delta_events(
                    session_configuration_request=session_configuration_request,
                    response_id=current_response_id,
                    output_item_id=current_output_item_id,
                    conversation_id=current_conversation_id,
                    delta_type=delta_type,
                )
            # transform the Gemini serverContent chunk into a content.delta event
            transformed_message = self.transform_content_delta_events(
                BidiGenerateContentServerContent(**json_message["serverContent"]),
                current_output_item_id,
                current_response_id,
                delta_type=delta_type,
            )
            returned_message.append(transformed_message)
        elif (
            openai_event == OpenAIRealtimeEventTypes.RESPONSE_TEXT_DONE
            or openai_event == OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DONE
        ):
            transformed_content_done_event = self.transform_content_done_event(
                current_output_item_id=current_output_item_id,
                current_response_id=current_response_id,
                delta_chunks=current_delta_chunks,
                delta_type=delta_type,
            )
            returned_message = [transformed_content_done_event]
            additional_items = self.return_additional_content_done_events(
                current_output_item_id=current_output_item_id,
                current_response_id=current_response_id,
                delta_done_event=transformed_content_done_event,
                delta_type=delta_type,
            )
            returned_message.extend(additional_items)
        return {
            "returned_message": returned_message,
            "current_output_item_id": current_output_item_id,
            "current_response_id": current_response_id,
            "current_conversation_id": current_conversation_id,
            "current_delta_chunks": current_delta_chunks,
            "current_delta_type": delta_type,
        }

    def map_openai_event(
        self,
        key: str,
        value: dict,
        current_delta_type: Optional[ALL_DELTA_TYPES],
        json_message: dict,
    ) -> OpenAIRealtimeEventTypes:
        model_turn_event = value.get("modelTurn")
        generation_complete_event = value.get("generationComplete")
        openai_event: Optional[OpenAIRealtimeEventTypes] = None
        if model_turn_event:  # check if model turn event
            openai_event = self.map_model_turn_event(model_turn_event)
        elif generation_complete_event:
            openai_event = self.map_generation_complete_event(
                delta_type=current_delta_type
            )
        else:
            # Check if this key or any nested key matches our mapping.
            # Use a distinct loop variable: reusing 'openai_event' as the loop
            # variable would leave a stale value behind when no entry matches,
            # so the None check below would never fire.
            for map_key, mapped_event in MAP_GEMINI_FIELD_TO_OPENAI_EVENT.items():
                if map_key == key or (
                    "." in map_key
                    and GeminiRealtimeConfig.get_nested_value(json_message, map_key)
                    is not None
                ):
                    openai_event = mapped_event
                    break
        if openai_event is None:
            raise ValueError(f"Unknown openai event: {key}, value: {value}")
        return openai_event
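    # Sketch: a Gemini frame {"serverContent": {"turnComplete": True}} matches the
    # "serverContent.turnComplete" entry in MAP_GEMINI_FIELD_TO_OPENAI_EVENT and
    # resolves to OpenAIRealtimeEventTypes.RESPONSE_DONE.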

    def transform_realtime_response(
        self,
        message: Union[str, bytes],
        model: str,
        logging_obj: LiteLLMLoggingObj,
        realtime_response_transform_input: RealtimeResponseTransformInput,
    ) -> RealtimeResponseTypedDict:
        """
        Keep this stateless - leave the state management (e.g. tracking current_output_item_id, current_response_id, current_conversation_id, current_delta_chunks) to the caller.
        """
        try:
            json_message = json.loads(message)
        except json.JSONDecodeError:
            if isinstance(message, bytes):
                message_str = message.decode("utf-8", errors="replace")
            else:
                message_str = str(message)
            raise ValueError(f"Invalid JSON message: {message_str}")
        logging_session_id = logging_obj.litellm_trace_id
        current_output_item_id = realtime_response_transform_input[
            "current_output_item_id"
        ]
        current_response_id = realtime_response_transform_input["current_response_id"]
        current_conversation_id = realtime_response_transform_input[
            "current_conversation_id"
        ]
        current_delta_chunks = realtime_response_transform_input["current_delta_chunks"]
        session_configuration_request = realtime_response_transform_input[
            "session_configuration_request"
        ]
        current_item_chunks = realtime_response_transform_input["current_item_chunks"]
        current_delta_type: Optional[
            ALL_DELTA_TYPES
        ] = realtime_response_transform_input["current_delta_type"]
        returned_message: List[OpenAIRealtimeEvents] = []
        for key, value in json_message.items():
            # Check if this key or any nested key matches our mapping
            openai_event = self.map_openai_event(
                key=key,
                value=value,
                current_delta_type=current_delta_type,
                json_message=json_message,
            )
            if openai_event == OpenAIRealtimeEventTypes.SESSION_CREATED:
                transformed_message = self.transform_session_created_event(
                    model,
                    logging_session_id,
                    realtime_response_transform_input["session_configuration_request"],
                )
                session_configuration_request = json.dumps(transformed_message)
                returned_message.append(transformed_message)
            elif openai_event == OpenAIRealtimeEventTypes.RESPONSE_DONE:
                transformed_response_done_event = self.transform_response_done_event(
                    message=BidiGenerateContentServerMessage(**json_message),  # type: ignore
                    current_response_id=current_response_id,
                    current_conversation_id=current_conversation_id,
                    session_configuration_request=session_configuration_request,
                    output_items=None,
                )
                returned_message.append(transformed_response_done_event)
            elif (
                openai_event == OpenAIRealtimeEventTypes.RESPONSE_TEXT_DELTA
                or openai_event == OpenAIRealtimeEventTypes.RESPONSE_TEXT_DONE
                or openai_event == OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DELTA
                or openai_event == OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DONE
            ):
                _returned_message = self.handle_openai_modality_event(
                    openai_event,
                    json_message,
                    realtime_response_transform_input,
                    delta_type="text" if "text" in openai_event.value else "audio",
                )
                returned_message.extend(_returned_message["returned_message"])
                current_output_item_id = _returned_message["current_output_item_id"]
                current_response_id = _returned_message["current_response_id"]
                current_conversation_id = _returned_message["current_conversation_id"]
                current_delta_chunks = _returned_message["current_delta_chunks"]
                current_delta_type = _returned_message["current_delta_type"]
            else:
                raise ValueError(f"Unknown openai event: {openai_event}")
        if len(returned_message) == 0:
            if isinstance(message, bytes):
                message_str = message.decode("utf-8", errors="replace")
            else:
                message_str = str(message)
            raise ValueError(f"Unknown message type: {message_str}")
        current_delta_chunks = self.update_current_delta_chunks(
            transformed_message=returned_message,
            current_delta_chunks=current_delta_chunks,
        )
        current_item_chunks = self.update_current_item_chunks(
            transformed_message=returned_message,
            current_item_chunks=current_item_chunks,
        )
        return {
            "response": returned_message,
            "current_output_item_id": current_output_item_id,
            "current_response_id": current_response_id,
            "current_delta_chunks": current_delta_chunks,
            "current_conversation_id": current_conversation_id,
            "current_item_chunks": current_item_chunks,
            "current_delta_type": current_delta_type,
            "session_configuration_request": session_configuration_request,
        }

    def requires_session_configuration(self) -> bool:
        return True

    def session_configuration_request(self, model: str) -> str:
        """
        ```
        {
          "model": string,
          "generationConfig": {
            "candidateCount": integer,
            "maxOutputTokens": integer,
            "temperature": number,
            "topP": number,
            "topK": integer,
            "presencePenalty": number,
            "frequencyPenalty": number,
            "responseModalities": [string],
            "speechConfig": object,
            "mediaResolution": object
          },
          "systemInstruction": string,
          "tools": [object]
        }
        ```
        """
        response_modalities: List[GeminiResponseModalities] = ["AUDIO"]
        output_audio_transcription = False
        # if "audio" in model:  ## UNCOMMENT THIS WHEN AUDIO IS SUPPORTED
        #     output_audio_transcription = True
        setup_config: BidiGenerateContentSetup = {
            "model": f"models/{model}",
            "generationConfig": {"responseModalities": response_modalities},
        }
        if output_audio_transcription:
            setup_config["outputAudioTranscription"] = {}
        return json.dumps(
            {
                "setup": setup_config,
            }
        )
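    # Default setup frame, e.g. for model="gemini-2.0-flash-live-001" (hypothetical model name):
    # {"setup": {"model": "models/gemini-2.0-flash-live-001",
    #            "generationConfig": {"responseModalities": ["AUDIO"]}}}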