king007's picture
Duplicate from deepset/wikipedia-assistant
039aebb
import torch
from fastapi import FastAPI, Depends, status
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import time
from typing import Dict, List, Optional
import jwt
from decouple import config
from fastapi import Request, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
JWT_SECRET = config("secret")
JWT_ALGORITHM = config("algorithm")
app = FastAPI()
app.ready = False
device = ("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = AutoTokenizer.from_pretrained('vblagoje/bart_lfqa')
model = AutoModelForSeq2SeqLM.from_pretrained('vblagoje/bart_lfqa').to(device)
_ = model.eval()
class JWTBearer(HTTPBearer):
def __init__(self, auto_error: bool = True):
super(JWTBearer, self).__init__(auto_error=auto_error)
async def __call__(self, request: Request):
credentials: HTTPAuthorizationCredentials = await super(JWTBearer, self).__call__(request)
if credentials:
if not credentials.scheme == "Bearer":
raise HTTPException(status_code=403, detail="Invalid authentication scheme.")
if not self.verify_jwt(credentials.credentials):
raise HTTPException(status_code=403, detail="Invalid token or expired token.")
return credentials.credentials
else:
raise HTTPException(status_code=403, detail="Invalid authorization code.")
def verify_jwt(self, jwtoken: str) -> bool:
isTokenValid: bool = False
try:
payload = decodeJWT(jwtoken)
except:
payload = None
if payload:
isTokenValid = True
return isTokenValid
def token_response(token: str):
return {
"access_token": token
}
def signJWT(user_id: str) -> Dict[str, str]:
payload = {
"user_id": user_id,
"expires": time.time() + 6000
}
token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
return token_response(token)
def decodeJWT(token: str) -> dict:
try:
decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
return decoded_token if decoded_token["expires"] >= time.time() else None
except:
return {}
class LFQAParameters(BaseModel):
min_length: int = 50
max_length: int = 250
do_sample: bool = False
early_stopping: bool = True
num_beams: int = 8
temperature: float = 1.0
top_k: float = None
top_p: float = None
no_repeat_ngram_size: int = 3
num_return_sequences: int = 1
class InferencePayload(BaseModel):
model_input: str
parameters: Optional[LFQAParameters] = LFQAParameters()
@app.on_event("startup")
def startup():
app.ready = True
@app.get("/healthz")
def healthz():
if app.ready:
return PlainTextResponse("ok")
return PlainTextResponse("service unavailable", status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
@app.post("/generate/", dependencies=[Depends(JWTBearer())])
def generate(context: InferencePayload):
model_input = tokenizer(context.model_input, truncation=True, padding=True, return_tensors="pt")
param = context.parameters
generated_answers_encoded = model.generate(input_ids=model_input["input_ids"].to(device),
attention_mask=model_input["attention_mask"].to(device),
min_length=param.min_length,
max_length=param.max_length,
do_sample=param.do_sample,
early_stopping=param.early_stopping,
num_beams=param.num_beams,
temperature=param.temperature,
top_k=param.top_k,
top_p=param.top_p,
no_repeat_ngram_size=param.no_repeat_ngram_size,
num_return_sequences=param.num_return_sequences)
answers = tokenizer.batch_decode(generated_answers_encoded, skip_special_tokens=True,
clean_up_tokenization_spaces=True)
results = []
for answer in answers:
results.append({"generated_text": answer})
return results