import os import json import pickle import random import tempfile import asyncio import multiprocessing import io import numpy as np import redis import uvicorn import nltk from nltk.stem import WordNetLemmatizer from tqdm import tqdm from tensorflow.keras import Sequential from tensorflow.keras.layers import Dense, Dropout, Input from tensorflow.keras.optimizers import SGD from tensorflow.keras.models import load_model, save_model from fastapi import FastAPI from fastapi.responses import HTMLResponse from pydantic import BaseModel from dotenv import load_dotenv from datetime import datetime from sklearn.feature_extraction.text import CountVectorizer from sklearn.model_selection import train_test_split from faker import Faker from random_word import RandomWords from textgenrnn import textgenrnn load_dotenv() app = FastAPI() lemmatizer = WordNetLemmatizer() faker = Faker() r = redis.Redis(host=os.getenv("REDIS_HOST"), port=int(os.getenv("REDIS_PORT")), password=os.getenv("REDIS_PASSWORD")) random_words = RandomWords() textgen = textgenrnn() def create_intents_json(): intents = { "intents": [ { "tag": "greeting", "patterns": [faker.sentence() for _ in range(5)], "responses": ["¡Hola!", "¿Cómo puedo ayudarte?"], "date": datetime.now().strftime("%Y-%m-%d") }, { "tag": "goodbye", "patterns": [faker.sentence() for _ in range(5)], "responses": ["¡Hasta luego!", "Cuídate!"], "date": datetime.now().strftime("%Y-%m-%d") }, { "tag": "random_word", "patterns": [random_words.get_random_word() for _ in range(5)], "responses": [faker.sentence() for _ in range(5)], "date": datetime.now().strftime("%Y-%m-%d") } ] } with open('intents.json', 'w') as f: json.dump(intents, f, ensure_ascii=False, indent=4) def load_and_filter_data(): with open("intents.json") as file: intents = json.load(file) filtered_intents = {"intents": []} for intent in intents['intents']: if "date" in intent: intent_date = datetime.strptime(intent["date"], "%Y-%m-%d") if intent_date.year >= 2000 and intent_date <= datetime.now(): filtered_intents['intents'].append(intent) return filtered_intents if not os.path.exists('models'): os.makedirs('models') async def train_and_save_model(): global lemmatizer, r while True: words, classes, documents = [], [], [] ignore_words = ['?', '!'] intents = load_and_filter_data() user_questions = r.lrange('user_questions', 0, -1) for question in user_questions: question = question.decode('utf-8') processed_words = nltk.word_tokenize(question) documents.append((processed_words, "user_question")) words.extend(processed_words) for intent in intents['intents']: for pattern in intent['patterns']: processed_words = nltk.word_tokenize(pattern) documents.append((processed_words, intent['tag'])) words.extend(processed_words) if intent['tag'] not in classes: classes.append(intent['tag']) for intent in intents['intents']: for pattern in intent['patterns']: synonyms = generate_synonyms(pattern) for synonym in synonyms: processed_words = nltk.word_tokenize(synonym) documents.append((processed_words, intent['tag'])) words.extend(processed_words) words = sorted(set(words)) classes = sorted(set(classes)) training = [] output_empty = [0] * len(classes) for doc in documents: bag = [] pattern_words = [lemmatizer.lemmatize(word.lower()) for word in doc[0]] for w in words: bag.append(1 if w in pattern_words else 0) output_row = list(output_empty) output_row[classes.index(doc[1])] = 1 training.append([bag, output_row]) if not training: await asyncio.sleep(60) continue train_x = np.array([row[0] for row in training]) train_y = np.array([row[1] for row in training]) vectorizer = CountVectorizer() X = vectorizer.fit_transform([" ".join(doc[0]) for doc in documents]).toarray() y = [classes.index(doc[1]) for doc in documents] X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) if r.exists('chatbot_model'): with tempfile.NamedTemporaryFile(delete=False, suffix='.h5') as temp_file: temp_file.write(r.get('chatbot_model')) temp_file_name = temp_file.name model = load_model(temp_file_name) os.remove(temp_file.name) else: input_layer = Input(shape=(len(X_train[0]),)) layer1 = Dense(128, activation='relu')(input_layer) layer2 = Dropout(0.5)(layer1) layer3 = Dense(64, activation='relu')(layer2) layer4 = Dropout(0.5)(layer3) output_layer = Dense(len(classes), activation='softmax')(layer4) model = Sequential(layers=[input_layer, layer1, layer2, layer3, layer4, output_layer]) sgd = SGD(learning_rate=0.01, momentum=0.9, nesterov=True) model.compile(loss='categorical_crossentropy', optimizer=sgd, metrics=['accuracy']) model.fit(X_train, y_train, epochs=1, batch_size=len(X_train), verbose=0) r.set('words', pickle.dumps(words)) r.set('classes', pickle.dumps(classes)) with tempfile.NamedTemporaryFile(delete=False, suffix='.h5') as temp_file: save_model(model, temp_file.name) with open(temp_file.name, 'rb') as f: r.set('chatbot_model', f.read()) os.remove(temp_file.name) def generate_synonyms(pattern): synonyms = [] words = nltk.word_tokenize(pattern) for word in words: synsets = nltk.corpus.wordnet.synsets(word) if synsets: for syn in synsets: for lemma in syn.lemmas(): synonyms.append(lemma.name()) return list(set(synonyms)) async def handle_new_message(message: str): r.rpush('user_questions', message) await train_and_save_model() class ChatMessage(BaseModel): message: str @app.post("/chat") async def chat(message: ChatMessage): words = pickle.loads(r.get('words')) classes = pickle.loads(r.get('classes')) with io.BytesIO(r.get('chatbot_model')) as f: with tempfile.NamedTemporaryFile(delete=False, suffix='.h5') as temp_file: temp_file.write(f.read()) temp_file_name = temp_file.name model = load_model(temp_file_name) os.remove(temp_file.name) sentence_words = nltk.word_tokenize(message.message) bag = [0] * len(words) for s in sentence_words: for i, w in enumerate(words): if w == s: bag[i] = 1 p = model.predict(np.array([bag]))[0] ERROR_THRESHOLD = 0.25 results = [[i, p] for i, p in enumerate(p) if p > ERROR_THRESHOLD] results.sort(key=lambda x: x[1], reverse=True) return_list = [] for i, p in results: return_list.append({"intent": classes[i], "probability": str(p)}) await handle_new_message(message.message) return return_list @app.post("/tag") async def tag_question(question: str, tag: str): r.set(f"tag:{question}", tag) return {"message": "Etiqueta guardada"} html_code = """ Chatbot

Chatbot

""" @app.get("/", response_class=HTMLResponse) async def root(): return html_code if __name__ == "__main__": print("Iniciando la aplicación...") create_intents_json() asyncio.run(train_and_save_model()) uvicorn.run(app, host="0.0.0.0", port=7860)