import os
import json
import pickle
import tempfile
import asyncio
import logging
from datetime import datetime

import numpy as np
import redis
import uvicorn
import nltk
from nltk.stem import WordNetLemmatizer
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Dropout, Input
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.models import load_model, save_model
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from dotenv import load_dotenv
from sklearn.model_selection import train_test_split
from faker import Faker
from random_word import RandomWords

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

load_dotenv()

app = FastAPI()

# nltk.word_tokenize and WordNetLemmatizer rely on the 'punkt' and 'wordnet' corpora.
nltk.download('punkt', quiet=True)
nltk.download('wordnet', quiet=True)

lemmatizer = WordNetLemmatizer()

redis_password = os.getenv("REDIS_PASSWORD")
r = redis.Redis(host=os.getenv("REDIS_HOST"), port=int(os.getenv("REDIS_PORT")), password=redis_password)
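
# The Redis connection above expects REDIS_HOST, REDIS_PORT and REDIS_PASSWORD to be
# defined in the environment (for example via a .env file picked up by load_dotenv).
# A minimal example with illustrative values only:
#
#   REDIS_HOST=localhost
#   REDIS_PORT=6379
#   REDIS_PASSWORD=changeme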


def create_intents_json():
    intents = {
        "intents": [
            {
                "tag": "greeting",
                "patterns": ["Hola", "¿Cómo estás?", "Buenos días"],
                "responses": ["¡Hola!", "¿Cómo puedo ayudarte?"],
                "date": "2021-01-01"
            },
            {
                "tag": "goodbye",
                "patterns": ["Adiós", "Hasta luego", "Nos vemos"],
                "responses": ["¡Hasta luego!", "¡Cuídate!"],
                "date": "2021-01-01"
            }
        ]
    }
    # ensure_ascii=False keeps the accented characters, so write the file explicitly as UTF-8.
    with open('intents.json', 'w', encoding='utf-8') as f:
        json.dump(intents, f, ensure_ascii=False, indent=4)


def load_and_filter_data():
    with open("intents.json", encoding='utf-8') as file:
        intents = json.load(file)

    # Keep only intents whose date is plausible: from the year 2000 up to today.
    filtered_intents = {"intents": []}
    for intent in intents['intents']:
        if "date" in intent:
            intent_date = datetime.strptime(intent["date"], "%Y-%m-%d")
            if intent_date.year >= 2000 and intent_date <= datetime.now():
                filtered_intents['intents'].append(intent)

    return filtered_intents


os.makedirs('models', exist_ok=True)


async def train_and_save_model():
    """Build the training corpus, (re)train the intent model and store everything in Redis."""
    words, classes, documents = [], [], []
    ignore_words = ['?', '!']
    intents = load_and_filter_data()

    # Questions previously sent by users are kept in a Redis list and fed back into
    # training under a catch-all "user_question" class.
    user_questions = r.lrange('user_questions', 0, -1)
    for question in user_questions:
        question = question.decode('utf-8')
        processed_words = nltk.word_tokenize(question)
        documents.append((processed_words, "user_question"))
        words.extend(processed_words)
        if "user_question" not in classes:
            classes.append("user_question")

    for intent in intents['intents']:
        for pattern in intent['patterns']:
            processed_words = nltk.word_tokenize(pattern)
            documents.append((processed_words, intent['tag']))
            words.extend(processed_words)
            if intent['tag'] not in classes:
                classes.append(intent['tag'])

    # Add a handful of synthetic sentences with random tags as noise/augmentation.
    fake = Faker()
    random_words = RandomWords()
    for _ in range(10):
        random_pattern = fake.sentence()
        random_tag = random_words.get_random_word()
        random_tokens = nltk.word_tokenize(random_pattern)
        documents.append((random_tokens, random_tag))
        words.extend(random_tokens)
        if random_tag not in classes:
            classes.append(random_tag)

    # Lemmatize and lowercase the vocabulary so it matches the processing applied to each document.
    words = sorted(set(lemmatizer.lemmatize(w.lower()) for w in words if w not in ignore_words))
    classes = sorted(set(classes))

    # One bag-of-words row per document, plus a one-hot class vector.
    training = []
    output_empty = [0] * len(classes)
    for doc in documents:
        bag = []
        pattern_words = [lemmatizer.lemmatize(word.lower()) for word in doc[0]]
        for w in words:
            bag.append(1 if w in pattern_words else 0)

        output_row = list(output_empty)
        output_row[classes.index(doc[1])] = 1
        training.append([bag, output_row])

    if not training:
        logger.warning("No training data available; skipping this training run.")
        return

    train_x = np.array([row[0] for row in training])
    train_y = np.array([row[1] for row in training])

    # Train on the same bag-of-words features that the /chat endpoint builds at prediction
    # time, keeping a small held-out split for later evaluation.
    X_train, X_test, y_train, y_test = train_test_split(train_x, train_y, test_size=0.2, random_state=42)

    # Reuse the model stored in Redis only if its input/output sizes still match the
    # current vocabulary and class list; otherwise build a fresh network.
    model = None
    if r.exists('chatbot_model'):
        with tempfile.NamedTemporaryFile(delete=False, suffix='.h5') as temp_file:
            temp_file.write(r.get('chatbot_model'))
            temp_file_name = temp_file.name
        stored_model = load_model(temp_file_name)
        os.remove(temp_file_name)
        if stored_model.input_shape[-1] == len(words) and stored_model.output_shape[-1] == len(classes):
            model = stored_model

    if model is None:
        model = Sequential([
            Input(shape=(len(words),)),
            Dense(128, activation='relu'),
            Dropout(0.5),
            Dense(64, activation='relu'),
            Dropout(0.5),
            Dense(len(classes), activation='softmax'),
        ])
        sgd = SGD(learning_rate=0.01, momentum=0.9, nesterov=True)
        model.compile(loss='categorical_crossentropy', optimizer=sgd, metrics=['accuracy'])

    model.fit(X_train, y_train, epochs=1, batch_size=len(X_train), verbose=0)

    # Persist the vocabulary, the class list and the serialized model so the API
    # endpoints can reload them from Redis.
    r.set('words', pickle.dumps(words))
    r.set('classes', pickle.dumps(classes))

    with tempfile.NamedTemporaryFile(delete=False, suffix='.h5') as temp_file:
        save_model(model, temp_file.name)
    with open(temp_file.name, 'rb') as f:
        r.set('chatbot_model', f.read())
    os.remove(temp_file.name)
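
# For reference, the Redis keys this application reads and writes (as used above and in
# the endpoints below): 'user_questions' (list of raw user messages), 'words' and
# 'classes' (pickled vocabulary and labels), 'chatbot_model' (serialized Keras model
# bytes), and 'tag:<question>' (manual labels saved by the /tag endpoint).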


def generate_synonyms(pattern):
    # Currently unused helper: collects WordNet synonyms for every token in a pattern.
    synonyms = []
    words = nltk.word_tokenize(pattern)
    for word in words:
        synsets = nltk.corpus.wordnet.synsets(word)
        if synsets:
            for syn in synsets:
                for lemma in syn.lemmas():
                    synonyms.append(lemma.name())
    return list(set(synonyms))
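
# A sketch of how generate_synonyms could be used to expand intent patterns before
# training (not wired in anywhere in this file; the expanded sentences would simply be
# appended to intent['patterns']):
#
#   for intent in intents['intents']:
#       extra = [s.replace('_', ' ') for p in intent['patterns'] for s in generate_synonyms(p)]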


async def handle_new_message(message: str):
    # Store the raw question and retrain so the new example is picked up immediately.
    r.rpush('user_questions', message)
    await train_and_save_model()


class ChatMessage(BaseModel):
    message: str
@app.post("/chat") |
|
async def chat(message: ChatMessage): |
|
words = pickle.loads(r.get('words')) |
|
classes = pickle.loads(r.get('classes')) |
|
with io.BytesIO(r.get('chatbot_model')) as f: |
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.h5') as temp_file: |
|
temp_file.write(f.read()) |
|
temp_file_name = temp_file.name |
|
model = load_model(temp_file_name) |
|
os.remove(temp_file.name) |
|
|
|
sentence_words = nltk.word_tokenize(message.message) |
|
bag = [0] * len(words) |
|
for s in sentence_words: |
|
for i, w in enumerate(words): |
|
if w == s: |
|
bag[i] = 1 |
|
|
|
p = model.predict(np.array([bag]))[0] |
|
ERROR_THRESHOLD = 0.25 |
|
results = [[i, p] for i, p in enumerate(p) if p > ERROR_THRESHOLD] |
|
results.sort(key=lambda x: x[1], reverse=True) |
|
return_list = [] |
|
for i, p in results: |
|
return_list.append({"intent": classes[i], "probability": str(p)}) |
|
|
|
await handle_new_message(message.message) |
|
|
|
return return_list |
|
|
|
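
# Example request against a local instance (the app is started on port 7860 in the
# __main__ block below; adjust host/port for your deployment):
#
#   curl -X POST http://localhost:7860/chat \
#        -H "Content-Type: application/json" \
#        -d '{"message": "Hola"}'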
@app.post("/tag") |
|
async def tag_question(question: str, tag: str): |
|
r.set(f"tag:{question}", tag) |
|
return {"message": "Etiqueta guardada"} |
|
|
|
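
# Note that question and tag are plain scalar parameters, so FastAPI expects them as
# query parameters rather than a JSON body, e.g. (localhost/port assumed as above):
#
#   curl -X POST "http://localhost:7860/tag?question=Hola&tag=greeting"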


html_code = """
<!DOCTYPE html>
<html>
<head>
    <title>Chatbot</title>
    <style>
        body {
            font-family: sans-serif;
            background-color: #f4f4f4;
            margin: 0;
            padding: 0;
            display: flex;
            justify-content: center;
            align-items: center;
            min-height: 100vh;
        }
        #container {
            background-color: #fff;
            border-radius: 5px;
            box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1);
            padding: 30px;
            width: 80%;
            max-width: 600px;
        }
        h1 {
            text-align: center;
            margin-bottom: 20px;
            color: #333;
        }
        #chatbox {
            height: 300px;
            overflow-y: auto;
            padding: 10px;
            border: 1px solid #ccc;
            border-radius: 5px;
            margin-bottom: 10px;
        }
        #chatbox p {
            margin: 5px 0;
        }
        #user_input {
            width: 100%;
            padding: 10px;
            border: 1px solid #ccc;
            border-radius: 5px;
            margin-bottom: 10px;
            box-sizing: border-box;
        }
        button {
            background-color: #4CAF50;
            color: white;
            padding: 10px 20px;
            border: none;
            border-radius: 5px;
            cursor: pointer;
        }
    </style>
</head>
<body>
    <div id="container">
        <h1>Chatbot</h1>
        <div id="chatbox"></div>
        <input type="text" id="user_input" placeholder="Escribe tu mensaje...">
        <button onclick="sendMessage()">Enviar</button>
    </div>
    <script>
        function sendMessage() {
            let userInput = document.getElementById('user_input').value;
            document.getElementById('user_input').value = '';
            fetch('/chat', {
                method: 'POST',
                headers: {'Content-Type': 'application/json'},
                body: JSON.stringify({"message": userInput})
            })
            .then(response => response.json())
            .then(data => {
                let chatbox = document.getElementById('chatbox');
                chatbox.innerHTML += '<p><b>Tú:</b> ' + userInput + '</p>';
                data.forEach(item => {
                    chatbox.innerHTML += '<p><b>Bot:</b> ' + item.intent + ' (Probabilidad: ' + item.probability + ')</p>';
                });
            });
        }
    </script>
</body>
</html>
"""
@app.get("/", response_class=HTMLResponse) |
|
async def root(): |
|
return html_code |
|
|
|


if __name__ == "__main__":
    logger.info(f"Application Startup at {datetime.now()}")
    create_intents_json()
    # Train once before serving so the /chat endpoint has a model and vocabulary to load.
    asyncio.run(train_and_save_model())
    uvicorn.run(app, host="0.0.0.0", port=7860)