# TestLLM / litellm /cost_calculator.py
# Raju2024's picture
# Upload 1072 files
# e3278e4 verified
# What is this?
## File for 'response_cost' calculation in Logging
import time
from functools import lru_cache
from typing import Any, List, Literal, Optional, Tuple, Union
from pydantic import BaseModel
import litellm
import litellm._logging
from litellm import verbose_logger
from litellm.litellm_core_utils.llm_cost_calc.utils import _generic_cost_per_character
from litellm.llms.anthropic.cost_calculation import (
cost_per_token as anthropic_cost_per_token,
)
from litellm.llms.azure.cost_calculation import (
cost_per_token as azure_openai_cost_per_token,
)
from litellm.llms.azure_ai.cost_calculator import (
cost_per_query as azure_ai_rerank_cost_per_query,
)
from litellm.llms.bedrock.image.cost_calculator import (
cost_calculator as bedrock_image_cost_calculator,
)
from litellm.llms.cohere.cost_calculator import (
cost_per_query as cohere_rerank_cost_per_query,
)
from litellm.llms.databricks.cost_calculator import (
cost_per_token as databricks_cost_per_token,
)
from litellm.llms.deepseek.cost_calculator import (
cost_per_token as deepseek_cost_per_token,
)
from litellm.llms.fireworks_ai.cost_calculator import (
cost_per_token as fireworks_ai_cost_per_token,
)
from litellm.llms.gemini.cost_calculator import cost_per_token as gemini_cost_per_token
from litellm.llms.openai.cost_calculation import (
cost_per_second as openai_cost_per_second,
)
from litellm.llms.openai.cost_calculation import cost_per_token as openai_cost_per_token
from litellm.llms.together_ai.cost_calculator import get_model_params_and_category
from litellm.llms.vertex_ai.cost_calculator import (
cost_per_character as google_cost_per_character,
)
from litellm.llms.vertex_ai.cost_calculator import (
cost_per_token as google_cost_per_token,
)
from litellm.llms.vertex_ai.cost_calculator import cost_router as google_cost_router
from litellm.llms.vertex_ai.image_generation.cost_calculator import (
cost_calculator as vertex_ai_image_cost_calculator,
)
from litellm.types.llms.openai import HttpxBinaryResponseContent
from litellm.types.rerank import RerankResponse
from litellm.types.utils import (
CallTypesLiteral,
LlmProvidersSet,
PassthroughCallTypes,
Usage,
)
from litellm.utils import (
CallTypes,
CostPerToken,
EmbeddingResponse,
ImageResponse,
ModelResponse,
TextCompletionResponse,
TranscriptionResponse,
_cached_get_model_info_helper,
token_counter,
)
def _cost_per_token_custom_pricing_helper(
prompt_tokens: float = 0,
completion_tokens: float = 0,
response_time_ms: Optional[float] = 0.0,
### CUSTOM PRICING ###
custom_cost_per_token: Optional[CostPerToken] = None,
custom_cost_per_second: Optional[float] = None,
) -> Optional[Tuple[float, float]]:
"""Internal helper function for calculating cost, if custom pricing given"""
if custom_cost_per_token is None and custom_cost_per_second is None:
return None
if custom_cost_per_token is not None:
input_cost = custom_cost_per_token["input_cost_per_token"] * prompt_tokens
output_cost = custom_cost_per_token["output_cost_per_token"] * completion_tokens
return input_cost, output_cost
elif custom_cost_per_second is not None:
output_cost = custom_cost_per_second * response_time_ms / 1000 # type: ignore
return 0, output_cost
return None
def cost_per_token(  # noqa: PLR0915
    model: str = "",
    prompt_tokens: int = 0,
    completion_tokens: int = 0,
    response_time_ms: Optional[float] = 0.0,
    custom_llm_provider: Optional[str] = None,
    region_name=None,
    ### CHARACTER PRICING ###
    prompt_characters: Optional[int] = None,
    completion_characters: Optional[int] = None,
    ### PROMPT CACHING PRICING ### - used for anthropic
    cache_creation_input_tokens: Optional[int] = 0,
    cache_read_input_tokens: Optional[int] = 0,
    ### CUSTOM PRICING ###
    custom_cost_per_token: Optional[CostPerToken] = None,
    custom_cost_per_second: Optional[float] = None,
    ### NUMBER OF QUERIES ###
    number_of_queries: Optional[int] = None,
    ### USAGE OBJECT ###
    usage_object: Optional[Usage] = None,  # just read the usage object if provided
    ### CALL TYPE ###
    call_type: CallTypesLiteral = "completion",
    audio_transcription_file_duration: float = 0.0,  # for audio transcription calls - the file time in seconds
) -> Tuple[float, float]:  # type: ignore
    """
    Calculates the cost per token for a given model, prompt tokens, and completion tokens.

    Resolution order, as implemented below:
        1. Custom pricing (custom_cost_per_token / custom_cost_per_second) short-circuits
           everything else.
        2. The model name is normalised against litellm.model_cost.
        3. The call is dispatched on call_type (speech, rerank, transcription) and then on
           custom_llm_provider to a provider-specific calculator.
        4. Any other provider falls back to the generic per-token / per-second lookup.

    Parameters:
        model (str): The name of the model to use. Default is ""
        prompt_tokens (int): The number of tokens in the prompt.
        completion_tokens (int): The number of tokens in the completion.
        response_time_ms (float): The amount of time, in milliseconds, it took the call to complete.
        custom_llm_provider (str): The llm provider to whom the call was made (see init.py for full list).
            When None it is inferred with litellm.get_llm_provider.
        region_name: Optional region; used to look up "{provider}/{region}/{model}" in the cost map.
        prompt_characters (int): The number of characters in the prompt. Used for vertex ai and
            speech cost calculation.
        completion_characters (int): The number of characters in the completion response. Used for
            vertex ai cost calculation.
        cache_creation_input_tokens (int): Only used when building a Usage object (no usage_object given).
        cache_read_input_tokens (int): Only used when building a Usage object (no usage_object given).
        custom_cost_per_token: Optional[CostPerToken]: the cost per input + output token for the llm api call.
        custom_cost_per_second: Optional[float]: the cost per second for the llm api call.
        number_of_queries: Optional[int]: accepted but not referenced in this function body.
        usage_object: Optional[Usage]: passed as-is to the provider calculators when given.
        call_type: Optional[str]: the call type
        audio_transcription_file_duration (float): audio length in seconds, used for transcription calls.

    Returns:
        tuple: A tuple containing the cost in USD dollars for prompt tokens and completion tokens, respectively.

    Raises:
        Exception: if model is None.
        ValueError: for speech calls when prompt_characters is missing or no cost could be computed.
    """
    if model is None:
        raise Exception("Invalid arg. Model cannot be none.")

    ## RECONSTRUCT USAGE BLOCK ##
    # Provider-specific calculators below take a Usage object; build one from the
    # scalar arguments when the caller did not supply it.
    if usage_object is not None:
        usage_block = usage_object
    else:
        usage_block = Usage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            cache_creation_input_tokens=cache_creation_input_tokens,
            cache_read_input_tokens=cache_read_input_tokens,
        )

    ## CUSTOM PRICING ##
    # Note: custom pricing is computed from the prompt_tokens / completion_tokens
    # arguments, not from usage_block.
    response_cost = _cost_per_token_custom_pricing_helper(
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
        response_time_ms=response_time_ms,
        custom_cost_per_second=custom_cost_per_second,
        custom_cost_per_token=custom_cost_per_token,
    )

    if response_cost is not None:
        return response_cost[0], response_cost[1]

    # given
    prompt_tokens_cost_usd_dollar: float = 0
    completion_tokens_cost_usd_dollar: float = 0
    model_cost_ref = litellm.model_cost
    model_with_provider = model
    if custom_llm_provider is not None:
        model_with_provider = custom_llm_provider + "/" + model
        if region_name is not None:
            model_with_provider_and_region = (
                f"{custom_llm_provider}/{region_name}/{model}"
            )
            if (
                model_with_provider_and_region in model_cost_ref
            ):  # use region based pricing, if it's available
                model_with_provider = model_with_provider_and_region
    else:
        # No provider given: infer it from the model name (may raise if unknown).
        _, custom_llm_provider, _, _ = litellm.get_llm_provider(model=model)
    # Drop everything up to the first "/" (if any) to get the bare model name.
    model_without_prefix = model
    model_parts = model.split("/", 1)
    if len(model_parts) > 1:
        model_without_prefix = model_parts[1]
    else:
        model_without_prefix = model
    # The bare string literal below is an inline note; it is evaluated and discarded.
    """
Code block that formats model to lookup in litellm.model_cost
Option1. model = "bedrock/ap-northeast-1/anthropic.claude-instant-v1". This is the most accurate since it is region based. Should always be option 1
Option2. model = "openai/gpt-4" - model = provider/model
Option3. model = "anthropic.claude-3" - model = model
"""
    if (
        model_with_provider in model_cost_ref
    ):  # Option 2. use model with provider, model = "openai/gpt-4"
        model = model_with_provider
    elif model in model_cost_ref:  # Option 1. use model passed, model="gpt-4"
        model = model  # no-op: the name as passed is already a cost-map key
    elif (
        model_without_prefix in model_cost_ref
    ):  # Option 3. if user passed model="bedrock/anthropic.claude-3", use model="anthropic.claude-3"
        model = model_without_prefix
    # If none of the three options match, `model` is left unchanged.

    # see this https://learn.microsoft.com/en-us/azure/ai-services/openai/concepts/models
    if call_type == "speech" or call_type == "aspeech":
        # Text-to-speech is priced on the number of input characters.
        if prompt_characters is None:
            raise ValueError(
                "prompt_characters must be provided for tts calls. prompt_characters={}, model={}, custom_llm_provider={}, call_type={}".format(
                    prompt_characters,
                    model,
                    custom_llm_provider,
                    call_type,
                )
            )
        prompt_cost, completion_cost = _generic_cost_per_character(
            model=model_without_prefix,
            custom_llm_provider=custom_llm_provider,
            prompt_characters=prompt_characters,
            completion_characters=0,
            custom_prompt_cost=None,
            custom_completion_cost=0,
        )
        if prompt_cost is None or completion_cost is None:
            raise ValueError(
                "cost for tts call is None. prompt_cost={}, completion_cost={}, model={}, custom_llm_provider={}, prompt_characters={}, completion_characters={}".format(
                    prompt_cost,
                    completion_cost,
                    model_without_prefix,
                    custom_llm_provider,
                    prompt_characters,
                    completion_characters,
                )
            )
        return prompt_cost, completion_cost
    elif call_type == "arerank" or call_type == "rerank":
        return rerank_cost(
            model=model,
            custom_llm_provider=custom_llm_provider,
        )
    elif call_type == "atranscription" or call_type == "transcription":
        return openai_cost_per_second(
            model=model,
            custom_llm_provider=custom_llm_provider,
            duration=audio_transcription_file_duration,
        )
    elif custom_llm_provider == "vertex_ai":
        # The router decides whether this vertex model is billed per character or per token.
        cost_router = google_cost_router(
            model=model_without_prefix,
            custom_llm_provider=custom_llm_provider,
            call_type=call_type,
        )
        if cost_router == "cost_per_character":
            return google_cost_per_character(
                model=model_without_prefix,
                custom_llm_provider=custom_llm_provider,
                prompt_characters=prompt_characters,
                completion_characters=completion_characters,
                prompt_tokens=prompt_tokens,
                completion_tokens=completion_tokens,
            )
        elif cost_router == "cost_per_token":
            return google_cost_per_token(
                model=model_without_prefix,
                custom_llm_provider=custom_llm_provider,
                prompt_tokens=prompt_tokens,
                completion_tokens=completion_tokens,
            )
        # NOTE(review): any other router value appears to fall through without an
        # explicit return (hence the `type: ignore` on the signature) - confirm.
    elif custom_llm_provider == "anthropic":
        return anthropic_cost_per_token(model=model, usage=usage_block)
    elif custom_llm_provider == "openai":
        return openai_cost_per_token(model=model, usage=usage_block)
    elif custom_llm_provider == "databricks":
        return databricks_cost_per_token(model=model, usage=usage_block)
    elif custom_llm_provider == "fireworks_ai":
        return fireworks_ai_cost_per_token(model=model, usage=usage_block)
    elif custom_llm_provider == "azure":
        return azure_openai_cost_per_token(
            model=model, usage=usage_block, response_time_ms=response_time_ms
        )
    elif custom_llm_provider == "gemini":
        return gemini_cost_per_token(model=model, usage=usage_block)
    elif custom_llm_provider == "deepseek":
        return deepseek_cost_per_token(model=model, usage=usage_block)
    else:
        # Generic fallback: per-token price if it is positive, otherwise a
        # per-second price when the model info provides one.
        model_info = _cached_get_model_info_helper(
            model=model, custom_llm_provider=custom_llm_provider
        )

        if model_info["input_cost_per_token"] > 0:
            ## COST PER TOKEN ##
            prompt_tokens_cost_usd_dollar = (
                model_info["input_cost_per_token"] * prompt_tokens
            )
        elif (
            model_info.get("input_cost_per_second", None) is not None
            and response_time_ms is not None
        ):
            verbose_logger.debug(
                "For model=%s - input_cost_per_second: %s; response time: %s",
                model,
                model_info.get("input_cost_per_second", None),
                response_time_ms,
            )
            ## COST PER SECOND ##
            prompt_tokens_cost_usd_dollar = (
                model_info["input_cost_per_second"] * response_time_ms / 1000  # type: ignore
            )

        if model_info["output_cost_per_token"] > 0:
            completion_tokens_cost_usd_dollar = (
                model_info["output_cost_per_token"] * completion_tokens
            )
        elif (
            model_info.get("output_cost_per_second", None) is not None
            and response_time_ms is not None
        ):
            verbose_logger.debug(
                "For model=%s - output_cost_per_second: %s; response time: %s",
                model,
                model_info.get("output_cost_per_second", None),
                response_time_ms,
            )
            ## COST PER SECOND ##
            completion_tokens_cost_usd_dollar = (
                model_info["output_cost_per_second"] * response_time_ms / 1000  # type: ignore
            )

        verbose_logger.debug(
            "Returned custom cost for model=%s - prompt_tokens_cost_usd_dollar: %s, completion_tokens_cost_usd_dollar: %s",
            model,
            prompt_tokens_cost_usd_dollar,
            completion_tokens_cost_usd_dollar,
        )
        return prompt_tokens_cost_usd_dollar, completion_tokens_cost_usd_dollar
def get_replicate_completion_pricing(completion_response: dict, total_time=0.0):
    """
    Estimate the cost of a Replicate call from its running time.

    Parameters:
        completion_response: The response (a dict, or an object exposing `created` /
            `ended` attributes). Only consulted when total_time is not provided.
        total_time: Request duration in milliseconds. When it is 0 or None the duration
            is derived from the response's `created` and `ended` timestamps (assumed to
            be epoch seconds), defaulting each missing timestamp to the current time.

    Returns:
        float: Cost in USD, assuming the public A100 80GB per-second price.
    """
    # see https://replicate.com/pricing
    # for all litellm currently supported LLMs, almost all requests go to a100_80gb
    a100_80gb_price_per_second_public = (
        0.001400  # assume all calls sent to A100 80GB for now
    )

    if not total_time:  # 0 / 0.0 / None -> duration unknown, derive it from the response

        def _timestamp(field: str) -> float:
            # dicts have no attributes, so getattr() would always miss the key;
            # read mappings by key and everything else by attribute.
            if isinstance(completion_response, dict):
                value = completion_response.get(field)
            else:
                value = getattr(completion_response, field, None)
            return time.time() if value is None else value

        start_time = _timestamp("created")
        end_time = _timestamp("ended")
        # Timestamps are in seconds while total_time is in ms: convert before the
        # "/ 1000" below, and never bill a negative duration.
        total_time = max(end_time - start_time, 0.0) * 1000

    return a100_80gb_price_per_second_public * total_time / 1000
def has_hidden_params(obj: Any) -> bool:
    """Return True when `obj` exposes a `_hidden_params` attribute, else False."""
    _absent = object()  # unique sentinel, so a stored value of None still counts as present
    return getattr(obj, "_hidden_params", _absent) is not _absent
def _get_provider_for_cost_calc(
model: Optional[str],
custom_llm_provider: Optional[str] = None,
) -> Optional[str]:
if custom_llm_provider is not None:
return custom_llm_provider
if model is None:
return None
try:
_, custom_llm_provider, _, _ = litellm.get_llm_provider(model=model)
except Exception as e:
verbose_logger.debug(
f"litellm.cost_calculator.py::_get_provider_for_cost_calc() - Error inferring custom_llm_provider - {str(e)}"
)
return None
return custom_llm_provider
def _select_model_name_for_cost_calc(
    model: Optional[str],
    completion_response: Optional[Any],
    base_model: Optional[str] = None,
    custom_pricing: Optional[bool] = None,
    custom_llm_provider: Optional[str] = None,
) -> Optional[str]:
    """
    Pick the model name used to look up pricing.

    Precedence, highest first:
    1. `_hidden_params["model"]` of the response, but only when the response has no
       `model` attribute value and the hidden value is non-empty
    2. base_model (e.g. for azure models)
    3. the received model name, when custom pricing is true
    4. the `model` set on the completion response
    5. the model passed in

    The result is prefixed with "{provider}/" (or "{provider}/{region}/" when the
    response's hidden params carry a region_name) unless it already starts with a
    known provider name.
    """
    custom_llm_provider = _get_provider_for_cost_calc(
        model=model, custom_llm_provider=custom_llm_provider
    )
    response_model: Optional[str] = getattr(completion_response, "model", None)
    hidden: Optional[dict] = getattr(completion_response, "_hidden_params", None)

    # base_model wins over the custom-pricing model when both are supplied.
    return_model: Optional[str] = None
    if base_model is not None:
        return_model = base_model
    elif custom_pricing is True:
        return_model = model

    region_name: Optional[str] = None
    if hidden is not None:
        if response_model is None:
            hidden_model = hidden.get("model", None)
            if hidden_model is not None and len(hidden_model) > 0:
                return_model = hidden_model
        region_name = hidden.get("region_name", None)

    # Nothing chosen yet: fall back to the response's model, then to the argument.
    if return_model is None:
        return_model = response_model if response_model is not None else model

    if return_model is None or custom_llm_provider is None:
        return return_model
    if _model_contains_known_llm_provider(return_model):
        return return_model

    # add provider prefix if not already present, to match model_cost
    if region_name is not None:
        return f"{custom_llm_provider}/{region_name}/{return_model}"
    return f"{custom_llm_provider}/{return_model}"
@lru_cache(maxsize=16)
def _model_contains_known_llm_provider(model: str) -> bool:
    """
    Tell whether the text before the first "/" in `model` (or the whole name when
    there is no "/") is a member of LlmProvidersSet.
    """
    leading_segment, _, _ = model.partition("/")
    return leading_segment in LlmProvidersSet
def _get_usage_object(
completion_response: Any,
) -> Optional[Usage]:
usage_obj: Optional[Usage] = None
if completion_response is not None and isinstance(
completion_response, ModelResponse
):
usage_obj = completion_response.get("usage")
return usage_obj
def _infer_call_type(
call_type: Optional[CallTypesLiteral], completion_response: Any
) -> Optional[CallTypesLiteral]:
if call_type is not None:
return call_type
if completion_response is None:
return None
if isinstance(completion_response, ModelResponse):
return "completion"
elif isinstance(completion_response, EmbeddingResponse):
return "embedding"
elif isinstance(completion_response, TranscriptionResponse):
return "transcription"
elif isinstance(completion_response, HttpxBinaryResponseContent):
return "speech"
elif isinstance(completion_response, RerankResponse):
return "rerank"
elif isinstance(completion_response, ImageResponse):
return "image_generation"
elif isinstance(completion_response, TextCompletionResponse):
return "text_completion"
return call_type
def completion_cost(  # noqa: PLR0915
    completion_response=None,
    model: Optional[str] = None,
    prompt="",
    messages: List = [],
    completion="",
    total_time: Optional[float] = 0.0,  # used for replicate, sagemaker
    call_type: Optional[CallTypesLiteral] = None,
    ### REGION ###
    custom_llm_provider=None,
    region_name=None,  # used for bedrock pricing
    ### IMAGE GEN ###
    size: Optional[str] = None,
    quality: Optional[str] = None,
    n: Optional[int] = None,  # number of images
    ### CUSTOM PRICING ###
    custom_cost_per_token: Optional[CostPerToken] = None,
    custom_cost_per_second: Optional[float] = None,
    optional_params: Optional[dict] = None,
    custom_pricing: Optional[bool] = None,
    base_model: Optional[str] = None,
) -> float:
    """
    Calculate the cost of a given completion call for GPT-3.5-turbo, llama2, any litellm supported llm.

    Parameters:
        completion_response (litellm.ModelResponses): The response received from a LiteLLM completion request.
            When it is a pydantic model or a dict, token usage is read from it.

        [OPTIONAL PARAMS]
        model (str): Optional. The name of the language model used in the completion calls
        prompt (str): Optional. The input prompt passed to the llm
        messages (List): Optional. Chat messages; used for token counting when no usable response is
            passed, and for vertex_ai character counting. The shared default list is not mutated
            in this function body.
        completion (str): Optional. The output completion text from the llm
        total_time (float, int): Optional. Forwarded to cost_per_token as response_time_ms and to the
            replicate time-based pricing. Overwritten with the response's `_response_ms` attribute
            when a pydantic/dict response is passed.
        call_type: Optional. Inferred from the response class when not given; defaults to "completion".
        custom_llm_provider / region_name / size / quality / n: overridden by the response's
            `_hidden_params` when those are present.
        custom_cost_per_token: Optional[CostPerToken]: the cost per input + output token for the llm api call.
        custom_cost_per_second: Optional[float]: the cost per second for the llm api call.
        optional_params: Optional[dict]: forwarded to the image cost calculators.
        custom_pricing / base_model: forwarded to _select_model_name_for_cost_calc.

    Returns:
        float: The cost in USD dollars for the completion based on the provided parameters.

    Exceptions:
        Raises exception if model not in the litellm model cost map. Register model, via custom pricing or PR - https://github.com/BerriAI/litellm/blob/main/model_prices_and_context_window.json
        Raises ValueError when no model name can be determined.

    Note:
        - If completion_response is provided, the function extracts token information and the model name from it.
        - If completion_response is not provided, the function calculates token counts based on the model and input text.
        - The cost is calculated based on the model, prompt tokens, and completion tokens.
        - For certain models containing "togethercomputer" in the name, prices are based on the model size.
        - For un-mapped Replicate models, the cost is calculated based on the total time used for the request.
    """
    try:
        call_type = _infer_call_type(call_type, completion_response) or "completion"

        if (
            (call_type == "aimage_generation" or call_type == "image_generation")
            and model is not None
            and isinstance(model, str)
            and len(model) == 0
            and custom_llm_provider == "azure"
        ):
            model = "dall-e-2"  # for dall-e-2, azure expects an empty model name
        # Handle Inputs to completion_cost
        prompt_tokens = 0
        prompt_characters: Optional[int] = None
        completion_tokens = 0
        completion_characters: Optional[int] = None
        cache_creation_input_tokens: Optional[int] = None
        cache_read_input_tokens: Optional[int] = None
        audio_transcription_file_duration: float = 0.0
        # Only a ModelResponse yields a usage object here; it is passed through to
        # cost_per_token unchanged.
        cost_per_token_usage_object: Optional[Usage] = _get_usage_object(
            completion_response=completion_response
        )
        model = _select_model_name_for_cost_calc(
            model=model,
            completion_response=completion_response,
            custom_llm_provider=custom_llm_provider,
            custom_pricing=custom_pricing,
            base_model=base_model,
        )

        verbose_logger.debug(
            f"completion_response _select_model_name_for_cost_calc: {model}"
        )

        if completion_response is not None and (
            isinstance(completion_response, BaseModel)
            or isinstance(completion_response, dict)
        ):  # tts returns a custom class
            if isinstance(completion_response, dict):
                usage_obj: Optional[Union[dict, Usage]] = completion_response.get(
                    "usage", {}
                )
            else:
                usage_obj = getattr(completion_response, "usage", {})
            # Normalise a foreign pydantic usage model into litellm.Usage on the response.
            # Note that `usage_obj` itself still refers to the original object.
            if isinstance(usage_obj, BaseModel) and not isinstance(
                usage_obj, litellm.Usage
            ):
                setattr(
                    completion_response,
                    "usage",
                    litellm.Usage(**usage_obj.model_dump()),
                )
            # Flatten usage into a plain dict for the .get() lookups below.
            if usage_obj is None:
                _usage = {}
            elif isinstance(usage_obj, BaseModel):
                _usage = usage_obj.model_dump()
            else:
                _usage = usage_obj
            # get input/output tokens from completion_response
            prompt_tokens = _usage.get("prompt_tokens", 0)
            completion_tokens = _usage.get("completion_tokens", 0)
            cache_creation_input_tokens = _usage.get("cache_creation_input_tokens", 0)
            cache_read_input_tokens = _usage.get("cache_read_input_tokens", 0)
            # A non-empty prompt_tokens_details overrides the cache-read count.
            if (
                "prompt_tokens_details" in _usage
                and _usage["prompt_tokens_details"] != {}
                and _usage["prompt_tokens_details"]
            ):
                prompt_tokens_details = _usage.get("prompt_tokens_details", {})
                cache_read_input_tokens = prompt_tokens_details.get("cached_tokens", 0)

            # The caller-supplied total_time is replaced by the response's own timing.
            total_time = getattr(completion_response, "_response_ms", 0)

            hidden_params = getattr(completion_response, "_hidden_params", None)
            if hidden_params is not None:
                custom_llm_provider = hidden_params.get(
                    "custom_llm_provider", custom_llm_provider or None
                )
                region_name = hidden_params.get("region_name", region_name)
                size = hidden_params.get("optional_params", {}).get(
                    "size", "1024-x-1024"
                )  # openai default
                quality = hidden_params.get("optional_params", {}).get(
                    "quality", "standard"
                )  # openai default
                n = hidden_params.get("optional_params", {}).get(
                    "n", 1
                )  # openai default
        else:
            # No usable response object: count tokens from the raw inputs instead.
            if model is None:
                raise ValueError(
                    f"Model is None and does not exist in passed completion_response. Passed completion_response={completion_response}, model={model}"
                )
            if len(messages) > 0:
                prompt_tokens = token_counter(model=model, messages=messages)
            elif len(prompt) > 0:
                prompt_tokens = token_counter(model=model, text=prompt)
            completion_tokens = token_counter(model=model, text=completion)
        if model is None:
            raise ValueError(
                f"Model is None and does not exist in passed completion_response. Passed completion_response={completion_response}, model={model}"
            )
        if custom_llm_provider is None:
            # Best effort: a failed inference is logged and the provider stays None.
            try:
                model, custom_llm_provider, _, _ = litellm.get_llm_provider(
                    model=model
                )  # strip the llm provider from the model name -> for image gen cost calculation
            except Exception as e:
                verbose_logger.debug(
                    "litellm.cost_calculator.py::completion_cost() - Error inferring custom_llm_provider - {}".format(
                        str(e)
                    )
                )
        if (
            call_type == CallTypes.image_generation.value
            or call_type == CallTypes.aimage_generation.value
            or call_type == PassthroughCallTypes.passthrough_image_generation.value
        ):
            ### IMAGE GENERATION COST CALCULATION ###
            if custom_llm_provider == "vertex_ai":
                # A non-ImageResponse falls through to the token-based calculation below.
                if isinstance(completion_response, ImageResponse):
                    return vertex_ai_image_cost_calculator(
                        model=model,
                        image_response=completion_response,
                    )
            elif custom_llm_provider == "bedrock":
                if isinstance(completion_response, ImageResponse):
                    return bedrock_image_cost_calculator(
                        model=model,
                        size=size,
                        image_response=completion_response,
                        optional_params=optional_params,
                    )
                raise TypeError(
                    "completion_response must be of type ImageResponse for bedrock image cost calculation"
                )
            else:
                return default_image_cost_calculator(
                    model=model,
                    quality=quality,
                    custom_llm_provider=custom_llm_provider,
                    n=n,
                    size=size,
                    optional_params=optional_params,
                )
        elif (
            call_type == CallTypes.speech.value or call_type == CallTypes.aspeech.value
        ):
            # Speech is billed on the characters of the input prompt.
            prompt_characters = litellm.utils._count_characters(text=prompt)
        elif (
            call_type == CallTypes.atranscription.value
            or call_type == CallTypes.transcription.value
        ):
            audio_transcription_file_duration = getattr(
                completion_response, "duration", 0.0
            )
        elif (
            call_type == CallTypes.rerank.value or call_type == CallTypes.arerank.value
        ):
            if completion_response is not None and isinstance(
                completion_response, RerankResponse
            ):
                meta_obj = completion_response.meta
                if meta_obj is not None:
                    billed_units = meta_obj.get("billed_units", {}) or {}
                else:
                    billed_units = {}

                search_units = (
                    billed_units.get("search_units") or 1
                )  # cohere charges per request by default.
                # Search units are carried in the completion_tokens slot.
                completion_tokens = search_units
        # Calculate cost based on prompt_tokens, completion_tokens
        if (
            "togethercomputer" in model
            or "together_ai" in model
            or custom_llm_provider == "together_ai"
        ):
            # together ai prices based on size of llm
            # get_model_params_and_category takes a model name and returns the category of LLM size it is in model_prices_and_context_window.json

            model = get_model_params_and_category(model, call_type=CallTypes(call_type))

        # replicate llms are calculate based on time for request running
        # see https://replicate.com/pricing
        elif (
            model in litellm.replicate_models or "replicate" in model
        ) and model not in litellm.model_cost:
            # for unmapped replicate model, default to replicate's time tracking logic
            return get_replicate_completion_pricing(completion_response, total_time)  # type: ignore

        if model is None:
            raise ValueError(
                f"Model is None and does not exist in passed completion_response. Passed completion_response={completion_response}, model={model}"
            )

        if custom_llm_provider is not None and custom_llm_provider == "vertex_ai":
            # Calculate the prompt characters + response characters
            if len(messages) > 0:
                prompt_string = litellm.utils.get_formatted_prompt(
                    data={"messages": messages}, call_type="completion"
                )

                prompt_characters = litellm.utils._count_characters(text=prompt_string)
            if completion_response is not None and isinstance(
                completion_response, ModelResponse
            ):
                completion_string = litellm.utils.get_response_string(
                    response_obj=completion_response
                )
                completion_characters = litellm.utils._count_characters(
                    text=completion_string
                )

        (
            prompt_tokens_cost_usd_dollar,
            completion_tokens_cost_usd_dollar,
        ) = cost_per_token(
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            custom_llm_provider=custom_llm_provider,
            response_time_ms=total_time,
            region_name=region_name,
            custom_cost_per_second=custom_cost_per_second,
            custom_cost_per_token=custom_cost_per_token,
            prompt_characters=prompt_characters,
            completion_characters=completion_characters,
            cache_creation_input_tokens=cache_creation_input_tokens,
            cache_read_input_tokens=cache_read_input_tokens,
            usage_object=cost_per_token_usage_object,
            call_type=call_type,
            audio_transcription_file_duration=audio_transcription_file_duration,
        )
        _final_cost = prompt_tokens_cost_usd_dollar + completion_tokens_cost_usd_dollar

        return _final_cost
    except Exception as e:
        # Errors are re-raised unchanged; nothing is swallowed here.
        raise e
def response_cost_calculator(
    response_object: Union[
        ModelResponse,
        EmbeddingResponse,
        ImageResponse,
        TranscriptionResponse,
        TextCompletionResponse,
        HttpxBinaryResponseContent,
        RerankResponse,
    ],
    model: str,
    custom_llm_provider: Optional[str],
    call_type: Literal[
        "embedding",
        "aembedding",
        "completion",
        "acompletion",
        "atext_completion",
        "text_completion",
        "image_generation",
        "aimage_generation",
        "moderation",
        "amoderation",
        "atranscription",
        "transcription",
        "aspeech",
        "speech",
        "rerank",
        "arerank",
    ],
    optional_params: dict,
    cache_hit: Optional[bool] = None,
    base_model: Optional[str] = None,
    custom_pricing: Optional[bool] = None,
    prompt: str = "",
) -> Optional[float]:
    """
    Compute the cost of a response for logging.

    A cache hit costs 0.0. Otherwise optional_params is stored on the response's
    `_hidden_params` (pydantic responses only) and the cost is delegated to
    completion_cost. Any exception raised by completion_cost propagates to the caller.

    Returns
        - float: cost of response
    """
    # Cached responses are free; skip the calculation entirely.
    if cache_hit is True:
        return 0.0

    if isinstance(response_object, BaseModel):
        # completion_cost reads size/quality/n from the response's hidden params.
        response_object._hidden_params["optional_params"] = optional_params

    return completion_cost(
        completion_response=response_object,
        model=model,
        call_type=call_type,
        custom_llm_provider=custom_llm_provider,
        optional_params=optional_params,
        custom_pricing=custom_pricing,
        base_model=base_model,
        prompt=prompt,
    )
def rerank_cost(
    model: str,
    custom_llm_provider: Optional[str],
) -> Tuple[float, float]:
    """
    Price a rerank call as a single query.

    The provider is resolved with litellm.get_llm_provider; only "cohere" and
    "azure_ai" are supported.

    Returns
        - Tuple[float, float]: the cost tuple produced by the provider's per-query calculator.

    Raises
        - ValueError: when the resolved provider has no rerank calculator.
    """
    _, custom_llm_provider, _, _ = litellm.get_llm_provider(
        model=model, custom_llm_provider=custom_llm_provider
    )
    default_num_queries = 1

    # Select the provider-specific per-query calculator.
    if custom_llm_provider == "cohere":
        per_query_calculator = cohere_rerank_cost_per_query
    elif custom_llm_provider == "azure_ai":
        per_query_calculator = azure_ai_rerank_cost_per_query
    else:
        raise ValueError(
            f"invalid custom_llm_provider for rerank model: {model}, custom_llm_provider: {custom_llm_provider}"
        )

    return per_query_calculator(model=model, num_queries=default_num_queries)
def transcription_cost(
    model: str, custom_llm_provider: Optional[str], duration: float
) -> Tuple[float, float]:
    """
    Return the cost tuple for an audio transcription call.

    Thin wrapper that forwards all three arguments to openai_cost_per_second.
    """
    cost_tuple = openai_cost_per_second(
        model=model,
        custom_llm_provider=custom_llm_provider,
        duration=duration,
    )
    return cost_tuple
def default_image_cost_calculator(
    model: str,
    custom_llm_provider: Optional[str] = None,
    quality: Optional[str] = None,
    n: Optional[int] = 1,  # Default to 1 image
    size: Optional[str] = "1024-x-1024",  # OpenAI default
    optional_params: Optional[dict] = None,
) -> float:
    """
    Default image cost calculator for image generation

    Args:
        model (str): Model name, optionally prefixed with "{custom_llm_provider}/"
        custom_llm_provider (Optional[str]): Provider name used to build the lookup key
        quality (Optional[str]): Image quality setting
        n (Optional[int]): Number of images generated. None is treated as 1.
        size (Optional[str]): Image size (e.g. "1024x1024" or "1024-x-1024"). None or ""
            falls back to "1024-x-1024".
        optional_params (Optional[dict]): Accepted for interface parity; not used here.

    Returns:
        float: Cost in USD for the image generation

    Raises:
        Exception: If model pricing not found in cost map
        ValueError: If `size` cannot be parsed into two integers
    """
    # Standardize size format to use "-x-"
    size_str: str = size or "1024-x-1024"
    if "x" in size_str and "-x-" not in size_str:
        size_str = size_str.replace("x", "-x-")

    # Parse dimensions
    height, width = map(int, size_str.split("-x-"))

    # Callers may forward n=None (e.g. completion_cost without hidden params);
    # bill that as a single image instead of failing on `int * None`.
    image_count = 1 if n is None else n

    # Build model names for cost lookup
    base_model_name = f"{size_str}/{model}"
    provider_prefix = f"{custom_llm_provider}/" if custom_llm_provider else None
    if provider_prefix is not None and model.startswith(provider_prefix):
        # Strip only the leading "{provider}/". The old str.replace() left the slash
        # behind ("provider/size//model") and also removed later occurrences.
        bare_model = model[len(provider_prefix) :]
        base_model_name = f"{custom_llm_provider}/{size_str}/{bare_model}"
    model_name_with_quality = (
        f"{quality}/{base_model_name}" if quality else base_model_name
    )

    verbose_logger.debug(
        f"Looking up cost for models: {model_name_with_quality}, {base_model_name}"
    )

    # Fallback names without any provider prefix
    model_without_provider = f"{size_str}/{model.split('/')[-1]}"
    model_with_quality_without_provider = (
        f"{quality}/{model_without_provider}" if quality else model_without_provider
    )

    # Try model with quality first, fall back to base model name, then repeat
    # both without the provider prefix.
    candidate_names = (
        model_name_with_quality,
        base_model_name,
        model_with_quality_without_provider,
        model_without_provider,
    )
    for candidate in candidate_names:
        if candidate in litellm.model_cost:
            cost_info = litellm.model_cost[candidate]
            break
    else:
        raise Exception(
            f"Model not found in cost map. Tried {model_name_with_quality}, {base_model_name}, {model_with_quality_without_provider}, and {model_without_provider}"
        )

    return cost_info["input_cost_per_pixel"] * height * width * image_count