import os import json import pickle import tempfile import asyncio import io import numpy as np import redis import uvicorn import nltk from nltk.stem import WordNetLemmatizer from tensorflow.keras import Sequential from tensorflow.keras.layers import Dense, Dropout, Input from tensorflow.keras.optimizers import SGD from tensorflow.keras.models import load_model, save_model from fastapi import FastAPI from fastapi.responses import HTMLResponse from pydantic import BaseModel from dotenv import load_dotenv from faker import Faker import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) load_dotenv() app = FastAPI() lemmatizer = WordNetLemmatizer() redis_password = os.getenv("REDIS_PASSWORD") r = redis.Redis(host=os.getenv("REDIS_HOST"), port=int(os.getenv("REDIS_PORT")), password=redis_password) nltk.download('punkt') nltk.download('wordnet') nltk.download('omw-1.4') nltk.download('averaged_perceptron_tagger') nltk.download('punkt_tab') def create_intents_json(): intents = { "intents": [ { "tag": "greeting", "patterns": ["Hola", "¿Cómo estás?", "Buenos días"], "responses": ["¡Hola!", "¿Cómo puedo ayudarte?"], "date": "2021-01-01" }, { "tag": "goodbye", "patterns": ["Adiós", "Hasta luego", "Nos vemos"], "responses": ["¡Hasta luego!", "Cuídate!"], "date": "2021-01-01" } ] } with open('intents.json', 'w') as f: json.dump(intents, f, ensure_ascii=False, indent=4) def load_and_filter_data(): with open("intents.json") as file: intents = json.load(file) filtered_intents = {"intents": []} for intent in intents['intents']: if "date" in intent: intent_date = datetime.strptime(intent["date"], "%Y-%m-%d") if intent_date.year >= 2000 and intent_date <= datetime.now(): filtered_intents['intents'].append(intent) return filtered_intents if not os.path.exists('models'): os.makedirs('models') async def train_and_save_model(): global lemmatizer, r while True: words, classes, documents = [], [], [] intents = load_and_filter_data() user_questions = r.lrange('user_questions', 0, -1) for question in user_questions: question = question.decode('utf-8') processed_words = nltk.word_tokenize(question) documents.append((processed_words, "user_question")) words.extend(processed_words) for intent in intents['intents']: for pattern in intent['patterns']: processed_words = nltk.word_tokenize(pattern) documents.append((processed_words, intent['tag'])) words.extend(processed_words) if intent['tag'] not in classes: classes.append(intent['tag']) fake = Faker() for _ in range(10): random_pattern = f"{fake.sentence()}" random_tag = fake.word() documents.append((nltk.word_tokenize(random_pattern), random_tag)) words.extend(nltk.word_tokenize(random_pattern)) if random_tag not in classes: classes.append(random_tag) words = sorted(set(words)) classes = sorted(set(classes)) training = [] output_empty = [0] * len(classes) for doc in documents: bag = [] pattern_words = [lemmatizer.lemmatize(word.lower()) for word in doc[0]] for w in words: bag.append(1 if w in pattern_words else 0) output_row = list(output_empty) output_row[classes.index(doc[1])] = 1 training.append([bag, output_row]) if not training: await asyncio.sleep(60) continue train_x = np.array([row[0] for row in training]) train_y = np.array([row[1] for row in training]) if r.exists('chatbot_model'): with tempfile.NamedTemporaryFile(delete=False, suffix='.h5') as temp_file: temp_file.write(r.get('chatbot_model')) temp_file_name = temp_file.name model = load_model(temp_file_name) os.remove(temp_file.name) else: input_layer = Input(shape=(len(train_x[0]),)) layer1 = Dense(128, activation='relu')(input_layer) layer2 = Dropout(0.5)(layer1) layer3 = Dense(64, activation='relu')(layer2) layer4 = Dropout(0.5)(layer3) output_layer = Dense(len(classes), activation='softmax')(layer4) model = Sequential(layers=[input_layer, layer1, layer2, layer3, layer4, output_layer]) sgd = SGD(learning_rate=0.01, momentum=0.9, nesterov=True) model.compile(loss='categorical_crossentropy', optimizer=sgd, metrics=['accuracy']) model.fit(train_x, train_y, epochs=1, batch_size=len(train_x), verbose=0) r.set('words', pickle.dumps(words)) r.set('classes', pickle.dumps(classes)) with tempfile.NamedTemporaryFile(delete=False, suffix='.h5') as temp_file: save_model(model, temp_file.name) with open(temp_file.name, 'rb') as f: r.set('chatbot_model', f.read()) os.remove(temp_file.name) def generate_synonyms(pattern): synonyms = [] words = nltk.word_tokenize(pattern) for word in words: synsets = nltk.corpus.wordnet.synsets(word) if synsets: for syn in synsets: for lemma in syn.lemmas(): synonyms.append(lemma.name()) return list(set(synonyms)) async def handle_new_message(message: str): r.rpush('user_questions', message) await train_and_save_model() class ChatMessage(BaseModel): message: str @app.post("/chat") async def chat(message: ChatMessage): words = pickle.loads(r.get('words')) classes = pickle.loads(r.get('classes')) with io.BytesIO(r.get('chatbot_model')) as f: with tempfile.NamedTemporaryFile(delete=False, suffix='.h5') as temp_file: temp_file.write(f.read()) temp_file_name = temp_file.name model = load_model(temp_file_name) os.remove(temp_file.name) sentence_words = nltk.word_tokenize(message.message) bag = [0] * len(words) for s in sentence_words: for i, w in enumerate(words): if w == s: bag[i] = 1 p = model.predict(np.array([bag]))[0] ERROR_THRESHOLD = 0.25 results = [[i, p] for i, p in enumerate(p) if p > ERROR_THRESHOLD] results.sort(key=lambda x: x[1], reverse=True) return_list = [] for i, p in results: return_list.append({"intent": classes[i], "probability": str(p)}) await handle_new_message(message.message) return return_list @app.post("/tag") async def tag_question(question: str, tag: str): r.set(f"tag:{question}", tag) return {"message": "Etiqueta guardada"} html_code = """ Chatbot

Chatbot

""" @app.get("/", response_class=HTMLResponse) async def root(): return html_code if __name__ == "__main__": logger.info(f"Application Startup at {datetime.now()}") create_intents_json() asyncio.run(train_and_save_model()) uvicorn.run(app, host="0.0.0.0", port=7860)