import json import logging import time from concurrent.futures import ThreadPoolExecutor from contextvars import ContextVar from typing import Any, Dict, Generator, List from anthropic import Anthropic from fastapi import FastAPI, HTTPException, Request, Response from fastapi.responses import JSONResponse, StreamingResponse, HTMLResponse from fastapi.security import HTTPBearer from starlette.concurrency import run_in_threadpool from pathlib import Path import markdown from pygments.formatters import HtmlFormatter from schemas import OpenAIChatCompletionForm, FilterForm # logger logger = logging.getLogger() # FastAPI app initialization app = FastAPI() security = HTTPBearer() # Context variable for token storage token_context = ContextVar('token', default=None) # Endpoints that don't require authentication PUBLIC_ENDPOINTS = {"/"} # Available Anthropic models AVAILABLE_MODELS = [ "claude-3-haiku-20240307", "claude-3-opus-20240229", "claude-3-sonnet-20240229", "claude-3-5-sonnet-20241022" ] @app.middleware("http") async def auth_middleware(request: Request, call_next): """ Middleware for handling authentication and response logging. Args: request: The incoming HTTP request call_next: The next middleware in the chain Returns: Response: The processed HTTP response """ if request.url.path in PUBLIC_ENDPOINTS: start_time = time.perf_counter() response = await call_next(request) process_time = time.perf_counter() - start_time response.headers["X-Process-Time"] = str(process_time) return response try: auth_header = request.headers.get('Authorization') if not auth_header: raise HTTPException( status_code=401, detail="No authorization header" ) scheme, token = auth_header.split() if scheme.lower() != 'bearer': raise HTTPException( status_code=401, detail="Invalid authentication scheme" ) token_context.set(token) start_time = time.perf_counter() response = await call_next(request) process_time = time.perf_counter() - start_time response.headers["X-Process-Time"] = str(process_time) return response except HTTPException as http_ex: logger.error( f"HTTP Exception - Status: {http_ex.status_code} - " f"Detail: {http_ex.detail} - Path: {request.url.path}" ) return JSONResponse( status_code=http_ex.status_code, content={"detail": http_ex.detail} ) except Exception as e: logger.error( f"Unexpected error in middleware - Error: {str(e)} - " f"Path: {request.url.path}", exc_info=True ) return JSONResponse( status_code=500, content={"detail": "Internal server error"} ) def get_anthropic_client(): """ Get an authenticated Anthropic client using the current token. Returns: Anthropic: An authenticated Anthropic client instance Raises: HTTPException: If no authorization token is found """ token = token_context.get() if not token: raise HTTPException(status_code=401, detail="No authorization token found") return Anthropic(api_key=token) @app.get("/v1") @app.get("/") async def read_root(): """Root endpoint for API health check.""" try: # Lecture du README.md readme_path = Path("README.md") if not readme_path.exists(): return HTMLResponse(content="

README.md non trouvé

") md_text = readme_path.read_text(encoding='utf-8') md_text = '\n'.join(md_text.split('\n')[10:]) # Conversion Markdown vers HTML html = markdown.markdown( md_text, extensions=[ 'markdown.extensions.fenced_code', 'markdown.extensions.tables', 'markdown.extensions.codehilite', 'markdown.extensions.sane_lists' ] ) # Lecture du CSS css_file = Path("main.css") custom_css = css_file.read_text(encoding='utf-8') if css_file.exists() else "" # CSS pour la coloration syntaxique code_css = HtmlFormatter(style='default').get_style_defs('.codehilite') # Construction de la page HTML html_content = f"""
{html}
""" return HTMLResponse(content=html_content) except Exception as e: return HTMLResponse( content=f"

Erreur: {str(e)}

", status_code=500 ) @app.get("/v1/models") @app.get("/models") async def get_models(): """ Get available Anthropic models. Returns: JSONResponse: List of available models and their details """ get_anthropic_client() # Verify token validity models = [ { "id": model_id, "object": "model", "name": f"🤖 {model_id}", "created": int(time.time()), "owned_by": "anthropic", "pipeline": {"type": "custom", "valves": False} } for model_id in AVAILABLE_MODELS ] return JSONResponse( content={ "data": models, "object": "list", "pipelines": True, } ) def stream_message( model: str, messages: List[Dict[str, Any]] ) -> Generator[str, None, None]: """ Stream messages using the specified model. Args: model: The model identifier to use messages: List of messages to process Returns: Generator: Stream of SSE formatted responses """ client = get_anthropic_client() response = client.messages.create( model=model, max_tokens=1024, messages=messages, stream=True ) def event_stream() -> Generator[str, None, None]: message_id = None for chunk in response: if not message_id: message_id = f"chatcmpl-{int(time.time())}" if chunk.type == 'content_block_delta': data = { "id": message_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": model, "choices": [ { "index": 0, "delta": { "content": ( chunk.delta.text if hasattr(chunk.delta, 'text') else "" ) }, "logprobs": None, "finish_reason": None, } ], } yield f"data: {json.dumps(data)}\n\n" elif chunk.type == 'content_block_stop': data = { "id": message_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": model, "choices": [ { "index": 0, "delta": {}, "logprobs": None, "finish_reason": "stop", } ], } yield f"data: {json.dumps(data)}\n\n" yield "data: [DONE]\n\n" return event_stream() def send_message(model: str, messages: List[Dict[str, Any]]) -> Dict[str, Any]: """ Send a message via the Anthropic provider without streaming. Args: model: The model identifier to use messages: List of messages to process Returns: dict: The formatted response from the model """ client = get_anthropic_client() response = client.messages.create( model=model, max_tokens=1024, messages=messages ) content = response.content[0].text if response.content else "" return { "id": response.id, "object": "chat.completion", "created": int(time.time()), "model": model, "choices": [ { "index": 0, "message": { "role": "assistant", "content": content, }, "logprobs": None, "finish_reason": "stop", } ], } @app.post("/v1/chat/completions") @app.post("/chat/completions") async def generate_chat_completion(form_data: OpenAIChatCompletionForm): """ Generate chat completions from the model. Args: form_data: The chat completion request parameters Returns: Union[StreamingResponse, dict]: Either a streaming response or a complete message """ messages = [ {"role": message.role, "content": message.content} for message in form_data.messages ] model = form_data.model def job(): """Handle both streaming and non-streaming modes.""" if form_data.stream: return StreamingResponse( stream_message(model=model, messages=messages), media_type="text/event-stream" ) return send_message(model=model, messages=messages) with ThreadPoolExecutor() as executor: return await run_in_threadpool(job) @app.post("/v1/{pipeline_id}/filter/inlet") @app.post("/{pipeline_id}/filter/inlet") async def filter_inlet(pipeline_id: str, form_data: FilterForm): """ Handle inlet filtering for the pipeline. Args: pipeline_id: The ID of the pipeline form_data: The filter parameters Returns: dict: The processed request body """ return form_data.body @app.post("/v1/{pipeline_id}/filter/outlet") @app.post("/{pipeline_id}/filter/outlet") async def filter_outlet(pipeline_id: str, form_data: FilterForm): """ Handle outlet filtering for the pipeline. Args: pipeline_id: The ID of the pipeline form_data: The filter parameters Returns: dict: The processed request body """ return form_data.body