|
""" |
|
This is a file for the AWS Secret Manager Integration |
|
|
|
Handles Async Operations for: |
|
- Read Secret |
|
- Write Secret |
|
- Delete Secret |
|
|
|
Relevant issue: https://github.com/BerriAI/litellm/issues/1883 |
|
|
|
Requires: |
|
* `os.environ["AWS_REGION_NAME"], |
|
* `pip install boto3>=1.28.57` |
|
""" |
|
|
|
import json |
|
import os |
|
from typing import Any, Optional, Union |
|
|
|
import httpx |
|
|
|
import litellm |
|
from litellm._logging import verbose_logger |
|
from litellm.llms.bedrock.base_aws_llm import BaseAWSLLM |
|
from litellm.llms.custom_httpx.http_handler import ( |
|
_get_httpx_client, |
|
get_async_httpx_client, |
|
) |
|
from litellm.proxy._types import KeyManagementSystem |
|
from litellm.types.llms.custom_http import httpxSpecialProvider |
|
|
|
from .base_secret_manager import BaseSecretManager |
|
|
|
|
|
class AWSSecretsManagerV2(BaseAWSLLM, BaseSecretManager): |
|
def __init__(self, **kwargs): |
|
BaseSecretManager.__init__(self, **kwargs) |
|
BaseAWSLLM.__init__(self, **kwargs) |
|
|
|
@classmethod |
|
def validate_environment(cls): |
|
if "AWS_REGION_NAME" not in os.environ: |
|
raise ValueError("Missing required environment variable - AWS_REGION_NAME") |
|
|
|
@classmethod |
|
def load_aws_secret_manager(cls, use_aws_secret_manager: Optional[bool]): |
|
""" |
|
Initialize AWSSecretsManagerV2 and sets litellm.secret_manager_client = AWSSecretsManagerV2() and litellm._key_management_system = KeyManagementSystem.AWS_SECRET_MANAGER |
|
""" |
|
if use_aws_secret_manager is None or use_aws_secret_manager is False: |
|
return |
|
try: |
|
|
|
cls.validate_environment() |
|
litellm.secret_manager_client = cls() |
|
litellm._key_management_system = KeyManagementSystem.AWS_SECRET_MANAGER |
|
|
|
except Exception as e: |
|
raise e |
|
|
|
async def async_read_secret( |
|
self, |
|
secret_name: str, |
|
optional_params: Optional[dict] = None, |
|
timeout: Optional[Union[float, httpx.Timeout]] = None, |
|
) -> Optional[str]: |
|
""" |
|
Async function to read a secret from AWS Secrets Manager |
|
|
|
Returns: |
|
str: Secret value |
|
Raises: |
|
ValueError: If the secret is not found or an HTTP error occurs |
|
""" |
|
endpoint_url, headers, body = self._prepare_request( |
|
action="GetSecretValue", |
|
secret_name=secret_name, |
|
optional_params=optional_params, |
|
) |
|
|
|
async_client = get_async_httpx_client( |
|
llm_provider=httpxSpecialProvider.SecretManager, |
|
params={"timeout": timeout}, |
|
) |
|
|
|
try: |
|
response = await async_client.post( |
|
url=endpoint_url, headers=headers, data=body.decode("utf-8") |
|
) |
|
response.raise_for_status() |
|
return response.json()["SecretString"] |
|
except httpx.TimeoutException: |
|
raise ValueError("Timeout error occurred") |
|
except Exception as e: |
|
verbose_logger.exception( |
|
"Error reading secret from AWS Secrets Manager: %s", str(e) |
|
) |
|
return None |
|
|
|
def sync_read_secret( |
|
self, |
|
secret_name: str, |
|
optional_params: Optional[dict] = None, |
|
timeout: Optional[Union[float, httpx.Timeout]] = None, |
|
) -> Optional[str]: |
|
""" |
|
Sync function to read a secret from AWS Secrets Manager |
|
|
|
Done for backwards compatibility with existing codebase, since get_secret is a sync function |
|
""" |
|
|
|
|
|
if secret_name in [ |
|
"AWS_ACCESS_KEY_ID", |
|
"AWS_SECRET_ACCESS_KEY", |
|
"AWS_REGION_NAME", |
|
"AWS_REGION", |
|
"AWS_BEDROCK_RUNTIME_ENDPOINT", |
|
]: |
|
return os.getenv(secret_name) |
|
|
|
endpoint_url, headers, body = self._prepare_request( |
|
action="GetSecretValue", |
|
secret_name=secret_name, |
|
optional_params=optional_params, |
|
) |
|
|
|
sync_client = _get_httpx_client( |
|
params={"timeout": timeout}, |
|
) |
|
|
|
try: |
|
response = sync_client.post( |
|
url=endpoint_url, headers=headers, data=body.decode("utf-8") |
|
) |
|
return response.json()["SecretString"] |
|
except httpx.TimeoutException: |
|
raise ValueError("Timeout error occurred") |
|
except httpx.HTTPStatusError as e: |
|
verbose_logger.exception( |
|
"Error reading secret from AWS Secrets Manager: %s", |
|
str(e.response.text), |
|
) |
|
except Exception as e: |
|
verbose_logger.exception( |
|
"Error reading secret from AWS Secrets Manager: %s", str(e) |
|
) |
|
return None |
|
|
|
async def async_write_secret( |
|
self, |
|
secret_name: str, |
|
secret_value: str, |
|
description: Optional[str] = None, |
|
optional_params: Optional[dict] = None, |
|
timeout: Optional[Union[float, httpx.Timeout]] = None, |
|
) -> dict: |
|
""" |
|
Async function to write a secret to AWS Secrets Manager |
|
|
|
Args: |
|
secret_name: Name of the secret |
|
secret_value: Value to store (can be a JSON string) |
|
description: Optional description for the secret |
|
optional_params: Additional AWS parameters |
|
timeout: Request timeout |
|
""" |
|
import uuid |
|
|
|
|
|
data = {"Name": secret_name, "SecretString": secret_value} |
|
if description: |
|
data["Description"] = description |
|
|
|
data["ClientRequestToken"] = str(uuid.uuid4()) |
|
|
|
endpoint_url, headers, body = self._prepare_request( |
|
action="CreateSecret", |
|
secret_name=secret_name, |
|
secret_value=secret_value, |
|
optional_params=optional_params, |
|
request_data=data, |
|
) |
|
|
|
async_client = get_async_httpx_client( |
|
llm_provider=httpxSpecialProvider.SecretManager, |
|
params={"timeout": timeout}, |
|
) |
|
|
|
try: |
|
response = await async_client.post( |
|
url=endpoint_url, headers=headers, data=body.decode("utf-8") |
|
) |
|
response.raise_for_status() |
|
return response.json() |
|
except httpx.HTTPStatusError as err: |
|
raise ValueError(f"HTTP error occurred: {err.response.text}") |
|
except httpx.TimeoutException: |
|
raise ValueError("Timeout error occurred") |
|
|
|
async def async_delete_secret( |
|
self, |
|
secret_name: str, |
|
recovery_window_in_days: Optional[int] = 7, |
|
optional_params: Optional[dict] = None, |
|
timeout: Optional[Union[float, httpx.Timeout]] = None, |
|
) -> dict: |
|
""" |
|
Async function to delete a secret from AWS Secrets Manager |
|
|
|
Args: |
|
secret_name: Name of the secret to delete |
|
recovery_window_in_days: Number of days before permanent deletion (default: 7) |
|
optional_params: Additional AWS parameters |
|
timeout: Request timeout |
|
|
|
Returns: |
|
dict: Response from AWS Secrets Manager containing deletion details |
|
""" |
|
|
|
data = { |
|
"SecretId": secret_name, |
|
"RecoveryWindowInDays": recovery_window_in_days, |
|
} |
|
|
|
endpoint_url, headers, body = self._prepare_request( |
|
action="DeleteSecret", |
|
secret_name=secret_name, |
|
optional_params=optional_params, |
|
request_data=data, |
|
) |
|
|
|
async_client = get_async_httpx_client( |
|
llm_provider=httpxSpecialProvider.SecretManager, |
|
params={"timeout": timeout}, |
|
) |
|
|
|
try: |
|
response = await async_client.post( |
|
url=endpoint_url, headers=headers, data=body.decode("utf-8") |
|
) |
|
response.raise_for_status() |
|
return response.json() |
|
except httpx.HTTPStatusError as err: |
|
raise ValueError(f"HTTP error occurred: {err.response.text}") |
|
except httpx.TimeoutException: |
|
raise ValueError("Timeout error occurred") |
|
|
|
def _prepare_request( |
|
self, |
|
action: str, |
|
secret_name: str, |
|
secret_value: Optional[str] = None, |
|
optional_params: Optional[dict] = None, |
|
request_data: Optional[dict] = None, |
|
) -> tuple[str, Any, bytes]: |
|
"""Prepare the AWS Secrets Manager request""" |
|
try: |
|
from botocore.auth import SigV4Auth |
|
from botocore.awsrequest import AWSRequest |
|
except ImportError: |
|
raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.") |
|
optional_params = optional_params or {} |
|
boto3_credentials_info = self._get_boto_credentials_from_optional_params( |
|
optional_params |
|
) |
|
|
|
|
|
_, endpoint_url = self.get_runtime_endpoint( |
|
api_base=None, |
|
aws_bedrock_runtime_endpoint=boto3_credentials_info.aws_bedrock_runtime_endpoint, |
|
aws_region_name=boto3_credentials_info.aws_region_name, |
|
) |
|
endpoint_url = endpoint_url.replace("bedrock-runtime", "secretsmanager") |
|
|
|
|
|
if request_data: |
|
data = request_data |
|
else: |
|
data = {"SecretId": secret_name} |
|
if secret_value and action == "PutSecretValue": |
|
data["SecretString"] = secret_value |
|
|
|
body = json.dumps(data).encode("utf-8") |
|
headers = { |
|
"Content-Type": "application/x-amz-json-1.1", |
|
"X-Amz-Target": f"secretsmanager.{action}", |
|
} |
|
|
|
|
|
request = AWSRequest( |
|
method="POST", url=endpoint_url, data=body, headers=headers |
|
) |
|
SigV4Auth( |
|
boto3_credentials_info.credentials, |
|
"secretsmanager", |
|
boto3_credentials_info.aws_region_name, |
|
).add_auth(request) |
|
prepped = request.prepare() |
|
|
|
return endpoint_url, prepped.headers, body |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|