import os
import json
import pickle
import tempfile
import asyncio
import numpy as np
import redis
import uvicorn
import nltk
from nltk.stem import WordNetLemmatizer
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Dropout, Input
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.models import load_model, save_model
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from dotenv import load_dotenv
from datetime import datetime
from sklearn.model_selection import train_test_split
from faker import Faker
from random_word import RandomWords
import logging
# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
load_dotenv()
app = FastAPI()
lemmatizer = WordNetLemmatizer()
# Make sure the NLTK resources used below (punkt tokenizer and WordNet) are available
nltk.download('punkt', quiet=True)
nltk.download('wordnet', quiet=True)
redis_password = os.getenv("REDIS_PASSWORD")
r = redis.Redis(host=os.getenv("REDIS_HOST"), port=int(os.getenv("REDIS_PORT")), password=redis_password)
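# Write a small seed intents.json with a couple of example intents (greetings and goodbyes).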
def create_intents_json():
intents = {
"intents": [
{
"tag": "greeting",
"patterns": ["Hola", "¿Cómo estás?", "Buenos días"],
"responses": ["¡Hola!", "¿Cómo puedo ayudarte?"],
"date": "2021-01-01"
},
{
"tag": "goodbye",
"patterns": ["Adiós", "Hasta luego", "Nos vemos"],
"responses": ["¡Hasta luego!", "Cuídate!"],
"date": "2021-01-01"
}
]
}
with open('intents.json', 'w') as f:
json.dump(intents, f, ensure_ascii=False, indent=4)
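# Load intents.json and keep only intents whose date falls between the year 2000 and today.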
def load_and_filter_data():
with open("intents.json") as file:
intents = json.load(file)
filtered_intents = {"intents": []}
for intent in intents['intents']:
if "date" in intent:
intent_date = datetime.strptime(intent["date"], "%Y-%m-%d")
if intent_date.year >= 2000 and intent_date <= datetime.now():
filtered_intents['intents'].append(intent)
return filtered_intents
os.makedirs('models', exist_ok=True)
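# Background loop: periodically rebuild the bag-of-words classifier from intents.json plus the
# user questions logged in Redis, then persist the words, classes and model back to Redis.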
async def train_and_save_model():
global lemmatizer, r
while True:
words, classes, documents = [], [], []
ignore_words = ['?', '!']
intents = load_and_filter_data()
        user_questions = r.lrange('user_questions', 0, -1)
        for question in user_questions:
            question = question.decode('utf-8')
            processed_words = nltk.word_tokenize(question)
            documents.append((processed_words, "user_question"))
            words.extend(processed_words)
        # Give logged questions their own class so classes.index() below cannot fail on them
        if user_questions and "user_question" not in classes:
            classes.append("user_question")
for intent in intents['intents']:
for pattern in intent['patterns']:
processed_words = nltk.word_tokenize(pattern)
documents.append((processed_words, intent['tag']))
words.extend(processed_words)
if intent['tag'] not in classes:
classes.append(intent['tag'])
        # Pad the training set with synthetic examples (random sentences under random tags)
fake = Faker()
random_words = RandomWords()
        for _ in range(10):  # generate 10 synthetic training entries
random_pattern = f"{fake.sentence()}"
random_tag = random_words.get_random_word()
documents.append((nltk.word_tokenize(random_pattern), random_tag))
words.extend(nltk.word_tokenize(random_pattern))
if random_tag not in classes:
classes.append(random_tag)
        words = sorted(set(lemmatizer.lemmatize(w.lower()) for w in words if w not in ignore_words))
classes = sorted(set(classes))
training = []
output_empty = [0] * len(classes)
for doc in documents:
bag = []
pattern_words = [lemmatizer.lemmatize(word.lower()) for word in doc[0]]
for w in words:
bag.append(1 if w in pattern_words else 0)
output_row = list(output_empty)
output_row[classes.index(doc[1])] = 1
training.append([bag, output_row])
if not training:
await asyncio.sleep(60)
continue
        train_x = np.array([row[0] for row in training])
        train_y = np.array([row[1] for row in training])
        # Train on the same bag-of-words features and one-hot labels that /chat reconstructs at inference time
        X_train, X_test, y_train, y_test = train_test_split(train_x, train_y, test_size=0.2, random_state=42)
        if r.exists('chatbot_model'):
            with tempfile.NamedTemporaryFile(delete=False, suffix='.h5') as temp_file:
                temp_file.write(r.get('chatbot_model'))
                temp_file_name = temp_file.name
            model = load_model(temp_file_name)
            os.remove(temp_file_name)
            # The vocabulary and class list grow between passes; rebuild when the shapes no longer match
            if model.input_shape[-1] != len(X_train[0]) or model.output_shape[-1] != len(classes):
                model = None
        else:
            model = None
        if model is None:
            model = Sequential([
                Input(shape=(len(X_train[0]),)),
                Dense(128, activation='relu'),
                Dropout(0.5),
                Dense(64, activation='relu'),
                Dropout(0.5),
                Dense(len(classes), activation='softmax'),
            ])
sgd = SGD(learning_rate=0.01, momentum=0.9, nesterov=True)
model.compile(loss='categorical_crossentropy', optimizer=sgd, metrics=['accuracy'])
model.fit(X_train, y_train, epochs=1, batch_size=len(X_train), verbose=0)
r.set('words', pickle.dumps(words))
r.set('classes', pickle.dumps(classes))
        with tempfile.NamedTemporaryFile(delete=False, suffix='.h5') as temp_file:
            save_model(model, temp_file.name)
            with open(temp_file.name, 'rb') as f:
                r.set('chatbot_model', f.read())
        os.remove(temp_file.name)
        # Wait before the next retraining pass so the loop does not spin constantly
        await asyncio.sleep(60)
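# Collect WordNet synonyms for every token of a pattern (helper; not called by the training loop).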
def generate_synonyms(pattern):
synonyms = []
words = nltk.word_tokenize(pattern)
for word in words:
synsets = nltk.corpus.wordnet.synsets(word)
if synsets:
for syn in synsets:
for lemma in syn.lemmas():
synonyms.append(lemma.name())
return list(set(synonyms))
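# Record an incoming user question so the background loop can retrain on it later.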
async def handle_new_message(message: str):
    # Only enqueue the question; the training loop running in the background will pick it up
    r.rpush('user_questions', message)
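# Request body for the /chat endpoint.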
class ChatMessage(BaseModel):
message: str
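# Classify a message against the stored vocabulary/classes and return the likely intents with probabilities.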
@app.post("/chat")
async def chat(message: ChatMessage):
    # The classifier is trained in the background; until the first artifacts exist, just log the question
    if not (r.exists('words') and r.exists('classes') and r.exists('chatbot_model')):
        await handle_new_message(message.message)
        return []
    words = pickle.loads(r.get('words'))
    classes = pickle.loads(r.get('classes'))
    # Keras cannot load a model from raw bytes, so round-trip through a temporary .h5 file
    with tempfile.NamedTemporaryFile(delete=False, suffix='.h5') as temp_file:
        temp_file.write(r.get('chatbot_model'))
        temp_file_name = temp_file.name
    model = load_model(temp_file_name)
    os.remove(temp_file_name)
    # Rebuild the bag-of-words vector with the same lemmatized vocabulary used during training
    sentence_words = [lemmatizer.lemmatize(w.lower()) for w in nltk.word_tokenize(message.message)]
    bag = [0] * len(words)
    for s in sentence_words:
        for i, w in enumerate(words):
            if w == s:
                bag[i] = 1
    predictions = model.predict(np.array([bag]))[0]
    ERROR_THRESHOLD = 0.25
    results = [[i, prob] for i, prob in enumerate(predictions) if prob > ERROR_THRESHOLD]
    results.sort(key=lambda x: x[1], reverse=True)
    return_list = []
    for i, prob in results:
        return_list.append({"intent": classes[i], "probability": str(prob)})
await handle_new_message(message.message)
return return_list
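# Store a manual tag for a question as a simple key/value pair in Redis.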
@app.post("/tag")
async def tag_question(question: str, tag: str):
r.set(f"tag:{question}", tag)
return {"message": "Etiqueta guardada"}
html_code = """
<!DOCTYPE html>
<html>
<head>
<title>Chatbot</title>
<style>
body {
font-family: sans-serif;
background-color: #f4f4f4;
margin: 0;
padding: 0;
display: flex;
justify-content: center;
align-items: center;
min-height: 100vh;
}
#container {
background-color: #fff;
border-radius: 5px;
box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1);
padding: 30px;
width: 80%;
max-width: 600px;
}
h1 {
text-align: center;
margin-bottom: 20px;
color: #333;
}
#chatbox {
height: 300px;
overflow-y: auto;
padding: 10px;
border: 1px solid #ccc;
border-radius: 5px;
margin-bottom: 10px;
}
#chatbox p {
margin: 5px 0;
}
#user_input {
width: 100%;
padding: 10px;
border: 1px solid #ccc;
border-radius: 5px;
margin-bottom: 10px;
box-sizing: border-box;
}
button {
background-color: #4CAF50;
color: white;
padding: 10px 20px;
border: none;
border-radius: 5px;
cursor: pointer;
}
</style>
</head>
<body>
<div id="container">
<h1>Chatbot</h1>
<div id="chatbox"></div>
<input type="text" id="user_input" placeholder="Escribe tu mensaje...">
<button onclick="sendMessage()">Enviar</button>
</div>
<script>
function sendMessage() {
let userInput = document.getElementById('user_input').value;
document.getElementById('user_input').value = '';
fetch('/chat', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({"message": userInput})
})
.then(response => response.json())
.then(data => {
let chatbox = document.getElementById('chatbox');
chatbox.innerHTML += '<p><b>Tú:</b> ' + userInput + '</p>';
data.forEach(item => {
chatbox.innerHTML += '<p><b>Bot:</b> ' + item.intent + ' (Probabilidad: ' + item.probability + ')</p>';
});
});
}
</script>
</body>
</html>
"""
@app.get("/", response_class=HTMLResponse)
async def root():
return html_code
@app.on_event("startup")
async def startup_event():
    logger.info(f"Application Startup at {datetime.now()}")
    create_intents_json()
    # Run the training loop as a background task so it does not block the web server
    asyncio.create_task(train_and_save_model())

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)