# What is this?
## File for 'response_cost' calculation in Logging
import time
from functools import lru_cache
from typing import Any, List, Literal, Optional, Tuple, Union

from pydantic import BaseModel

import litellm
import litellm._logging
from litellm import verbose_logger
from litellm.litellm_core_utils.llm_cost_calc.utils import _generic_cost_per_character
from litellm.llms.anthropic.cost_calculation import (
    cost_per_token as anthropic_cost_per_token,
)
from litellm.llms.azure.cost_calculation import (
    cost_per_token as azure_openai_cost_per_token,
)
from litellm.llms.azure_ai.cost_calculator import (
    cost_per_query as azure_ai_rerank_cost_per_query,
)
from litellm.llms.bedrock.image.cost_calculator import (
    cost_calculator as bedrock_image_cost_calculator,
)
from litellm.llms.cohere.cost_calculator import (
    cost_per_query as cohere_rerank_cost_per_query,
)
from litellm.llms.databricks.cost_calculator import (
    cost_per_token as databricks_cost_per_token,
)
from litellm.llms.deepseek.cost_calculator import (
    cost_per_token as deepseek_cost_per_token,
)
from litellm.llms.fireworks_ai.cost_calculator import (
    cost_per_token as fireworks_ai_cost_per_token,
)
from litellm.llms.gemini.cost_calculator import cost_per_token as gemini_cost_per_token
from litellm.llms.openai.cost_calculation import (
    cost_per_second as openai_cost_per_second,
)
from litellm.llms.openai.cost_calculation import cost_per_token as openai_cost_per_token
from litellm.llms.together_ai.cost_calculator import get_model_params_and_category
from litellm.llms.vertex_ai.cost_calculator import (
    cost_per_character as google_cost_per_character,
)
from litellm.llms.vertex_ai.cost_calculator import (
    cost_per_token as google_cost_per_token,
)
from litellm.llms.vertex_ai.cost_calculator import cost_router as google_cost_router
from litellm.llms.vertex_ai.image_generation.cost_calculator import (
    cost_calculator as vertex_ai_image_cost_calculator,
)
from litellm.types.llms.openai import HttpxBinaryResponseContent
from litellm.types.rerank import RerankResponse
from litellm.types.utils import (
    CallTypesLiteral,
    LlmProvidersSet,
    PassthroughCallTypes,
    Usage,
)
from litellm.utils import (
    CallTypes,
    CostPerToken,
    EmbeddingResponse,
    ImageResponse,
    ModelResponse,
    TextCompletionResponse,
    TranscriptionResponse,
    _cached_get_model_info_helper,
    token_counter,
)


def _cost_per_token_custom_pricing_helper(
    prompt_tokens: float = 0,
    completion_tokens: float = 0,
    response_time_ms: Optional[float] = 0.0,
    ### CUSTOM PRICING ###
    custom_cost_per_token: Optional[CostPerToken] = None,
    custom_cost_per_second: Optional[float] = None,
) -> Optional[Tuple[float, float]]:
    """Internal helper function for calculating cost, if custom pricing given"""
    if custom_cost_per_token is None and custom_cost_per_second is None:
        return None

    if custom_cost_per_token is not None:
        input_cost = custom_cost_per_token["input_cost_per_token"] * prompt_tokens
        output_cost = custom_cost_per_token["output_cost_per_token"] * completion_tokens
        return input_cost, output_cost
    elif custom_cost_per_second is not None:
        output_cost = custom_cost_per_second * response_time_ms / 1000  # type: ignore
        return 0, output_cost

    return None
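
# Illustrative sketch of the custom per-token pricing path above. This helper
# is hypothetical (nothing in the module calls it) and the rates are made-up
# example values, not real provider pricing.
def _example_custom_per_token_pricing() -> Optional[Tuple[float, float]]:
    example_rates: CostPerToken = {
        "input_cost_per_token": 3e-06,  # hypothetical $3 per 1M input tokens
        "output_cost_per_token": 6e-06,  # hypothetical $6 per 1M output tokens
    }
    # Expected result: (0.003, 0.003) -> 1000 * 3e-06 and 500 * 6e-06
    return _cost_per_token_custom_pricing_helper(
        prompt_tokens=1000,
        completion_tokens=500,
        custom_cost_per_token=example_rates,
    )
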

def cost_per_token(  # noqa: PLR0915
    model: str = "",
    prompt_tokens: int = 0,
    completion_tokens: int = 0,
    response_time_ms: Optional[float] = 0.0,
    custom_llm_provider: Optional[str] = None,
    region_name=None,
    ### CHARACTER PRICING ###
    prompt_characters: Optional[int] = None,
    completion_characters: Optional[int] = None,
    ### PROMPT CACHING PRICING ### - used for anthropic
    cache_creation_input_tokens: Optional[int] = 0,
    cache_read_input_tokens: Optional[int] = 0,
    ### CUSTOM PRICING ###
    custom_cost_per_token: Optional[CostPerToken] = None,
    custom_cost_per_second: Optional[float] = None,
    ### NUMBER OF QUERIES ###
    number_of_queries: Optional[int] = None,
    ### USAGE OBJECT ###
    usage_object: Optional[Usage] = None,  # just read the usage object if provided
    ### CALL TYPE ###
    call_type: CallTypesLiteral = "completion",
    audio_transcription_file_duration: float = 0.0,  # for audio transcription calls - the file time in seconds
) -> Tuple[float, float]:  # type: ignore
    """
    Calculates the cost per token for a given model, prompt tokens, and completion tokens.

    Parameters:
        model (str): The name of the model to use. Default is ""
        prompt_tokens (int): The number of tokens in the prompt.
        completion_tokens (int): The number of tokens in the completion.
        response_time_ms (float): The amount of time, in milliseconds, it took the call to complete.
        prompt_characters (int): The number of characters in the prompt. Used for Vertex AI cost calculation.
        completion_characters (int): The number of characters in the completion response. Used for Vertex AI cost calculation.
        custom_llm_provider (str): The llm provider the call was made to (see the provider list in litellm/__init__.py).
        custom_cost_per_token (Optional[CostPerToken]): the cost per input + output token for the llm api call.
        custom_cost_per_second (Optional[float]): the cost per second for the llm api call.
        call_type (Optional[str]): the call type

    Returns:
        tuple: A tuple containing the cost in USD dollars for prompt tokens and completion tokens, respectively.
    """
    if model is None:
        raise Exception("Invalid arg. Model cannot be none.")

    ## RECONSTRUCT USAGE BLOCK ##
    if usage_object is not None:
        usage_block = usage_object
    else:
        usage_block = Usage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            cache_creation_input_tokens=cache_creation_input_tokens,
            cache_read_input_tokens=cache_read_input_tokens,
        )

    ## CUSTOM PRICING ##
    response_cost = _cost_per_token_custom_pricing_helper(
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
        response_time_ms=response_time_ms,
        custom_cost_per_second=custom_cost_per_second,
        custom_cost_per_token=custom_cost_per_token,
    )
    if response_cost is not None:
        return response_cost[0], response_cost[1]

    # given
    prompt_tokens_cost_usd_dollar: float = 0
    completion_tokens_cost_usd_dollar: float = 0
    model_cost_ref = litellm.model_cost
    model_with_provider = model
    if custom_llm_provider is not None:
        model_with_provider = custom_llm_provider + "/" + model
        if region_name is not None:
            model_with_provider_and_region = (
                f"{custom_llm_provider}/{region_name}/{model}"
            )
            if (
                model_with_provider_and_region in model_cost_ref
            ):  # use region based pricing, if it's available
                model_with_provider = model_with_provider_and_region
    else:
        _, custom_llm_provider, _, _ = litellm.get_llm_provider(model=model)

    model_without_prefix = model
    model_parts = model.split("/", 1)
    if len(model_parts) > 1:
        model_without_prefix = model_parts[1]
    else:
        model_without_prefix = model

    """
    Code block that formats model to lookup in litellm.model_cost
    Option1. model = "bedrock/ap-northeast-1/anthropic.claude-instant-v1". This is the most accurate since it is region based. Should always be option 1
    Option2. model = "openai/gpt-4" - model = provider/model
    Option3. model = "anthropic.claude-3" - model = model
    """
    if (
        model_with_provider in model_cost_ref
    ):  # Option 2. use model with provider, model = "openai/gpt-4"
        model = model_with_provider
    elif model in model_cost_ref:  # Option 1. use model passed, model="gpt-4"
        model = model
    elif (
        model_without_prefix in model_cost_ref
    ):  # Option 3. if user passed model="bedrock/anthropic.claude-3", use model="anthropic.claude-3"
        model = model_without_prefix
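
    # Routing overview for the rest of this function: call-type specific
    # branches (tts, rerank, transcription) run first, then provider-specific
    # cost modules (vertex_ai, anthropic, openai, ...), and finally the
    # generic litellm.model_cost per-token / per-second lookup.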
    # see this https://learn.microsoft.com/en-us/azure/ai-services/openai/concepts/models
    if call_type == "speech" or call_type == "aspeech":
        if prompt_characters is None:
            raise ValueError(
                "prompt_characters must be provided for tts calls. prompt_characters={}, model={}, custom_llm_provider={}, call_type={}".format(
                    prompt_characters,
                    model,
                    custom_llm_provider,
                    call_type,
                )
            )
        prompt_cost, completion_cost = _generic_cost_per_character(
            model=model_without_prefix,
            custom_llm_provider=custom_llm_provider,
            prompt_characters=prompt_characters,
            completion_characters=0,
            custom_prompt_cost=None,
            custom_completion_cost=0,
        )
        if prompt_cost is None or completion_cost is None:
            raise ValueError(
                "cost for tts call is None. prompt_cost={}, completion_cost={}, model={}, custom_llm_provider={}, prompt_characters={}, completion_characters={}".format(
                    prompt_cost,
                    completion_cost,
                    model_without_prefix,
                    custom_llm_provider,
                    prompt_characters,
                    completion_characters,
                )
            )
        return prompt_cost, completion_cost
    elif call_type == "arerank" or call_type == "rerank":
        return rerank_cost(
            model=model,
            custom_llm_provider=custom_llm_provider,
        )
    elif call_type == "atranscription" or call_type == "transcription":
        return openai_cost_per_second(
            model=model,
            custom_llm_provider=custom_llm_provider,
            duration=audio_transcription_file_duration,
        )
    elif custom_llm_provider == "vertex_ai":
        cost_router = google_cost_router(
            model=model_without_prefix,
            custom_llm_provider=custom_llm_provider,
            call_type=call_type,
        )
        if cost_router == "cost_per_character":
            return google_cost_per_character(
                model=model_without_prefix,
                custom_llm_provider=custom_llm_provider,
                prompt_characters=prompt_characters,
                completion_characters=completion_characters,
                prompt_tokens=prompt_tokens,
                completion_tokens=completion_tokens,
            )
        elif cost_router == "cost_per_token":
            return google_cost_per_token(
                model=model_without_prefix,
                custom_llm_provider=custom_llm_provider,
                prompt_tokens=prompt_tokens,
                completion_tokens=completion_tokens,
            )
    elif custom_llm_provider == "anthropic":
        return anthropic_cost_per_token(model=model, usage=usage_block)
    elif custom_llm_provider == "openai":
        return openai_cost_per_token(model=model, usage=usage_block)
    elif custom_llm_provider == "databricks":
        return databricks_cost_per_token(model=model, usage=usage_block)
    elif custom_llm_provider == "fireworks_ai":
        return fireworks_ai_cost_per_token(model=model, usage=usage_block)
    elif custom_llm_provider == "azure":
        return azure_openai_cost_per_token(
            model=model, usage=usage_block, response_time_ms=response_time_ms
        )
    elif custom_llm_provider == "gemini":
        return gemini_cost_per_token(model=model, usage=usage_block)
    elif custom_llm_provider == "deepseek":
        return deepseek_cost_per_token(model=model, usage=usage_block)
    else:
        model_info = _cached_get_model_info_helper(
            model=model, custom_llm_provider=custom_llm_provider
        )
        if model_info["input_cost_per_token"] > 0:
            ## COST PER TOKEN ##
            prompt_tokens_cost_usd_dollar = (
                model_info["input_cost_per_token"] * prompt_tokens
            )
        elif (
            model_info.get("input_cost_per_second", None) is not None
            and response_time_ms is not None
        ):
            verbose_logger.debug(
                "For model=%s - input_cost_per_second: %s; response time: %s",
                model,
                model_info.get("input_cost_per_second", None),
                response_time_ms,
            )
            ## COST PER SECOND ##
            prompt_tokens_cost_usd_dollar = (
                model_info["input_cost_per_second"] * response_time_ms / 1000  # type: ignore
            )

        if model_info["output_cost_per_token"] > 0:
            completion_tokens_cost_usd_dollar = (
                model_info["output_cost_per_token"] * completion_tokens
            )
        elif (
            model_info.get("output_cost_per_second", None) is not None
            and response_time_ms is not None
        ):
            verbose_logger.debug(
                "For model=%s - output_cost_per_second: %s; response time: %s",
                model,
                model_info.get("output_cost_per_second", None),
                response_time_ms,
            )
            ## COST PER SECOND ##
            completion_tokens_cost_usd_dollar = (
                model_info["output_cost_per_second"] * response_time_ms / 1000  # type: ignore
            )

        verbose_logger.debug(
            "Returned custom cost for model=%s - prompt_tokens_cost_usd_dollar: %s, completion_tokens_cost_usd_dollar: %s",
            model,
            prompt_tokens_cost_usd_dollar,
            completion_tokens_cost_usd_dollar,
        )
        return prompt_tokens_cost_usd_dollar, completion_tokens_cost_usd_dollar
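
# Illustrative usage sketch for cost_per_token (a hypothetical helper; nothing
# in the library calls it). Token counts are arbitrary, and the model is
# assumed to exist in litellm.model_cost.
def _example_cost_per_token_usage() -> float:
    prompt_usd, completion_usd = cost_per_token(
        model="gpt-3.5-turbo",
        prompt_tokens=1200,
        completion_tokens=300,
        custom_llm_provider="openai",
    )
    # Total request cost in USD = prompt-side cost + completion-side cost.
    return prompt_usd + completion_usd
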
model_info.get("input_cost_per_second", None), response_time_ms, ) ## COST PER SECOND ## prompt_tokens_cost_usd_dollar = ( model_info["input_cost_per_second"] * response_time_ms / 1000 # type: ignore ) if model_info["output_cost_per_token"] > 0: completion_tokens_cost_usd_dollar = ( model_info["output_cost_per_token"] * completion_tokens ) elif ( model_info.get("output_cost_per_second", None) is not None and response_time_ms is not None ): verbose_logger.debug( "For model=%s - output_cost_per_second: %s; response time: %s", model, model_info.get("output_cost_per_second", None), response_time_ms, ) ## COST PER SECOND ## completion_tokens_cost_usd_dollar = ( model_info["output_cost_per_second"] * response_time_ms / 1000 # type: ignore ) verbose_logger.debug( "Returned custom cost for model=%s - prompt_tokens_cost_usd_dollar: %s, completion_tokens_cost_usd_dollar: %s", model, prompt_tokens_cost_usd_dollar, completion_tokens_cost_usd_dollar, ) return prompt_tokens_cost_usd_dollar, completion_tokens_cost_usd_dollar def get_replicate_completion_pricing(completion_response: dict, total_time=0.0): # see https://replicate.com/pricing # for all litellm currently supported LLMs, almost all requests go to a100_80gb a100_80gb_price_per_second_public = ( 0.001400 # assume all calls sent to A100 80GB for now ) if total_time == 0.0: # total time is in ms start_time = completion_response.get("created", time.time()) end_time = getattr(completion_response, "ended", time.time()) total_time = end_time - start_time return a100_80gb_price_per_second_public * total_time / 1000 def has_hidden_params(obj: Any) -> bool: return hasattr(obj, "_hidden_params") def _get_provider_for_cost_calc( model: Optional[str], custom_llm_provider: Optional[str] = None, ) -> Optional[str]: if custom_llm_provider is not None: return custom_llm_provider if model is None: return None try: _, custom_llm_provider, _, _ = litellm.get_llm_provider(model=model) except Exception as e: verbose_logger.debug( f"litellm.cost_calculator.py::_get_provider_for_cost_calc() - Error inferring custom_llm_provider - {str(e)}" ) return None return custom_llm_provider def _select_model_name_for_cost_calc( model: Optional[str], completion_response: Optional[Any], base_model: Optional[str] = None, custom_pricing: Optional[bool] = None, custom_llm_provider: Optional[str] = None, ) -> Optional[str]: """ 1. If custom pricing is true, return received model name 2. If base_model is set (e.g. for azure models), return that 3. If completion response has model set return that 4. 

def _select_model_name_for_cost_calc(
    model: Optional[str],
    completion_response: Optional[Any],
    base_model: Optional[str] = None,
    custom_pricing: Optional[bool] = None,
    custom_llm_provider: Optional[str] = None,
) -> Optional[str]:
    """
    1. If custom pricing is true, return received model name
    2. If base_model is set (e.g. for azure models), return that
    3. If completion response has model set, return that
    4. Check if model is passed in; return that
    """
    return_model: Optional[str] = None
    region_name: Optional[str] = None
    custom_llm_provider = _get_provider_for_cost_calc(
        model=model, custom_llm_provider=custom_llm_provider
    )
    if custom_pricing is True:
        return_model = model

    if base_model is not None:
        return_model = base_model

    completion_response_model: Optional[str] = getattr(
        completion_response, "model", None
    )
    hidden_params: Optional[dict] = getattr(completion_response, "_hidden_params", None)

    if completion_response_model is None and hidden_params is not None:
        if (
            hidden_params.get("model", None) is not None
            and len(hidden_params["model"]) > 0
        ):
            return_model = hidden_params.get("model", model)
    if hidden_params is not None and hidden_params.get("region_name", None) is not None:
        region_name = hidden_params.get("region_name", None)

    if return_model is None and completion_response_model is not None:
        return_model = completion_response_model

    if return_model is None and model is not None:
        return_model = model

    if (
        return_model is not None
        and custom_llm_provider is not None
        and not _model_contains_known_llm_provider(return_model)
    ):  # add provider prefix if not already present, to match model_cost
        if region_name is not None:
            return_model = f"{custom_llm_provider}/{region_name}/{return_model}"
        else:
            return_model = f"{custom_llm_provider}/{return_model}"

    return return_model


@lru_cache(maxsize=16)
def _model_contains_known_llm_provider(model: str) -> bool:
    """
    Check if the model contains a known llm provider
    """
    _provider_prefix = model.split("/")[0]
    return _provider_prefix in LlmProvidersSet


def _get_usage_object(
    completion_response: Any,
) -> Optional[Usage]:
    usage_obj: Optional[Usage] = None
    if completion_response is not None and isinstance(
        completion_response, ModelResponse
    ):
        usage_obj = completion_response.get("usage")

    return usage_obj


def _infer_call_type(
    call_type: Optional[CallTypesLiteral], completion_response: Any
) -> Optional[CallTypesLiteral]:
    if call_type is not None:
        return call_type

    if completion_response is None:
        return None

    if isinstance(completion_response, ModelResponse):
        return "completion"
    elif isinstance(completion_response, EmbeddingResponse):
        return "embedding"
    elif isinstance(completion_response, TranscriptionResponse):
        return "transcription"
    elif isinstance(completion_response, HttpxBinaryResponseContent):
        return "speech"
    elif isinstance(completion_response, RerankResponse):
        return "rerank"
    elif isinstance(completion_response, ImageResponse):
        return "image_generation"
    elif isinstance(completion_response, TextCompletionResponse):
        return "text_completion"

    return call_type
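
# Small sketch of the provider-prefixing behaviour implemented above
# (hypothetical helper; assumes "openai" is listed in LlmProvidersSet, as it
# is in stock litellm, while "gpt-4o" is not a provider name).
def _example_model_name_selection() -> Optional[str]:
    assert _model_contains_known_llm_provider("openai/gpt-4o") is True
    assert _model_contains_known_llm_provider("gpt-4o") is False
    # "gpt-4o" has no provider prefix, so "openai/" is prepended to match the
    # keys used in litellm.model_cost. Expected return value: "openai/gpt-4o".
    return _select_model_name_for_cost_calc(
        model="gpt-4o",
        completion_response=None,
        custom_llm_provider="openai",
    )
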

def completion_cost(  # noqa: PLR0915
    completion_response=None,
    model: Optional[str] = None,
    prompt="",
    messages: List = [],
    completion="",
    total_time: Optional[float] = 0.0,  # used for replicate, sagemaker
    call_type: Optional[CallTypesLiteral] = None,
    ### REGION ###
    custom_llm_provider=None,
    region_name=None,  # used for bedrock pricing
    ### IMAGE GEN ###
    size: Optional[str] = None,
    quality: Optional[str] = None,
    n: Optional[int] = None,  # number of images
    ### CUSTOM PRICING ###
    custom_cost_per_token: Optional[CostPerToken] = None,
    custom_cost_per_second: Optional[float] = None,
    optional_params: Optional[dict] = None,
    custom_pricing: Optional[bool] = None,
    base_model: Optional[str] = None,
) -> float:
    """
    Calculate the cost of a given completion call for GPT-3.5-turbo, llama2, any litellm supported llm.

    Parameters:
        completion_response (litellm.ModelResponse): [Required] The response received from a LiteLLM completion request.

        [OPTIONAL PARAMS]
        model (str): Optional. The name of the language model used in the completion calls
        prompt (str): Optional. The input prompt passed to the llm
        completion (str): Optional. The output completion text from the llm
        total_time (float, int): Optional. (Only used for Replicate LLMs) The total time used for the request in seconds
        custom_cost_per_token (Optional[CostPerToken]): the cost per input + output token for the llm api call.
        custom_cost_per_second (Optional[float]): the cost per second for the llm api call.

    Returns:
        float: The cost in USD dollars for the completion based on the provided parameters.

    Exceptions:
        Raises exception if model not in the litellm model cost map. Register model, via custom pricing or PR - https://github.com/BerriAI/litellm/blob/main/model_prices_and_context_window.json

    Note:
        - If completion_response is provided, the function extracts token information and the model name from it.
        - If completion_response is not provided, the function calculates token counts based on the model and input text.
        - The cost is calculated based on the model, prompt tokens, and completion tokens.
        - For certain models containing "togethercomputer" in the name, prices are based on the model size.
        - For un-mapped Replicate models, the cost is calculated based on the total time used for the request.
    """
    try:
        call_type = _infer_call_type(call_type, completion_response) or "completion"

        if (
            (call_type == "aimage_generation" or call_type == "image_generation")
            and model is not None
            and isinstance(model, str)
            and len(model) == 0
            and custom_llm_provider == "azure"
        ):
            model = "dall-e-2"  # for dall-e-2, azure expects an empty model name
        # Handle Inputs to completion_cost
        prompt_tokens = 0
        prompt_characters: Optional[int] = None
        completion_tokens = 0
        completion_characters: Optional[int] = None
        cache_creation_input_tokens: Optional[int] = None
        cache_read_input_tokens: Optional[int] = None
        audio_transcription_file_duration: float = 0.0
        cost_per_token_usage_object: Optional[Usage] = _get_usage_object(
            completion_response=completion_response
        )
        model = _select_model_name_for_cost_calc(
            model=model,
            completion_response=completion_response,
            custom_llm_provider=custom_llm_provider,
            custom_pricing=custom_pricing,
            base_model=base_model,
        )
        verbose_logger.debug(
            f"completion_response _select_model_name_for_cost_calc: {model}"
        )

        if completion_response is not None and (
            isinstance(completion_response, BaseModel)
            or isinstance(completion_response, dict)
        ):  # tts returns a custom class
            if isinstance(completion_response, dict):
                usage_obj: Optional[Union[dict, Usage]] = completion_response.get(
                    "usage", {}
                )
            else:
                usage_obj = getattr(completion_response, "usage", {})
            if isinstance(usage_obj, BaseModel) and not isinstance(
                usage_obj, litellm.Usage
            ):
                setattr(
                    completion_response,
                    "usage",
                    litellm.Usage(**usage_obj.model_dump()),
                )
            if usage_obj is None:
                _usage = {}
            elif isinstance(usage_obj, BaseModel):
                _usage = usage_obj.model_dump()
            else:
                _usage = usage_obj
            # get input/output tokens from completion_response
            prompt_tokens = _usage.get("prompt_tokens", 0)
            completion_tokens = _usage.get("completion_tokens", 0)
            cache_creation_input_tokens = _usage.get("cache_creation_input_tokens", 0)
            cache_read_input_tokens = _usage.get("cache_read_input_tokens", 0)
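
            # Cached prompt tokens can arrive in two shapes: a top-level
            # `cache_read_input_tokens` count (read above, e.g. Anthropic), or
            # a nested `prompt_tokens_details.cached_tokens` count (read
            # below, e.g. OpenAI). The nested field, when present, overrides
            # the top-level value.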
            if (
                "prompt_tokens_details" in _usage
                and _usage["prompt_tokens_details"] != {}
                and _usage["prompt_tokens_details"]
            ):
                prompt_tokens_details = _usage.get("prompt_tokens_details", {})
                cache_read_input_tokens = prompt_tokens_details.get("cached_tokens", 0)

            total_time = getattr(completion_response, "_response_ms", 0)

            hidden_params = getattr(completion_response, "_hidden_params", None)
            if hidden_params is not None:
                custom_llm_provider = hidden_params.get(
                    "custom_llm_provider", custom_llm_provider or None
                )
                region_name = hidden_params.get("region_name", region_name)
                size = hidden_params.get("optional_params", {}).get(
                    "size", "1024-x-1024"
                )  # openai default
                quality = hidden_params.get("optional_params", {}).get(
                    "quality", "standard"
                )  # openai default
                n = hidden_params.get("optional_params", {}).get(
                    "n", 1
                )  # openai default
        else:
            if model is None:
                raise ValueError(
                    f"Model is None and does not exist in passed completion_response. Passed completion_response={completion_response}, model={model}"
                )
            if len(messages) > 0:
                prompt_tokens = token_counter(model=model, messages=messages)
            elif len(prompt) > 0:
                prompt_tokens = token_counter(model=model, text=prompt)
            completion_tokens = token_counter(model=model, text=completion)

        if model is None:
            raise ValueError(
                f"Model is None and does not exist in passed completion_response. Passed completion_response={completion_response}, model={model}"
            )

        if custom_llm_provider is None:
            try:
                model, custom_llm_provider, _, _ = litellm.get_llm_provider(
                    model=model
                )  # strip the llm provider from the model name -> for image gen cost calculation
            except Exception as e:
                verbose_logger.debug(
                    "litellm.cost_calculator.py::completion_cost() - Error inferring custom_llm_provider - {}".format(
                        str(e)
                    )
                )
        if (
            call_type == CallTypes.image_generation.value
            or call_type == CallTypes.aimage_generation.value
            or call_type == PassthroughCallTypes.passthrough_image_generation.value
        ):
            ### IMAGE GENERATION COST CALCULATION ###
            if custom_llm_provider == "vertex_ai":
                if isinstance(completion_response, ImageResponse):
                    return vertex_ai_image_cost_calculator(
                        model=model,
                        image_response=completion_response,
                    )
            elif custom_llm_provider == "bedrock":
                if isinstance(completion_response, ImageResponse):
                    return bedrock_image_cost_calculator(
                        model=model,
                        size=size,
                        image_response=completion_response,
                        optional_params=optional_params,
                    )
                raise TypeError(
                    "completion_response must be of type ImageResponse for bedrock image cost calculation"
                )
            else:
                return default_image_cost_calculator(
                    model=model,
                    quality=quality,
                    custom_llm_provider=custom_llm_provider,
                    n=n,
                    size=size,
                    optional_params=optional_params,
                )
        elif (
            call_type == CallTypes.speech.value or call_type == CallTypes.aspeech.value
        ):
            prompt_characters = litellm.utils._count_characters(text=prompt)
        elif (
            call_type == CallTypes.atranscription.value
            or call_type == CallTypes.transcription.value
        ):
            audio_transcription_file_duration = getattr(
                completion_response, "duration", 0.0
            )
        elif (
            call_type == CallTypes.rerank.value or call_type == CallTypes.arerank.value
        ):
            if completion_response is not None and isinstance(
                completion_response, RerankResponse
            ):
                meta_obj = completion_response.meta
                if meta_obj is not None:
                    billed_units = meta_obj.get("billed_units", {}) or {}
                else:
                    billed_units = {}

                search_units = (
                    billed_units.get("search_units") or 1
                )  # cohere charges per request by default.
                completion_tokens = search_units
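
        # Two pricing special cases run before the generic per-token path:
        # - Together AI prices by model-size bucket, so the model name is
        #   swapped for its size category before the cost-map lookup.
        # - Unmapped Replicate models fall back to time-based pricing.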
        # Calculate cost based on prompt_tokens, completion_tokens
        if (
            "togethercomputer" in model
            or "together_ai" in model
            or custom_llm_provider == "together_ai"
        ):
            # together ai prices based on size of llm
            # get_model_params_and_category takes a model name and returns the category of LLM size it is in model_prices_and_context_window.json
            model = get_model_params_and_category(model, call_type=CallTypes(call_type))

        # replicate llms are calculated based on time for request running
        # see https://replicate.com/pricing
        elif (
            model in litellm.replicate_models or "replicate" in model
        ) and model not in litellm.model_cost:
            # for unmapped replicate model, default to replicate's time tracking logic
            return get_replicate_completion_pricing(completion_response, total_time)  # type: ignore

        if model is None:
            raise ValueError(
                f"Model is None and does not exist in passed completion_response. Passed completion_response={completion_response}, model={model}"
            )

        if custom_llm_provider is not None and custom_llm_provider == "vertex_ai":
            # Calculate the prompt characters + response characters
            if len(messages) > 0:
                prompt_string = litellm.utils.get_formatted_prompt(
                    data={"messages": messages}, call_type="completion"
                )
                prompt_characters = litellm.utils._count_characters(text=prompt_string)
            if completion_response is not None and isinstance(
                completion_response, ModelResponse
            ):
                completion_string = litellm.utils.get_response_string(
                    response_obj=completion_response
                )
                completion_characters = litellm.utils._count_characters(
                    text=completion_string
                )

        (
            prompt_tokens_cost_usd_dollar,
            completion_tokens_cost_usd_dollar,
        ) = cost_per_token(
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            custom_llm_provider=custom_llm_provider,
            response_time_ms=total_time,
            region_name=region_name,
            custom_cost_per_second=custom_cost_per_second,
            custom_cost_per_token=custom_cost_per_token,
            prompt_characters=prompt_characters,
            completion_characters=completion_characters,
            cache_creation_input_tokens=cache_creation_input_tokens,
            cache_read_input_tokens=cache_read_input_tokens,
            usage_object=cost_per_token_usage_object,
            call_type=call_type,
            audio_transcription_file_duration=audio_transcription_file_duration,
        )
        _final_cost = prompt_tokens_cost_usd_dollar + completion_tokens_cost_usd_dollar

        return _final_cost
    except Exception as e:
        raise e
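
# Illustrative usage sketch for completion_cost (hypothetical helper; nothing
# in the library calls it). With no completion_response, token counts are
# derived from the prompt/completion text via token_counter, and the model is
# assumed to exist in litellm.model_cost.
def _example_completion_cost_from_text() -> float:
    return completion_cost(
        model="gpt-3.5-turbo",
        prompt="Write a haiku about logging.",
        completion="Costs counted in logs, tokens trickle into sums, dollars stay honest.",
    )
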

def response_cost_calculator(
    response_object: Union[
        ModelResponse,
        EmbeddingResponse,
        ImageResponse,
        TranscriptionResponse,
        TextCompletionResponse,
        HttpxBinaryResponseContent,
        RerankResponse,
    ],
    model: str,
    custom_llm_provider: Optional[str],
    call_type: Literal[
        "embedding",
        "aembedding",
        "completion",
        "acompletion",
        "atext_completion",
        "text_completion",
        "image_generation",
        "aimage_generation",
        "moderation",
        "amoderation",
        "atranscription",
        "transcription",
        "aspeech",
        "speech",
        "rerank",
        "arerank",
    ],
    optional_params: dict,
    cache_hit: Optional[bool] = None,
    base_model: Optional[str] = None,
    custom_pricing: Optional[bool] = None,
    prompt: str = "",
) -> Optional[float]:
    """
    Returns
    - float or None: cost of response
    """
    try:
        response_cost: float = 0.0
        if cache_hit is not None and cache_hit is True:
            response_cost = 0.0
        else:
            if isinstance(response_object, BaseModel):
                response_object._hidden_params["optional_params"] = optional_params

            response_cost = completion_cost(
                completion_response=response_object,
                model=model,
                call_type=call_type,
                custom_llm_provider=custom_llm_provider,
                optional_params=optional_params,
                custom_pricing=custom_pricing,
                base_model=base_model,
                prompt=prompt,
            )
        return response_cost
    except Exception as e:
        raise e


def rerank_cost(
    model: str,
    custom_llm_provider: Optional[str],
) -> Tuple[float, float]:
    """
    Returns
    - Tuple[float, float]: prompt cost and completion cost for the rerank call. Raises on unsupported providers.
    """
    default_num_queries = 1
    _, custom_llm_provider, _, _ = litellm.get_llm_provider(
        model=model, custom_llm_provider=custom_llm_provider
    )
    try:
        if custom_llm_provider == "cohere":
            return cohere_rerank_cost_per_query(
                model=model, num_queries=default_num_queries
            )
        elif custom_llm_provider == "azure_ai":
            return azure_ai_rerank_cost_per_query(
                model=model, num_queries=default_num_queries
            )
        raise ValueError(
            f"invalid custom_llm_provider for rerank model: {model}, custom_llm_provider: {custom_llm_provider}"
        )
    except Exception as e:
        raise e


def transcription_cost(
    model: str, custom_llm_provider: Optional[str], duration: float
) -> Tuple[float, float]:
    return openai_cost_per_second(
        model=model, custom_llm_provider=custom_llm_provider, duration=duration
    )


def default_image_cost_calculator(
    model: str,
    custom_llm_provider: Optional[str] = None,
    quality: Optional[str] = None,
    n: Optional[int] = 1,  # Default to 1 image
    size: Optional[str] = "1024-x-1024",  # OpenAI default
    optional_params: Optional[dict] = None,
) -> float:
    """
    Default image cost calculator for image generation

    Args:
        model (str): Model name
        custom_llm_provider (Optional[str]): Provider prefix used for cost-map lookups
        quality (Optional[str]): Image quality setting
        n (Optional[int]): Number of images generated
        size (Optional[str]): Image size (e.g. "1024x1024" or "1024-x-1024")

    Returns:
        float: Cost in USD for the image generation

    Raises:
        Exception: If model pricing not found in cost map
    """
    # Standardize size format to use "-x-"
    size_str: str = size or "1024-x-1024"
    size_str = (
        size_str.replace("x", "-x-")
        if "x" in size_str and "-x-" not in size_str
        else size_str
    )

    # Parse dimensions
    height, width = map(int, size_str.split("-x-"))

    # Build model names for cost lookup
    base_model_name = f"{size_str}/{model}"
    if custom_llm_provider and model.startswith(custom_llm_provider):
        base_model_name = (
            f"{custom_llm_provider}/{size_str}/{model.replace(custom_llm_provider, '')}"
        )
    model_name_with_quality = (
        f"{quality}/{base_model_name}" if quality else base_model_name
    )

    verbose_logger.debug(
        f"Looking up cost for models: {model_name_with_quality}, {base_model_name}"
    )

    # Try model with quality first, fall back to base model name
    if model_name_with_quality in litellm.model_cost:
        cost_info = litellm.model_cost[model_name_with_quality]
    elif base_model_name in litellm.model_cost:
        cost_info = litellm.model_cost[base_model_name]
    else:
        # Try without provider prefix
        model_without_provider = f"{size_str}/{model.split('/')[-1]}"
        model_with_quality_without_provider = (
            f"{quality}/{model_without_provider}" if quality else model_without_provider
        )

        if model_with_quality_without_provider in litellm.model_cost:
            cost_info = litellm.model_cost[model_with_quality_without_provider]
        elif model_without_provider in litellm.model_cost:
            cost_info = litellm.model_cost[model_without_provider]
        else:
            raise Exception(
                f"Model not found in cost map. Tried {model_name_with_quality}, {base_model_name}, {model_with_quality_without_provider}, and {model_without_provider}"
            )

    return cost_info["input_cost_per_pixel"] * height * width * n
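
# Illustrative sketch of the image-cost lookup above (hypothetical helper).
# It assumes the litellm cost map contains a per-pixel price under the key
# "standard/1024-x-1024/dall-e-3"; if that key is absent, the call raises.
def _example_default_image_cost() -> float:
    return default_image_cost_calculator(
        model="dall-e-3",
        custom_llm_provider="openai",
        quality="standard",
        size="1024-x-1024",
        n=1,
    )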