|
from abc import ABC, abstractmethod |
|
from typing import Any, Dict, Optional, Union |
|
|
|
import httpx |
|
|
|
from litellm import verbose_logger |
|
|
|
|
|
class BaseSecretManager(ABC): |
|
""" |
|
Abstract base class for secret management implementations. |
|
""" |
|
|
|
@abstractmethod |
|
async def async_read_secret( |
|
self, |
|
secret_name: str, |
|
optional_params: Optional[dict] = None, |
|
timeout: Optional[Union[float, httpx.Timeout]] = None, |
|
) -> Optional[str]: |
|
""" |
|
Asynchronously read a secret from the secret manager. |
|
|
|
Args: |
|
secret_name (str): Name/path of the secret to read |
|
optional_params (Optional[dict]): Additional parameters specific to the secret manager |
|
timeout (Optional[Union[float, httpx.Timeout]]): Request timeout |
|
|
|
Returns: |
|
Optional[str]: The secret value if found, None otherwise |
|
""" |
|
pass |
|
|
|
@abstractmethod |
|
def sync_read_secret( |
|
self, |
|
secret_name: str, |
|
optional_params: Optional[dict] = None, |
|
timeout: Optional[Union[float, httpx.Timeout]] = None, |
|
) -> Optional[str]: |
|
""" |
|
Synchronously read a secret from the secret manager. |
|
|
|
Args: |
|
secret_name (str): Name/path of the secret to read |
|
optional_params (Optional[dict]): Additional parameters specific to the secret manager |
|
timeout (Optional[Union[float, httpx.Timeout]]): Request timeout |
|
|
|
Returns: |
|
Optional[str]: The secret value if found, None otherwise |
|
""" |
|
pass |
|
|
|
@abstractmethod |
|
async def async_write_secret( |
|
self, |
|
secret_name: str, |
|
secret_value: str, |
|
description: Optional[str] = None, |
|
optional_params: Optional[dict] = None, |
|
timeout: Optional[Union[float, httpx.Timeout]] = None, |
|
) -> Dict[str, Any]: |
|
""" |
|
Asynchronously write a secret to the secret manager. |
|
|
|
Args: |
|
secret_name (str): Name/path of the secret to write |
|
secret_value (str): Value to store |
|
description (Optional[str]): Description of the secret. Some secret managers allow storing a description with the secret. |
|
optional_params (Optional[dict]): Additional parameters specific to the secret manager |
|
timeout (Optional[Union[float, httpx.Timeout]]): Request timeout |
|
Returns: |
|
Dict[str, Any]: Response from the secret manager containing write operation details |
|
""" |
|
pass |
|
|
|
@abstractmethod |
|
async def async_delete_secret( |
|
self, |
|
secret_name: str, |
|
recovery_window_in_days: Optional[int] = 7, |
|
optional_params: Optional[dict] = None, |
|
timeout: Optional[Union[float, httpx.Timeout]] = None, |
|
) -> dict: |
|
""" |
|
Async function to delete a secret from the secret manager |
|
|
|
Args: |
|
secret_name: Name of the secret to delete |
|
recovery_window_in_days: Number of days before permanent deletion (default: 7) |
|
optional_params: Additional parameters specific to the secret manager |
|
timeout: Request timeout |
|
|
|
Returns: |
|
dict: Response from the secret manager containing deletion details |
|
""" |
|
pass |
|
|
|
async def async_rotate_secret( |
|
self, |
|
current_secret_name: str, |
|
new_secret_name: str, |
|
new_secret_value: str, |
|
optional_params: Optional[dict] = None, |
|
timeout: Optional[Union[float, httpx.Timeout]] = None, |
|
) -> dict: |
|
""" |
|
Async function to rotate a secret by creating a new one and deleting the old one. |
|
This allows for both value and name changes during rotation. |
|
|
|
Args: |
|
current_secret_name: Current name of the secret |
|
new_secret_name: New name for the secret |
|
new_secret_value: New value for the secret |
|
optional_params: Additional AWS parameters |
|
timeout: Request timeout |
|
|
|
Returns: |
|
dict: Response containing the new secret details |
|
|
|
Raises: |
|
ValueError: If the secret doesn't exist or if there's an HTTP error |
|
""" |
|
try: |
|
|
|
old_secret = await self.async_read_secret( |
|
secret_name=current_secret_name, |
|
optional_params=optional_params, |
|
timeout=timeout, |
|
) |
|
|
|
if old_secret is None: |
|
raise ValueError(f"Current secret {current_secret_name} not found") |
|
|
|
|
|
create_response = await self.async_write_secret( |
|
secret_name=new_secret_name, |
|
secret_value=new_secret_value, |
|
description=f"Rotated from {current_secret_name}", |
|
optional_params=optional_params, |
|
timeout=timeout, |
|
) |
|
|
|
|
|
new_secret = await self.async_read_secret( |
|
secret_name=new_secret_name, |
|
optional_params=optional_params, |
|
timeout=timeout, |
|
) |
|
|
|
if new_secret is None: |
|
raise ValueError(f"Failed to verify new secret {new_secret_name}") |
|
|
|
|
|
await self.async_delete_secret( |
|
secret_name=current_secret_name, |
|
recovery_window_in_days=7, |
|
optional_params=optional_params, |
|
timeout=timeout, |
|
) |
|
|
|
return create_response |
|
|
|
except httpx.HTTPStatusError as err: |
|
verbose_logger.exception( |
|
"Error rotating secret in AWS Secrets Manager: %s", |
|
str(err.response.text), |
|
) |
|
raise ValueError(f"HTTP error occurred: {err.response.text}") |
|
except httpx.TimeoutException: |
|
raise ValueError("Timeout error occurred") |
|
except Exception as e: |
|
verbose_logger.exception( |
|
"Error rotating secret in AWS Secrets Manager: %s", str(e) |
|
) |
|
raise |
|
|